元素码农
基础
UML建模
数据结构
算法
设计模式
网络
TCP/IP协议
HTTPS安全机制
WebSocket实时通信
数据库
sqlite
postgresql
clickhouse
后端
rust
go
java
php
mysql
redis
mongodb
etcd
nats
zincsearch
前端
浏览器
javascript
typescript
vue3
react
游戏
unity
unreal
C++
C#
Lua
App
android
ios
flutter
react-native
安全
Web安全
测试
软件测试
自动化测试 - Playwright
人工智能
Python
langChain
langGraph
运维
linux
docker
工具
git
svn
🌞
🌙
目录
▶
所有权系统
所有权规则
借用与引用
Move语义
切片与所有权
▶
生命周期
生命周期基础
生命周期省略规则
结构体中的生命周期
高阶生命周期
▶
类型系统
Traits与泛型
类型推断机制
PhantomData与泛型约束
Newtype模式
▶
并发模型
线程与消息传递
共享状态并发
异步编程基础
Future与Executor
▶
内存管理
栈与堆内存布局
内存分配器原理
Box与智能指针
内存安全策略
▶
错误处理
Result与Option
错误传播机制
Panic与Abort
自定义错误类型
▶
宏系统
声明宏与过程宏
宏展开机制
卫生宏实现
元编程实践
▶
Unsafe Rust
Unsafe关键字解析
原始指针操作
FFI交互实践
安全抽象模式
发布时间:
2025-03-22 08:52
↑
☰
# Future与执行器 Rust的异步编程模型中,Future和执行器是两个核心概念。本文将深入探讨Future的工作原理和如何实现自定义执行器。 ## Future深入解析 ### 1. Future特征详解 ```rust use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; // Future特征的核心定义 pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } // 一个简单的Future实现 struct ReadyFuture<T>(Option<T>); impl<T> Future for ReadyFuture<T> { type Output = T; fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> { match self.0.take() { Some(val) => Poll::Ready(val), None => Poll::Pending, } } } ``` ### 2. Pin与Unpin ```rust use std::pin::Pin; use std::marker::PhantomPinned; // 需要固定的数据结构 struct PinnedData { data: String, _marker: PhantomPinned, } impl PinnedData { fn new(data: String) -> Pin<Box<Self>> { let data = Box::pin(PinnedData { data, _marker: PhantomPinned, }); data } fn get_data(self: Pin<&Self>) -> &str { &self.data } } ``` ## 执行器实现 ### 1. 基本执行器 ```rust use std::future::Future; use std::task::{Context, Poll}; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::Wake; use std::thread; struct BasicExecutor { tasks: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>, } impl BasicExecutor { fn new() -> Self { BasicExecutor { tasks: Vec::new() } } fn spawn<F>(&mut self, future: F) where F: Future<Output = ()> + Send + 'static, { self.tasks.push(Box::pin(future)); } fn run(&mut self) { let waker = Arc::new(TaskWaker); let mut context = Context::from_waker(&waker); while !self.tasks.is_empty() { let mut remaining = Vec::new(); for mut task in self.tasks.drain(..) { match task.as_mut().poll(&mut context) { Poll::Ready(()) => {}, // 任务完成 Poll::Pending => remaining.push(task), // 任务未完成 } } self.tasks = remaining; } } } // 简单的Waker实现 struct TaskWaker; impl Wake for TaskWaker { fn wake(self: Arc<Self>) { // 在实际应用中,这里应该通知执行器重新调度任务 } } ``` ### 2. 异步任务调度 ```rust use std::future::Future; use std::sync::mpsc; use std::task::{Context, Poll, Waker}; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::thread; // 任务调度器 struct Scheduler { sender: mpsc::Sender<Arc<Task>>, receiver: mpsc::Receiver<Arc<Task>>, } // 任务封装 struct Task { future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>, waker: Arc<Mutex<Option<Waker>>>, } impl Scheduler { fn new() -> Self { let (sender, receiver) = mpsc::channel(); Scheduler { sender, receiver } } fn spawn<F>(&self, future: F) where F: Future<Output = ()> + Send + 'static, { let task = Arc::new(Task { future: Mutex::new(Box::pin(future)), waker: Arc::new(Mutex::new(None)), }); self.sender.send(task).unwrap(); } fn run(&self) { while let Ok(task) = self.receiver.recv() { let waker = futures::task::waker(task.clone()); let mut context = Context::from_waker(&waker); let mut future = task.future.lock().unwrap(); match future.as_mut().poll(&mut context) { Poll::Ready(()) => {}, // 任务完成 Poll::Pending => { // 存储waker以便后续唤醒 *task.waker.lock().unwrap() = Some(waker); } } } } } ``` ## 高级执行器特性 ### 1. 定时器实现 ```rust use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; // 定时器Future struct Delay { when: Instant, } impl Delay { fn new(duration: Duration) -> Self { Delay { when: Instant::now() + duration, } } } impl Future for Delay { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { if Instant::now() >= self.when { Poll::Ready(()) } else { // 设置一个定时器来唤醒 let waker = cx.waker().clone(); thread::spawn(move || { thread::sleep(Duration::from_secs(1)); waker.wake(); }); Poll::Pending } } } ``` ### 2. 任务优先级 ```rust use std::cmp::Ordering; use std::collections::BinaryHeap; // 带优先级的任务 struct PrioritizedTask { priority: u32, task: Arc<Task>, } impl PartialEq for PrioritizedTask { fn eq(&self, other: &Self) -> bool { self.priority == other.priority } } impl Eq for PrioritizedTask {} impl PartialOrd for PrioritizedTask { fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) } } impl Ord for PrioritizedTask { fn cmp(&self, other: &Self) -> Ordering { self.priority.cmp(&other.priority) } } // 优先级调度器 struct PriorityScheduler { tasks: BinaryHeap<PrioritizedTask>, } impl PriorityScheduler { fn new() -> Self { PriorityScheduler { tasks: BinaryHeap::new(), } } fn spawn<F>(&mut self, priority: u32, future: F) where F: Future<Output = ()> + Send + 'static, { let task = Arc::new(Task { future: Mutex::new(Box::pin(future)), waker: Arc::new(Mutex::new(None)), }); self.tasks.push(PrioritizedTask { priority, task, }); } } ``` ## 最佳实践 ### 1. 性能优化 ```rust // 使用工作窃取调度 struct WorkStealingExecutor { local_queue: VecDeque<Arc<Task>>, global_queue: Arc<Mutex<VecDeque<Arc<Task>>>>, stealers: Vec<Arc<Mutex<VecDeque<Arc<Task>>>>>, } impl WorkStealingExecutor { fn steal_task(&mut self) -> Option<Arc<Task>> { // 尝试从其他队列窃取任务 for stealer in &self.stealers { if let Some(task) = stealer.lock().unwrap().pop_front() { return Some(task); } } None } } ``` ### 2. 错误处理 ```rust use std::error::Error; // 带错误处理的Future struct SafeFuture<F, E> { inner: F, error_handler: Box<dyn Fn(E) + Send + Sync>, } impl<F, E> SafeFuture<F, E> where F: Future<Output = Result<(), E>>, E: Error + 'static, { fn new(future: F, error_handler: impl Fn(E) + Send + Sync + 'static) -> Self { SafeFuture { inner: future, error_handler: Box::new(error_handler), } } } impl<F, E> Future for SafeFuture<F, E> where F: Future<Output = Result<(), E>>, E: Error + 'static, { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { match unsafe { Pin::new_unchecked(&mut self.inner) }.poll(cx) { Poll::Ready(Ok(())) => Poll::Ready(()), Poll::Ready(Err(e)) => { (self.error_handler)(e); Poll::Ready(()) } Poll::Pending => Poll::Pending, } } } ``` ## 注意事项 1. **性能考虑**: - 避免过度使用线程 - 合理设置任务优先级 - 优化内存分配 2. **安全性**: - 正确处理Pin - 避免内存泄漏 - 处理任务panic 3. **可维护性**: - 清晰的错误处理策略 - 良好的日志记录 - 模块化设计 ## 总结 Rust的Future和执行器系统提供了强大而灵活的异步编程能力。通过深入理解其工作原理,我们可以: 1. 实现高效的自定义执行器 2. 优化异步任务调度 3. 构建可靠的异步系统 在实际应用中,应根据具体需求选择或实现合适的执行器,并始终注意性能和安全性的平衡。