The JDK uses AbstractQueuedSynchronizer as the basis for its high performance concurrency implementations. Here I show how lazy values can be implemented by reusing much of the JDK machinery, and hopefully therefore inheriting high performance.
First, sample usage:
object LazyTest { import Lazy._ val exec = java.util.concurrent.Executors.newScheduledThreadPool(4) def main(args: Array[String]) { go } def go { import Console._ import Lazy._ def pr(name: String, n: Int) = { println("At " + name); n } val k = lazy { pr("k", 25) } val a = pr("a", 10) val b = pr("b", 20) val c = lazy { pr("c", { Thread.sleep(1000); a + b + k } ) } val d = lazy { pr("d", 35 + c) } val e = lazy { pr("e", 45 + d) } // println("e is " + e()) val x = lazy { pr("x", 55 * e) } later(1000) { println("later") val higher = 100 + x pr("later", higher) println("later done") } println("start get") pr("get", x.get) println("get done") println("All Done") exec.shutdown() } private implicit def block2Runnable(block: => Unit) = new Runnable() { def run() { block } } private def later(block: => Unit) = exec execute block private def later(millis: Long)(block: => Unit) = exec.schedule(block, millis, java.util.concurrent.TimeUnit.MILLISECONDS) }
Here’s the implementation:
import java.util.concurrent.locks.AbstractQueuedSynchronizer object Lazy { /** Create a lazy value, where the expression that forms the value will not be evaluated until the first use of the value at runtime. After the computation is performed once, subsequent accesses of the value will return the previously computed result. */ def lazy[T](x: => T) = new Lazy(x) /** As an ease-of-use measure, provide an automatic conversion from lazy values to their underlying type. This suffices in many but not all circumstances. In particular, functions that accept the Any type will not have the conversion applied. Invoke the apply method via lazyvalue() to ensure that the conversion takes place. */ implicit def lazyToT[T](lazy: Lazy[T]) = lazy() // We have two contants defined here that are used by // the implementation. private [laziness] final val RUNNING = 1 private [laziness] final val RAN = 2 } /** A class containing lazy values, encoded as a function of zero parameters, yielding them on the application of the function. If apply is called when the value hasn't been calculated, it will be calculated by calling the given function. If the value has already been calculated, the currently value will be returned. If an exception is thrown during the calculation of the value, calling apply will throw that exception. If apply is called when another thread is calculating the value, the current thread will block until the value's calculation is complete. This is interruptible; interrupting a thread that is waiting for a calculated value will release that thread and throw InterruptedException. <p> This class is based on the techniques used by the JDK to provide its FutureTask class, minus the cancellation logic. */ class Lazy[T](func: => T) extends AbstractQueuedSynchronizer with Function0[T] { import Lazy.{RAN, RUNNING} private var result: T = _ private var exception: Throwable = _ [volatile] private var runner: Thread = _ /** Retrieves the result of this lazy expression, evaluating it if necessary. After evaluation has been performed once, subsequent calls to apply will return the same result. */ def apply() = { if (!innerIsDone) innerRun get } /** Retrieves the current value of this lazy expression but does <i>not</i> force calculation, if it isn't ready. Another thread must do that. */ def get = { acquireSharedInterruptibly(0) if (exception != null) throw exception else result } private def innerSet(v: T) { while (true) { val s = getState if (((s & RAN) == 0) && compareAndSetState(s, RAN)) { result = v releaseShared(0) return } } } private def innerSetException(t: Throwable) { while (true) { val s = getState if (s != RAN && compareAndSetState(s, RAN)) { exception = t releaseShared(0) return } } } private def innerRun { if (compareAndSetState(0, RUNNING)) { try { runner = Thread.currentThread() if (getState == RUNNING) innerSet(func) else releaseShared(0) } catch { case ex: Throwable => innerSetException(ex) } } } override protected def tryAcquireShared(ignore: Int) = if (innerIsDone) 1 else -1 override protected def tryReleaseShared(ignore: Int) = { runner = null; true } private def ranOrCancelled(state: Int) = (state & (RAN /* | CANCELLED */)) != 0 private def innerIsDone = ranOrCancelled(getState) && runner == null }