SFC2020
SFC2020 管理员

554枚
铜币

689点
威望

0个
银元

Scala的Future介绍

2020-06-30 16:46

3470

图片:微信图片_20200417110302.jpg



流程解析
将任务提交到线程池
 
先看一个 Future 用法的 例子
1 Future { println("start future") }
 
在 scala 的语法里,以下三种写法作用一样
1 Future {}
2 Future ()
3 Future.apply()
 
进入到 Future.apply 方法
1 def apply[T](body: =>T)(implicit @deprecatedName("execctx") executor: ExecutionContext): Future[T] =    
2 unit.map(_ => body)
 
unit 为
1 val unit: Future[Unit] = successful(())
 
再看 successful
1 def successful[T](result: T): Future[T] = Promise.successful(result).future
 
进入到 Promise.successful()
1 def successful[T](result: T): Promise[T] = fromTry(Success(result))
2 def fromTry[T](result: Try[T]): Promise[T] = impl.Promise.KeptPromise[T](result)
 
进入到 Promise.KeptPromise apply方法
1 def apply[T](result: Try[T]): scala.concurrent.Promise[T] =  
2 resolveTry(result) match {    
3 case s @ Success(_) => new Successful(s)    
4 case f @ Failure(_) => new Failed(f)  
5 }
 
最终构造了一个 Kept 对象 Kept 是 Promise 的子类,Promise 是 Future 的子类
1 Kept[T] extends Promise[T]
2
3 Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T]
 
执行任务 body : => T 我们重回 Future.apply,看看 unit.map(_ => body) 的逻辑
1 def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = transform(_ map f)
 
transform 是一个抽象方法,所以我们去看子类 Promise.transform 的实现
1 import scala.concurrent.Future
2 import scala.concurrent.impl.Promise.DefaultPromise
3
4 override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = {  
5 val p = new DefaultPromise[S]()  
6 onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) }  

7 p.future
8 }
 
根据 DefaultPromise 类的定义
1 class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T]
2
3 val p = new DefaultPromise[S]()
4 //实际上是初始化了一个 AtomicReference[AnyRef](空的list)
5 Nil = List.empty
 
接着我们看 DefaultPromise.onComplete 的实现,DefaultPromise 是 AtomicReference 无锁的对象引用的子类

1final def onComplete(func: Try[T] => U(implicit executor:ExecutionContext): Unit = dispatchOrAddCallback(new CallbackRunnable[T](executor.prepare(), func))
2//尾递归优化
3@tailrec
4private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = {  
5get() match {    
6case r: Try[_]          => runnable.executeWithValue(r.asInstanceOf[Try[T]])    
7case dp: DefaultPromise[_] => compressedRoot(dp).dispatchOrAddCallback(runnable)    
8case listeners: List[_] => if (compareAndSet(listeners, runnable :: listeners)) ()                                

9else dispatchOrAddCallback(runnable)  
10}
11}
 
最终 runable.executeWithValue 执行,也就是 CallbackRunnable.executeWithValue 提交任务到线程池去执行
1 private final class CallbackRunnable[T

(val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable {  
2 // must be filled in before running it  
3 var value: Try[T] = null
4
5 override def run() = {    
6 require(value ne null) // must set value to non-null before running!    
7 try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }  
8 }
9  
10 def executeWithValue(v: Try[T]): Unit = {    
11 require(value eq null) // can't complete it twice    
12 value = v    
13 // Note that we cannot prepare the ExecutionContext at this point, since we might    
14 // already be running on a different thread!    
15 try executor.execute(this) catch { case NonFatal(t) => executor reportFailure t }  
16 }
17 }
 
看 CallbackRunnalbe 的定义, 函数onComplete的实现为 Promise.transform 的实现中的代码
result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) })
 
最后 p.complete 会返回一个 Promise 对象也就是 Future对象本身
def complete(result: Try[T]): this.type =    
if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.")
 
总结
创建Future 交给 Promise 对象管理,并将线程池引用传入到 Promise 对象中, Promise 对 Future 里的任务进行调度执行


本文源自梦境迷离 ScalaCoder,转载目的在于传递更多信息,版权归原作者所有。


返回顶部