元素码农
基础
UML建模
数据结构
算法
设计模式
网络
TCP/IP协议
HTTPS安全机制
WebSocket实时通信
数据库
sqlite
postgresql
clickhouse
后端
rust
go
java
php
mysql
redis
mongodb
etcd
nats
zincsearch
前端
浏览器
javascript
typescript
vue3
react
游戏
unity
unreal
C++
C#
Lua
App
android
ios
flutter
react-native
安全
Web安全
测试
软件测试
自动化测试 - Playwright
人工智能
Python
langChain
langGraph
运维
linux
docker
工具
git
svn
🌞
🌙
目录
▶
Lua语言基础
▶
环境搭建
安装Lua解释器
配置开发环境
第一个Lua程序
▶
基本语法
变量与数据类型
运算符与表达式
控制结构
▶
数据结构
表(Table)详解
数组与迭代
字符串处理
▶
Lua高级编程
▶
函数编程
函数定义与调用
闭包与作用域
高阶函数应用
▶
元表与元方法
元表基础
操作符重载
继承与对象系统
▶
协程编程
协程基础
生产者-消费者模式
协程调度实践
▶
Lua应用实践
▶
游戏开发
Lua与游戏引擎集成
AI脚本实现
热更新机制
▶
系统编程
Lua与C交互
扩展库开发
性能优化技巧
▶
实用工具开发
配置文件解析
自动化测试框架
网络编程基础
发布时间:
2025-03-24 12:09
↑
☰
# Lua生产者-消费者模式 本文将介绍如何在Lua中使用协程实现生产者-消费者模式,这是一个经典的并发编程模式。 ## 基本概念 生产者-消费者模式是一种常见的并发设计模式,它描述了两类协作的进程: - 生产者:生成数据、信息或任务 - 消费者:处理生产者生成的数据 在Lua中,我们可以使用协程来优雅地实现这种模式。 ## 简单实现 ### 1. 基本结构 ```lua -- 生产者协程 local function producer() local i = 0 while true do i = i + 1 coroutine.yield(i) -- 生产数据 end end -- 消费者函数 local function consumer() local p = coroutine.create(producer) local status, value repeat status, value = coroutine.resume(p) if status then print("消费数据:" .. value) end until not status end -- 使用示例 consumer() ``` ### 2. 带缓冲的实现 ```lua -- 创建缓冲区 local Buffer = {} Buffer.__index = Buffer function Buffer.new(size) return setmetatable({ items = {}, size = size, count = 0 }, Buffer) end function Buffer:put(item) if self.count >= self.size then coroutine.yield("buffer_full") -- 缓冲区满 end table.insert(self.items, item) self.count = self.count + 1 end function Buffer:get() if self.count == 0 then coroutine.yield("buffer_empty") -- 缓冲区空 end local item = table.remove(self.items, 1) self.count = self.count - 1 return item end -- 带缓冲的生产者 local function bufferedProducer(buffer) local i = 0 while true do i = i + 1 buffer:put(i) coroutine.yield("produced", i) end end -- 带缓冲的消费者 local function bufferedConsumer(buffer) while true do local item = buffer:get() print("消费数据:" .. item) coroutine.yield("consumed", item) end end ``` ## 高级实现 ### 1. 多生产者-多消费者 ```lua -- 调度器 local Scheduler = {} Scheduler.__index = Scheduler function Scheduler.new() return setmetatable({ producers = {}, consumers = {}, buffer = Buffer.new(10) }, Scheduler) end function Scheduler:addProducer(producer) table.insert(self.producers, coroutine.create(producer)) end function Scheduler:addConsumer(consumer) table.insert(self.consumers, coroutine.create(consumer)) end function Scheduler:run() while true do -- 运行所有生产者 for _, p in ipairs(self.producers) do if coroutine.status(p) ~= "dead" then local status, action, value = coroutine.resume(p, self.buffer) if not status then print("生产者错误:" .. action) end end end -- 运行所有消费者 for _, c in ipairs(self.consumers) do if coroutine.status(c) ~= "dead" then local status, action, value = coroutine.resume(c, self.buffer) if not status then print("消费者错误:" .. action) end end end -- 简单的调度延迟 coroutine.yield() end end -- 使用示例 local scheduler = Scheduler.new() -- 添加多个生产者 for i = 1, 3 do scheduler:addProducer(function(buffer) local count = 0 while count < 5 do count = count + 1 local value = string.format("P%d-%d", i, count) buffer:put(value) coroutine.yield("produced", value) end end) end -- 添加多个消费者 for i = 1, 2 do scheduler:addConsumer(function(buffer) while true do local value = buffer:get() print(string.format("消费者%d处理:%s", i, value)) coroutine.yield("consumed", value) end end) end -- 运行调度器 scheduler:run() ``` ### 2. 优先级队列 ```lua -- 优先级缓冲区 local PriorityBuffer = {} PriorityBuffer.__index = PriorityBuffer function PriorityBuffer.new(size) return setmetatable({ items = {}, size = size, count = 0 }, PriorityBuffer) end function PriorityBuffer:put(item, priority) if self.count >= self.size then coroutine.yield("buffer_full") end -- 插入按优先级排序 local inserted = false for i, existing in ipairs(self.items) do if existing.priority < priority then table.insert(self.items, i, {value = item, priority = priority}) inserted = true break end end if not inserted then table.insert(self.items, {value = item, priority = priority}) end self.count = self.count + 1 end function PriorityBuffer:get() if self.count == 0 then coroutine.yield("buffer_empty") end local item = table.remove(self.items, 1) self.count = self.count - 1 return item.value end ``` ## 实际应用 ### 1. 任务队列 ```lua -- 任务队列实现 local TaskQueue = {} TaskQueue.__index = TaskQueue function TaskQueue.new() return setmetatable({ buffer = Buffer.new(100), workers = {} }, TaskQueue) end function TaskQueue:addTask(task) self.buffer:put(task) end function TaskQueue:addWorker(worker) local co = coroutine.create(function() while true do local task = self.buffer:get() worker(task) coroutine.yield() end end) table.insert(self.workers, co) end function TaskQueue:run() while true do for _, worker in ipairs(self.workers) do if coroutine.status(worker) ~= "dead" then coroutine.resume(worker) end end coroutine.yield() end end -- 使用示例 local queue = TaskQueue.new() -- 添加任务 for i = 1, 10 do queue:addTask({ id = i, data = "任务" .. i }) end -- 添加工作者 queue:addWorker(function(task) print(string.format("处理任务%d:%s", task.id, task.data)) end) -- 运行队列 queue:run() ``` ### 2. 数据流处理 ```lua -- 数据流处理器 local function dataProcessor(producer, transformer, consumer) local p = coroutine.create(producer) local t = coroutine.create(transformer) local c = coroutine.create(consumer) local function process() while true do -- 获取数据 local status, data = coroutine.resume(p) if not status or not data then break end -- 转换数据 local status, result = coroutine.resume(t, data) if not status then break end -- 消费数据 local status = coroutine.resume(c, result) if not status then break end coroutine.yield() end end return coroutine.create(process) end -- 使用示例 local producer = function() for i = 1, 5 do coroutine.yield(i) end end local transformer = function(data) coroutine.yield(data * 2) end local consumer = function(data) print("处理结果:" .. data) coroutine.yield() end local processor = dataProcessor(producer, transformer, consumer) while coroutine.status(processor) ~= "dead" do coroutine.resume(processor) end ``` ## 常见问题解答 ### Q1: 为什么选择协程实现生产者-消费者模式? 协程提供了以下优势: - 轻量级,开销小 - 协作式调度,控制更灵活 - 代码更清晰,易于理解 - 无需考虑线程同步问题 ### Q2: 如何处理缓冲区满/空的情况? ```lua -- 使用yield暂停执行 function Buffer:put(item) while self.count >= self.size do coroutine.yield("buffer_full") end -- 继续处理... end function Buffer:get() while self.count == 0 do coroutine.yield("buffer_empty") end -- 继续处理... end ``` ### Q3: 如何实现超时处理? ```lua -- 带超时的消费者 local function consumerWithTimeout(timeout) local startTime = os.time() return function(buffer) while true do if os.time() - startTime > timeout then error("消费者超时") end local item = buffer:get() print("消费数据:" .. item) coroutine.yield() end end end ``` ## 下一步 现在你已经掌握了生产者-消费者模式的实现,可以继续学习[协程调度实践](/article/lua/advanced/coroutine-scheduling)来了解更复杂的协程应用场景。 ## 参考资源 - [Lua 5.4 参考手册](http://www.lua.org/manual/5.4/manual.html) - [Programming in Lua - 协程](http://www.lua.org/pil/9.html) - [Lua设计模式](http://lua-users.org/wiki/DesignPatterns)