The facilities provided here are modelled on the Haskell STM facilities described in
Composable memory transactions, by Tim Harris, Simon Marlow, Simon Peyton Jones, and Maurice Herlihy, in ACM Conference on Principles and Practice of Parallel Programming 2005. http://research.microsoft.com/Users/simonpj/papers/stm/index.htm
This is a proof-of-concept implementation by Eric Willigers. It makes no provision for fairness. Under sustained load, long-running transactions may never have the opportunity to commit. We also haven’t provided any way to have more pending transactions than Java threads.
We support the following syntax:-
import stmlib._
atomically {
  // read and update TVar instances
  // ...
  {
    // ...
    if (...) retry
    // ...
  } orElse {
    // ...
  } orElse {
    // ...
  }
  // ...
}
The transaction block might be executed multiple times, until a commit succeeds.
If a user exception is thrown, all updates are rolled back and the transaction block exits with the exception.
The updates in the atomic body either all commit, or are all rolled back. Updates are not visible to other transactions until they have been committed.
Besides reads and updates of TVar instances, the body of an atomic block should be purely functional.
orElse allows a number of different execution paths to be specified. The left-most execution path is attempted first.
retry indicates that the current execution path cannot succeed with the current transactional data. A RetryException is thrown and updates are rolled back. The next available execution path is attempted, or the atomic body is re-started if no other paths are available. (Note that the atomic body will not be restarted until at least some of the transaction data we have read is updated by another transaction.)
stmlib follows the naming convention of jolib and pilib in scala.concurrent.
/** A small software-transactional-memory library.
  *
  * `atomically` runs a block as a transaction over [[stmlib.TVar]] cells, `retry` abandons the
  * current execution path, and `orElse` (available on any expression through the implicit
  * `toChoice` conversion) supplies an alternative path that is tried when the first one retries.
  */
object stmlib {
  import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
  import scala.language.implicitConversions

  type Timestamp = Long

  // Global logical clock; it is advanced once at the end of every successful commit.
  private val clock = new java.util.concurrent.atomic.AtomicLong(0)

  /** Returns the current value of the global logical clock. */
  def readCurrentTime: Timestamp = clock.get()

  private def incrementCurrentTime(): Timestamp = clock.incrementAndGet()

  /** Base class of every exception raised by this library. */
  sealed abstract class TransactionException extends Exception {}

  /** Thrown when a transactional operation is attempted while no transaction is in progress.
    *
    * @param operation name of the operation that was attempted ("read", "write" or "orElse")
    */
  final class NoTransactionInProgressException(val operation: String) extends TransactionException {
    // Without this override getMessage would be null, which makes stack traces unhelpful.
    override def getMessage: String = s"$operation attempted outside of an atomically block"
  }

  /** Thrown internally when a cell read by the transaction has been committed by another one. */
  final class DeadlockException extends TransactionException {}

  /** Thrown by `retry`; caught by `orElse` and by the outermost `atomically`. */
  final class RetryException extends TransactionException {}

  /** Abandons the current execution path by throwing a [[RetryException]]. */
  def retry: Nothing = throw new RetryException()

  /** Runs `block` as a transaction and returns its result.
    *
    * When called inside a running transaction the block simply joins that transaction.
    * Otherwise a fresh outer transaction is created and the block is re-executed until a
    * commit succeeds. Any exception other than the library's retry/deadlock signals
    * propagates to the caller; uncommitted writes are discarded with the transaction.
    *
    * @param block the transactional body; it may be evaluated more than once
    * @tparam T result type of the body
    * @return the value produced by the execution of `block` that committed
    */
  def atomically[T](block: => T): T = currentTransaction.get() match {
    // we permit redundant atomically within transaction (unlike Haskell)
    case _: AbstractTransaction => block
    case NoTransaction =>
      @scala.annotation.tailrec
      def loop(): T =
        // create and execute transaction
        (new OuterTransaction).apply(block) match {
          case Some(result) => result // successful execution
          case None => loop() // due to deadlock or retry
        }
      loop()
  }

  /** Choice and toChoice implement `first orElse second`,
    * where second is evaluated iff a RetryException is thrown by first.
    */
  sealed trait Choice[T] {
    protected def first: T

    /** Evaluates the wrapped expression in an inner transaction; if it retries, its writes
      * are dropped and `second` is evaluated in the enclosing transaction instead.
      *
      * @param second the alternative execution path
      * @throws NoTransactionInProgressException if no transaction is in progress
      */
    final def orElse(second: => T): T = currentTransaction.get() match {
      case NoTransaction => throw new NoTransactionInProgressException("orElse")
      case existingTransaction: AbstractTransaction =>
        try {
          // create and execute inner transaction,
          // so we can retry without rolling back existingTransaction
          new InnerTransaction(existingTransaction).apply(first)
        } catch {
          // Note that existingTransaction is rolled back if another retry
          // occurs while executing second
          case _: RetryException => second
        }
    }
  }

  /** Wraps an unevaluated expression so that `orElse` can be called on it. */
  implicit def toChoice[T](_first: => T): Choice[T] =
    new Choice[T] {
      def first: T = _first
    }

  // The transaction (if any) that the current thread is executing.
  protected[stmlib] val currentTransaction: java.lang.ThreadLocal[PossibleTransaction] =
    new java.lang.ThreadLocal[PossibleTransaction]() {
      override def initialValue(): PossibleTransaction = NoTransaction
    }

  protected[stmlib] sealed abstract class PossibleTransaction {}

  protected[stmlib] object NoTransaction extends PossibleTransaction {}

  protected[stmlib] sealed abstract class AbstractTransaction extends PossibleTransaction {
    // newly created for each transaction context including inner transaction contexts,
    // so updates can easily be rolled back
    protected final val writeLog = new HashSet[TVar]
    private val valueCache = new HashMap[TVar, Any]

    // only called when a value is not already in the cache
    protected def readExisting(cell: TVar): Any

    /** Returns this transaction's view of `cell`, caching the value on first access. */
    final def read(cell: TVar): Any =
      valueCache.getOrElseUpdate(cell, readExisting(cell))

    /** Records `newValue` for `cell` in this transaction only; nothing is committed here. */
    final def write(cell: TVar, newValue: Any): Unit = {
      valueCache.update(cell, newValue)
      writeLog += cell
    }
  }

  protected[stmlib] val commitLock = new AnyRef()
  protected[stmlib] val watcherLock = new AnyRef()

  // corresponds to an outermost atomically
  protected[stmlib] final class OuterTransaction() extends AbstractTransaction {
    private val readLog = new ArrayBuffer[TVar]
    val startTime: Timestamp = readCurrentTime

    protected def readExisting(cell: TVar): Any = {
      readLog += cell
      // The value must be read before the invalidation check (order matters here).
      val result = cell.committedValue
      if (cell.invalidated(startTime)) {
        commitLock.synchronized {}
        // the transaction that updated cell has committed
        throw new DeadlockException()
      }
      result
    }

    /** Executes `block` inside this transaction and tries to commit.
      *
      * @return `Some(result)` after a successful commit; `None` when the attempt was
      *         abandoned (deadlock or retry) and the caller should run a new transaction
      */
    def apply[T](block: => T): Option[T] = {
      currentTransaction.set(this)
      try {
        val result: T = block
        commitLock.synchronized {
          if (readLog.exists(_.invalidated(startTime)))
            throw new DeadlockException()
          // From here on we never throw an exception
          // All cells in readLog were last updated before startTime
          val commitTime = readCurrentTime
          // commit writes
          writeLog.foreach { cell =>
            cell.commit(commitTime, read(cell).asInstanceOf[cell.valueType])
          }
          // If other transactions that have already started read cells in writeLog,
          // they won't commit - they will be rolled back with a DeadlockException and executed again
          incrementCurrentTime()
        }
        watcherLock.synchronized {
          val allWatchers = new HashSet[OuterTransaction]
          writeLog.foreach(cell => allWatchers ++= cell.getWatchers)
          def process(watcher: OuterTransaction): Unit = {
            watcher.synchronized {
              watcher.notify()
            }
            watcher.readLog.foreach(_.removeWatcher(watcher))
          }
          allWatchers.foreach(process)
        }
        Some(result)
      } catch {
        case _: DeadlockException => None
        case _: RetryException =>
          awaitInvalidation()
          None
      } finally {
        currentTransaction.set(NoTransaction)
      }
    }

    /** Blocks after a retry until a committer notifies this transaction, unless a cell in
      * readLog has already been updated, in which case it returns immediately.
      */
    private def awaitInvalidation(): Unit = {
      // Local flag: true only once this transaction has been added to the watcher sets.
      var registered = false
      try {
        // Our own monitor is held from registration until wait() releases it, so a
        // committer (which needs this monitor to call notify) cannot notify in between.
        synchronized {
          // We only claim watcherLock while accessing cells in readLog, not while waiting
          registered = watcherLock.synchronized {
            // A cell we read may already have been updated by another transaction;
            // in that case we can retry immediately.
            val alreadyInvalidated = readLog.exists(_.invalidated(startTime))
            // Otherwise we must wait until a cell in readLog has been updated.
            if (!alreadyInvalidated) readLog.foreach(_.addWatcher(this))
            !alreadyInvalidated
          }
          if (registered) wait()
        }
      } finally {
        // wait() can also end through an interrupt or a spurious wakeup, in which case no
        // committer has removed us from the watcher sets. Deregister here so this finished
        // transaction is not retained by the cells. This runs after our own monitor has been
        // released: committers take watcherLock first and the watcher's monitor second, so
        // taking watcherLock while still holding our monitor could deadlock.
        if (registered) {
          watcherLock.synchronized {
            readLog.foreach(_.removeWatcher(this))
          }
        }
      }
    }
  }

  // represents the first alternative in an orElse
  protected[stmlib] final class InnerTransaction(parent: AbstractTransaction) extends AbstractTransaction {
    // Values not yet seen by this inner transaction come from the enclosing transaction.
    protected def readExisting(cell: TVar): Any = parent.read(cell)

    /** Executes the inner transaction within an existing transaction.
      *
      * On normal completion the inner writes are promoted to the parent; if `block` throws,
      * they are not. The parent is reinstated as the current transaction either way.
      */
    def apply[T](block: => T): T = {
      currentTransaction.set(this)
      try {
        val result: T = block
        // promote writes
        writeLog.foreach { cell =>
          parent.write(cell, read(cell))
        }
        result
      } finally {
        currentTransaction.set(parent)
      }
    }
  }

  /** A transactional variable. Create instances with `TVar(initialValue)`. */
  sealed abstract class TVar {
    type valueType

    /** Reads the cell within the current transaction.
      *
      * @throws NoTransactionInProgressException if no transaction is in progress
      */
    def apply(): valueType = currentTransaction.get() match {
      case NoTransaction => throw new NoTransactionInProgressException("read")
      case existingTransaction: AbstractTransaction =>
        existingTransaction.read(this).asInstanceOf[valueType]
    }

    /** Writes the cell within the current transaction.
      *
      * @throws NoTransactionInProgressException if no transaction is in progress
      */
    def update(newValue: valueType): Unit = currentTransaction.get() match {
      case NoTransaction => throw new NoTransactionInProgressException("write")
      case existingTransaction: AbstractTransaction =>
        existingTransaction.write(this, newValue)
    }

    // committedValue might be read at the same time as another transaction is being committed
    @volatile protected[stmlib] var committedValue: valueType
    protected[stmlib] var commitTime: Option[Timestamp]

    // has the cell been updated since startTime?
    protected[stmlib] def invalidated(startTime: Timestamp): Boolean =
      commitTime.exists(_ - startTime >= 0)

    // commitTime is assigned before the volatile committedValue (order matters here).
    protected[stmlib] def commit(time: Timestamp, value: valueType): Unit = {
      commitTime = Some(time)
      committedValue = value
    }

    private val watchers = new HashSet[OuterTransaction]

    // clients must claim watcherLock before using the following
    protected[stmlib] def addWatcher(watcher: OuterTransaction): Unit = {
      watchers += watcher
    }

    protected[stmlib] def removeWatcher(watcher: OuterTransaction): Unit = {
      watchers -= watcher
    }

    protected[stmlib] def getWatchers: Iterable[OuterTransaction] = watchers
  }

  object TVar {
    /** Constructs a TVar. May be called inside or outside a transaction.
      *
      * @param initialValue the value the cell holds before any transaction commits to it
      */
    def apply[_valueType](initialValue: _valueType): TVar { type valueType = _valueType } =
      new TVar {
        type valueType = _valueType
        @volatile var committedValue: valueType = initialValue
        var commitTime: Option[Timestamp] = None
      }
  }
}