Futures

Can somebody write an explanation about this? The code and the comments do not help much. :-(

Sure :) I should have in the first place.

Refer to Alice ML’s description of futures (http://www.ps.uni-sb.de/alice/manual/futures.html) for detailed explanations of these classes. My intent was to provide a clean and simple way of dealing with many kinds of objects that are “holders” of a certain type of value, where the satisfaction of those values has different semantics.

Briefly, there are several types, as outlined below. Exceptions are preserved in this framework; if the computation of a value resulted in an exception, accessing the future will throw (or rethrow) that same exception value.

  • Ready - is a simple boxed value.
  • Lazy - is a suspended computation that will be performed once, in the accessing thread, on first access, to determine the value.
  • Promise - is a future value that will be provided by another thread. A thread will block if the promise hasn’t been fulfilled. Another thread must fulfill it.
  • Spawn - is a future whose value will be computed in a separate, newly created thread. The call will return immediately. Any further attempt to access the future’s value will block until the spawned thread has computed the value.
  • Cells - are futures that automatically track dependencies. If Cell A is <i>read</i> during the computation of the value of Cell B (the <i>write</i>), B is dependent on A. Any subsequent change to A will result in B’s value being “unset”, and recalculated on the next read access.

Before using the spawn method, first bring an implicit ThreadGroup into scope, e.g.

implicit val tg = new ThreadGroup("Futures")
package scalax.future;
 
/** A Future is a function of arity 0 that returns a value of type T.
 *  It can be queried to find out whether the value is available, whether a
 *  failure has occurred, and what that failure is.  If the value of the
 *  future is retrieved and a failure results, the failure exception is thrown.
 *
 *  This code is in the public domain.
 *
 *  @author Ross Judson
 */
trait Future[T] extends Function0[T] {
  /** Whether the value (or a failure) is available. */
  def isSet: Boolean;
  /** Whether a failure has occurred. */
  def isFailure: Boolean;
  /** The failure, if one happened. */
  def failure: Throwable;
}
 
/** A Promise is a placeholding Future, where the result of the computation
 *  can be set externally.
 */
trait Promise[T] extends Future[T] {
  /** Supply the value of the promise. */
  def set(t: T): Unit;
  /** Supply a failure in place of a value. */
  def fail(ex: Throwable): Unit;
  /** The Future view of this promise. */
  def future: Future[T];
}
 
/** A Dependent is a calculated value dependent on other values.  The
 *  expression used to calculate a Dependent cannot be updated.  If you want
 *  to be able to change the expression, create a Cell instead.
 */
trait Dependent[T] extends Future[T] {
  /** Optional label for this value; null until assigned. */
  var description: String = _;
  /** The CellContext this value belongs to. */
  def context: CellContext;
  /** Mark the value as not set. */
  def unset: Unit;
}
 
/** A Cell is a modifiable Dependent: the expression that defines its value
 *  can be replaced through update.  A cell can track which other cells it
 *  depends on, provided those cells are accessed while the expression passed
 *  to update is being evaluated.
 */
trait Cell[T] extends Dependent[T] {
  /** Replace the defining expression of this cell; t is passed by name. */
  def update(t: => T): Unit;
}
 
/** A CellContext manufactures cells and tracks the dependencies between them.
 *
 *  While a cell is being computed it sits on a per-thread stack.  Any other
 *  cell that is read in the meantime is recorded as an input of the cell on
 *  top of that stack, and the computing cell is recorded as an output of the
 *  cell that was read.  When a cell is set, fails or is unset, its outputs are
 *  unset, and so are their outputs in turn, so that every value derived from
 *  it is recalculated on its next read.
 */
class CellContext {

  // Per-thread stack of the cells whose values are currently being computed.
  private val tlocal = new ThreadLocal();

  /** Create a cell that uses a fixed formula to calculate its value.
      The formula cannot be changed once set.  */
  def formula[T](t: => T): Dependent[T] = new MCell[T] {
    def cellType = "Formula";
    // Computes on demand: an unset formula is evaluated before its value is returned.
    override def get: T = synchronized {
      if (!isSet)
          compute;
      super.get
    }
    def doSet = super.set(t);
    override def set(t: T): Unit = error("Cannot call set on Cells.  Use update instead.");
  }

  /** Create a modifiable cell that will track its dependencies on other
      modifiable values.  If the update function is used to calculate the
      contents AND all calculation involving modifiables is contained
      within the expression, dependencies will be automatically managed.  */
  implicit def cell[T](initial: => T): Cell[T] = new MCell[T] with Cell[T] {
    // The current defining expression; replaced by update.
    var expression = new Function0[T] {
      def apply() = initial;
    }
    def cellType = "Cell";
    // Computes on demand: an unset cell is evaluated before its value is returned.
    override def get: T = synchronized {
      if (!isSet)
          compute;
      super.get
    }
    def doSet = super.set(expression());
    /** Install a new defining expression and evaluate it immediately. */
    def update(t: => T): Unit = {
      expression = new Function0[T] {
        def apply() = t;
      } ;
      compute;
    }
    override def set(t: T): Unit = error("Cannot call set on cell.  Use update instead.");
  }

  /** The calling thread's stack of computing cells, created on first use. */
  private def localStack = synchronized {
    var stack = tlocal.get().asInstanceOf[java.util.Stack];
    if (stack == null) {
      stack = new java.util.Stack;
      tlocal.set(stack);
    }
    stack;
  }

  /** Common implementation of formulas and cells: a synchronized value holder
      that records which cells it read (inputs) and which cells read it
      (outputs).  */
  abstract private class MCell[T] extends ExceptionSyncVar[T] with Dependent[T] {

    // Cells read during the last computation of this cell; null when empty.
    private var inputs: java.util.IdentityHashMap = _;
    // Cells that read this cell during their computation; null when empty.
    private var outputs: java.util.WeakHashMap = _;

    def context = CellContext.this;

    /** Name used as the prefix of toString. */
    protected def cellType: String;

    def inputCount = if (inputs != null) inputs.size() else 0;
    def outputCount = if (outputs != null) outputs.size() else 0;

    /** Render the cell with its description, dependency counts and value.
        The value is obtained through get, so this may compute the cell and
        rethrows the failure of a failed cell.  */
    override def toString() = {
      val sb = new StringBuffer();
      sb.append(cellType);
      sb.append('(');
      if (description != null) sb.append('"').append(description).append("\" ");
      if (inputCount > 0) sb.append(inputCount).append("->");
      sb.append('[').append(get.toString()).append(']');
      if (outputCount > 0) sb.append("->").append(outputCount);
      sb.append(')');
      sb.toString();
    }

    /** Return the value, first recording a dependency if another cell is
        being computed on this thread.  */
    override def get: T = {
      // If something else is calculating, connect dependencies
      val stack = tlocal.get().asInstanceOf[java.util.Stack];
      if (stack != null && !stack.isEmpty) {
        val dependent = stack.peek().asInstanceOf[MCell[Any]];
        dependent.addInput(this);
        addOutput(dependent);
      }

      super.get;
    }

    /** Store a new value and invalidate everything derived from this cell. */
    override def set(t: T) = {
      super.set(t);
      invalidateOutputs;
    }

    /** Record a failure and invalidate everything derived from this cell, so
        that dependents do not keep values computed from an earlier result.  */
    override def fail(e: Throwable): Unit = {
      super.fail(e);
      invalidateOutputs;
    }

    /** Unset this cell and, transitively, the cells that depend on it.
        Propagation stops at cells that were already unset; this keeps shared
        dependents from being visited repeatedly and guarantees termination.  */
    override def unset: Unit = {
      val wasSet = isSet;
      super[ExceptionSyncVar].unset;
      if (wasSet) invalidateOutputs;
    }

    /** Unset every cell that read this one during its computation. */
    private def invalidateOutputs: Unit = {
      if (outputs != null) {
        val iter = outputs.keySet().iterator();
        while (iter.hasNext())
          iter.next().asInstanceOf[MCell[Any]].unset;
      }
    }

    /** Evaluate the defining expression and store its result via set. */
    def doSet: Unit;

    /** Recalculate this cell.  Old input links are dropped first; the cell
        is then pushed on the thread's stack so that cells read by doSet
        register themselves as inputs.  Any Throwable raised by doSet is
        recorded as the cell's failure.  */
    def compute = {
      clearInputs;
      localStack.push(this);
      try {
        doSet;
      } catch {
        case t: Throwable => fail(t)
      } finally {
        localStack.pop();
      }
    }

    private def addInput(i: AnyRef) = {
      if (inputs == null)
        inputs = new java.util.IdentityHashMap;
      inputs.put(i, Nil);
    }

    private def removeInput(i: AnyRef) = {
      if (inputs != null) {
        if (inputs.remove(i) != null && inputs.isEmpty())
          inputs = null;
      }
    }

    private def addOutput(o: AnyRef) = {
      if (outputs == null)
        outputs = new java.util.WeakHashMap;
      outputs.put(o, Nil);
    }

    private def removeOutput(o: AnyRef) = {
      if (outputs != null)
        if (outputs.remove(o) != null && outputs.isEmpty())
          outputs = null;
    }

    /** Detach this cell from every cell it read during its last computation. */
    def clearInputs = {
      if (inputs != null) {
        val iter = inputs.keySet().iterator();
        while (iter.hasNext())
          iter.next().asInstanceOf[MCell[Any]].removeOutput(this);
        inputs = null;
      }
    }
  }
}
 
/** Factory methods and implicit conversions for futures.  Because this object
 *  extends CellContext, it also provides formula and cell.
 */
object Future extends CellContext {

  /** Implicitly convert arbitrary values of T into a Future[T]. */
  implicit def lazyConst[T](t: T): Future[T] = Ready(t);

  /** Retrieve the value, which may result in forcing the value if it is not already available. */
  implicit def fromFuture[T](lz: Future[T]): T = lz();

  /** We want to be able to treat our lazy values like regular values when we're
  iterating over them, and so forth.  We define a kind of "reverse lifter" here
  that wraps a function on T into a function on Future[T]. */
  implicit def lazyLift[T,B](tfunc: (T) => B): (Future[T]) => B = (lt) => tfunc(lt());

  /** Directly construct a lazy value of type T. Note that using the anonymous class
  this way seems to be the simplest way to prevent execution of "f" until we really
  mean to, and still keep it around cleanly. */
  def lazy[T](f: => T): Future[T] = new Lazy[T] {
    def func: T = f;
  }

  /** Create a future that is failed from the start. */
  def fail[T](ex: Throwable): Future[T] = Failed(ex);

  /** Build a concurrent future, starting a thread in the given ThreadGroup to
      perform the computation.  The thread is started before this call
      returns; a Throwable raised by the computation becomes the failure of
      the future.  */
  def spawn[T](function: => T)(implicit tg: ThreadGroup): Future[T] = new ExceptionSyncVar[T] with Runnable {
    val thread = new Thread(tg, this);
    thread.start();

    def run() =
      try { set(function) } catch {
        case ex: Throwable => fail(ex)
      }
    override def toString = createString("Spawn");
  }

  /** Build a promise for a future value; the promise can be fulfilled
      on this or any other thread.  A thread getting the value will block
      if the promise has not been fulfilled. */
  def promise[T]: Promise[T] = new ExceptionSyncVar[T] with Promise[T] {
    def future = this;
    override def toString = createString("Promise");
  }


}
 
 
/*
trait Executor {
  
  def run(f: => unit): unit;
  def run[A](f: A => unit)(a: A): unit = run( f(a) );
  def run[A,B](f: (A, B) => unit)(a: A, b: B): unit = run( f(a,b) );
  def run[A,B,C](f: (A, B, C) => unit)(a: A, b: B, c: C): unit = run( f(a,b,c) );
  
  def submit[T](f: => T): Future[T];
  def submit[A,T](f: A => T)(a: A): Future[T] = submit( f(a) );
  
  def invokeAll[T](tasks: Function0[T]*): Seq[Future[T]];
  def invokeAny[T](tasks: Function0[T]*): T;
  
  def isShutdown: boolean;
  def isTerminated: boolean;
  def shutdown: unit;
  def shutdownNow: unit;
  
}
 
object Executors {
      
}
*/
 
/** A synchronized holder for either a value of type T or the Throwable that
 *  prevented the value from being produced.  Readers calling get block until
 *  one of the two has been supplied through set or fail.
 */
private class ExceptionSyncVar[T] extends Future[T] {

  // The recorded failure, or null when the variable holds no failure.
  private var ex: Throwable = _;
  // True once a value or a failure has been supplied.
  private var isDefined: Boolean = false;
  private var value: T = _;
  // Never assigned: keeps the default of T so unset can reset value.
  private var empty: T = _;

  def failure = ex;
  override def apply() = get;
  def isFailure = isDefined && (ex != null);

  /** Forget the value and any failure; later calls to get block again. */
  def unset = synchronized {
    ex = null;
    isDefined = false;
    value = empty;
  }

  /** Store a value and wake up all waiting readers. */
  def set(x: T) = synchronized {
    // Drop any earlier failure, otherwise get would keep throwing it even
    // though a value has now been supplied.
    ex = null
    value = x
    isDefined = true
    notifyAll()
  }

  /** Record a failure and wake up all waiting readers. */
  def fail(e: Throwable): Unit = synchronized {
    ex = e
    isDefined = true
    notifyAll()
  }

  def isSet: Boolean = synchronized {
    isDefined
  }

  /** Return the value, blocking until the variable is defined.  Throws the
      recorded failure if there is one.  */
  def get = synchronized {
    // Re-check in a loop: wait() may return spuriously, and the variable may
    // have been unset again between the notification and this thread waking.
    while (!isDefined) wait();
    if (ex != null) throw ex;
    value
  }

  override def toString = createString("ESync");

  /** Render the state as head(?), head(failure) or head(value). */
  def createString(head: String) =
    if (!isDefined) head + "(?)"
    else if (ex != null) head + '(' + ex + ')'
    else head + '(' + value + ')';
}
  
/** A future whose value is produced by func the first time it is requested.
 *  The evaluation happens inside get, under this object's lock; its result or
 *  its failure is then stored, so func is not evaluated again.
 */
private abstract class Lazy[T] extends ExceptionSyncVar[T] {

  /** The suspended computation that yields the value. */
  def func: T;

  override def get: T = synchronized {
    if (isSet) {
      // Already evaluated: return the stored value or rethrow the stored failure.
      super.get
    } else {
      try {
        val computed = func;
        set(computed);
        computed
      } catch {
        case e: Throwable =>
          // Remember the failure for later readers, then report it to this one.
          fail(e);
          throw e;
      }
    }
  }

  override def toString() = createString("Lazy");
}
 
/** A future that is failed from the start: it counts as set, reports ex as
 *  its failure, and throws ex whenever its value is requested.
 */
case class Failed[T](ex: Throwable) extends Future[T] {
  def isSet = true
  def isFailure = true
  def failure = ex
  def apply(): T = throw ex
}
 
/** A future that simply boxes an already available value: it is always set,
 *  never a failure, and applying it returns v.
 */
case class Ready[T](v: T) extends Future[T] {
  def isSet = true
  def isFailure = false
  override def failure = null
  def apply() = v
}
 
code/futures.txt · Last modified: 2008/02/07 15:45 by 207.152.147.19
 
Recent changes RSS feed Valid XHTML 1.0 Driven by DokuWiki