Software Transactional Memory

The facilities provided here are modelled on the Haskell STM facilities described in

Composable memory transactions, by Tim Harris, Simon Marlow, Simon Peyton Jones, and Maurice Herlihy, in ACM Conference on Principles and Practice of Parallel Programming 2005. http://research.microsoft.com/Users/simonpj/papers/stm/index.htm

This is a proof-of-concept implementation by Eric Willigers. It makes no provision for fairness. Under sustained load, long-running transactions may never have the opportunity to commit. We also haven’t provided any way to have more pending transactions than Java threads.

We support the following syntax:-

import stmlib._
 
atomically
{
    // read and update TVar instances
 
    // ...
 
    {
        // ...
 
        if (...)
            retry
 
        // ...
 
    } orElse {
        // ...
 
    } orElse {
        // ...
    }
 
    // ...
}

The transaction block might be executed multiple times, until a commit succeeds.

If a user exception is thrown, all updates are rolled back and the transaction block exits with the exception.

The updates in the atomic body either all commit, or are all rolled back. Updates are not visible to other transactions until they have been committed.

Besides reads and updates of TVar instances, the body of an atomic expression should be purely functional.

orElse allows a number of different execution paths to be specified. The left-most execution path is attempted first.

retry indicates that the current execution path cannot succeed with the current transactional data. A RetryException is thrown and updates are rolled back. The next available execution path is attempted, or the atomic body is restarted if no other paths are available. (Note that the atomic body will not be restarted until at least some of the transactional data it has read is updated by another transaction.)

stmlib follows the naming convention of jolib and pilib in scala.concurrent.

object stmlib
{
    import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
    // Logical time: a tick count taken from the single clock below.
    type Timestamp = Long

    // The one logical clock of stmlib. An AtomicLong built with the no-arg
    // constructor starts at zero.
    private val clock = new java.util.concurrent.atomic.AtomicLong()

    // Returns the current logical time without advancing the clock.
    def readCurrentTime: Timestamp =
    {
        clock.get()
    }

    // Advances the clock by exactly one tick and returns the new time.
    private def incrementCurrentTime(): Timestamp =
    {
        clock.addAndGet(1L)
    }
 
 
    // Base class of every exception raised by stmlib itself.
    // Sealed: the three classes below are the complete set of subclasses.
    sealed abstract class TransactionException extends Exception {}

    // Thrown when a transactional operation is attempted while the calling
    // thread has no transaction in progress. `operation` names the operation
    // that was attempted (this file passes "read", "write" and "orElse").
    final class NoTransactionInProgressException(val operation: String) extends TransactionException
    {
        // Without this override getMessage would be null, and a stack trace
        // would not say which operation was attempted outside a transaction.
        override def getMessage(): String =
            "stmlib operation '" + operation + "' requires a transaction in progress (wrap the call in atomically)"
    }

    // Thrown by OuterTransaction when a cell it has read turns out to have been
    // committed at or after the transaction's start time. OuterTransaction.apply
    // catches it and reports the attempt as failed so that it is executed again.
    final class DeadlockException extends TransactionException {}

    // Thrown by `retry`. Caught by Choice.orElse (to try the second alternative)
    // and by OuterTransaction.apply (to wait for a change before re-executing).
    final class RetryException extends TransactionException {}
 
    // Abandons the current execution path by throwing a RetryException.
    // The result type is Nothing, so `retry` may appear wherever a value of
    // any type is expected.
    def retry: Nothing =
    {
        throw new RetryException()
    }
 
 
    // Runs `block` as a transaction and returns its result.
    //
    // If the calling thread is already inside a transaction, `block` is simply
    // evaluated as part of it (redundant nesting is permitted, unlike Haskell).
    // Otherwise `block` is executed in a fresh OuterTransaction, and executed
    // again in another fresh one for as long as an attempt yields None.
    def atomically[T](block: => T): T =
    {
        // One fresh OuterTransaction per attempt. The recursive call is in tail
        // position, so repeated attempts do not grow the stack.
        def runUntilCommitted(): T = new OuterTransaction()(block) match
        {
            case None => runUntilCommitted() // attempt abandoned: deadlock or retry
            case Some(committed) => committed // attempt succeeded
        }

        currentTransaction.get match
        {
            case NoTransaction => runUntilCommitted()

            case _: AbstractTransaction => block
        }
    }
 
    // Choice and toChoice implement  first orElse second,
    // where second is evaluated iff a RetryException is thrown by first
    
    sealed trait Choice[T]
    {
        // The left-hand alternative. Declared as a def, so it is evaluated when
        // orElse references it rather than when the Choice is constructed.
        protected def first: T

        // Evaluates `first` inside a new InnerTransaction of the current
        // transaction. If that throws RetryException, evaluates `second`
        // instead, directly in the current transaction.
        // Throws NoTransactionInProgressException("orElse") when the calling
        // thread has no transaction in progress.
        final def orElse(second: => T): T =
        {
            currentTransaction.get match
            {
                case enclosing: AbstractTransaction =>
                    try
                    {
                        // Running `first` in its own inner transaction lets it
                        // be abandoned without rolling back `enclosing`.
                        new InnerTransaction(enclosing)(first)
                    }
                    catch
                    {
                        // A retry raised while evaluating `second` is not caught
                        // here, so it rolls back `enclosing` as well.
                        case _: RetryException => second
                    }

                case NoTransaction =>
                    throw new NoTransactionInProgressException("orElse")
            }
        }
    }
 
    // Implicit view that lets any expression be followed by `orElse`.
    // `_first` is by-name and is only evaluated when the resulting Choice
    // evaluates its `first` member.
    implicit def toChoice[T](_first: => T): Choice[T] =
        new Choice[T] { def first = _first }
 
    // The transaction context of the calling thread. A thread that has never
    // set a value sees NoTransaction.
    protected[stmlib] val currentTransaction = new java.lang.ThreadLocal[PossibleTransaction]() {
        // ThreadLocal is already parameterised on PossibleTransaction, so the
        // inherited result can be returned as it is.
        override def get: PossibleTransaction = super.get

        override def initialValue: PossibleTransaction = NoTransaction
    }
 
    // What currentTransaction can hold: either NoTransaction or some
    // AbstractTransaction.
    protected[stmlib] sealed abstract class PossibleTransaction

    // Marker value meaning "this thread is not inside a transaction".
    protected[stmlib] object NoTransaction extends PossibleTransaction
 
    // State common to outer and inner transaction contexts: the cells written
    // in this context and the value this context currently sees for each cell
    // it has touched.
    protected[stmlib] sealed abstract class AbstractTransaction extends PossibleTransaction
    {
        // Cells written in this context. A new log is created for every context,
        // inner ones included, so that updates can easily be rolled back.
        protected final val writeLog = new HashSet[TVar]

        // Value seen by this context for each cell it has read or written.
        private val localValues = new HashMap[TVar, Any]

        // Supplies the value of a cell this context has not touched yet.
        // Only called when the cell has no entry in the local map.
        protected def readExisting(cell: TVar): Any

        // Returns the value of `cell` as seen by this context, fetching and
        // remembering it on first access.
        final def read(cell: TVar): Any = localValues.get(cell) match
        {
            case Some(known) => known

            case None =>
                val fetched = readExisting(cell)
                localValues.update(cell, fetched)
                fetched
        }

        // Records `newValue` as this context's value for `cell` and marks the
        // cell as written.
        final def write(cell: TVar, newValue: Any): Unit =
        {
            writeLog += cell
            localValues.update(cell, newValue)
        }
    }
 
    // Held by an OuterTransaction while it validates its reads and publishes
    // its writes.
    protected[stmlib] val commitLock = new Object

    // Must be held while the watcher sets of TVar instances are used.
    protected[stmlib] val watcherLock = new Object
 
    // corresponds to an outermost atomically
    // One attempt at executing the body of an outermost atomically.
    // atomically creates a new instance for every attempt; apply reports
    // whether the attempt committed (Some) or has to be repeated (None).
    protected[stmlib] final class OuterTransaction() extends AbstractTransaction
    {
        // every cell whose committed value this attempt has read
        private val readLog = new ArrayBuffer[TVar]
        // logical time at which this attempt was created; reads are validated against it
        val startTime = readCurrentTime
        
        // Records cell in readLog and returns its committed value.
        // Throws DeadlockException if the cell's commit time is at or after startTime.
        protected def readExisting(cell: TVar): Any =
        {
            readLog += cell
            // The value is read before the timestamp is checked, while TVar.commit stores
            // the timestamp before the value.
            // NOTE(review): presumably this ordering, together with committedValue being
            // volatile, guarantees that a freshly committed value is never returned
            // without the check below noticing it - confirm.
            val result = cell.committedValue
 
            if (cell.invalidated(startTime))
            {
                // empty critical section: blocks until commitLock is free,
                // i.e. until any commit currently holding it has finished
                commitLock.synchronized {}
 
                // the transaction that updated cell has committed
 
                throw new DeadlockException()
            }
                
            result
        }
 
        // Executes block with this transaction installed as the thread's current
        // transaction, then tries to commit.
        //
        // Returns Some(result) if block completed and its writes were committed.
        // Returns None if the attempt was abandoned because of a DeadlockException
        // or a RetryException; in the latter case the call may first block until a
        // cell in readLog is committed by another transaction.
        // Any other exception thrown by block propagates to the caller; nothing is
        // committed in that case because the commit step is never reached.
        // currentTransaction is reset to NoTransaction on every exit path.
        def apply[T](block: => T): Option[T] =
        {
            currentTransaction.set(this)
            
            try
            {
                val result: T = block
 
                commitLock.synchronized
                {
                    // validate: give up if any cell we read has been committed since we started
                    if (readLog.exists(_.invalidated(startTime)))
                        throw new DeadlockException()
 
                    // From here on we never throw an exception
                    // All cells in readLog were last updated before startTime
 
                    // the writes are stamped with the clock value before it is advanced below
                    val commitTime = readCurrentTime
 
                    // commit writes
                    writeLog.foreach( cell => cell.commit(commitTime, read(cell).asInstanceOf[cell.valueType]) )
 
                    // If other transactions that have already started read cells in writeLog,
                    // they won't commit - they will be rolled back with a DeadlockException and executed again
 
                    // Transactions created after this point get a startTime greater than commitTime.
                    incrementCurrentTime()
                }
                
                // wake every transaction that is waiting on a cell we have just written
                watcherLock.synchronized
                {
                    // collect into a separate set first: process removes entries from the
                    // cells' own watcher sets
                    val allWatchers = new HashSet[OuterTransaction]
                    writeLog.foreach( allWatchers ++= _.getWatchers )
                    
                    // notifies the waiting transaction, then unregisters it from every cell it read
                    def process(watcher: OuterTransaction): Unit =
                    {
                        watcher.synchronized { watcher.notify() }
                        watcher.readLog.foreach( _.removeWatcher(watcher) )
                    }
                    
                    allWatchers.foreach(process _)
                }
 
                Some(result)
            }
            catch
            {
                case _: DeadlockException => None
 
                case _: RetryException =>
                {
                    // This transaction's own monitor is held from before registration until
                    // wait() releases it. A committer must enter the same monitor to call
                    // notify(), so it cannot do so between registration and wait().
                    synchronized
                    {
                        // We only claim watcherLock while accessing cells in readLog, not while waiting
                        // The block below yields None (do not wait) or Some(this) (wait);
                        // the trailing foreach calls wait() only in the Some case.
                        watcherLock.synchronized
                        {
                            if (readLog.exists(_.invalidated(startTime)))
                            {
                                // A cell we read has already been updated by another transaction.
                                // We can retry immediately
                                None
                            }
                            else
                            {
                                // We must wait until a cell in readLog has been updated by another transaction
                                // NOTE(review): if readLog is empty no watcher is registered here, so
                                // nothing in this file would ever notify this transaction.
                                readLog.foreach(_.addWatcher(this))
                                Some(this)
                            }
                        }.foreach(_.wait())
                    }
                    
                    // Whatever ended the wait, the attempt is reported as failed so that it is re-executed.
                    None
                }
            }
            finally
            {
                currentTransaction.set(NoTransaction)
            }
        }
    }
 
    // represents the first alternative in an orElse
    // A child context of `parent`. Reads fall through to the parent; writes stay
    // local until the block completes and are then handed to the parent.
    protected[stmlib] final class InnerTransaction(parent: AbstractTransaction) extends AbstractTransaction
    {
        // A cell this context has not touched yet is looked up through the parent.
        protected def readExisting(cell: TVar): Any = parent.read(cell)

        // Evaluates `block` with this context installed as the thread's current
        // transaction and returns its result. On normal completion every cell
        // written here is written to the parent with its final value. If `block`
        // throws, nothing is promoted. Either way the parent is reinstalled as
        // the current transaction.
        def apply[T](block: => T): T =
        {
            currentTransaction.set(this)

            try
            {
                val outcome: T = block

                // promote writes
                for (cell <- writeLog)
                    parent.write(cell, read(cell))

                outcome
            }
            finally
            {
                currentTransaction.set(parent)
            }
        }
    }
    
    
    // A transactional variable. It is read with apply() and assigned with
    // update(...), both of which require a transaction in progress on the
    // calling thread.
    sealed abstract class TVar
    {
        // type of the value held by this variable
        type valueType

        // Returns the value of this variable as seen by the current transaction.
        // Throws NoTransactionInProgressException("read") outside a transaction.
        def apply(): valueType =
        {
            currentTransaction.get match
            {
                case tx: AbstractTransaction => tx.read(this).asInstanceOf[valueType]

                case NoTransaction => throw new NoTransactionInProgressException("read")
            }
        }

        // Records newValue as the current transaction's value for this variable.
        // Throws NoTransactionInProgressException("write") outside a transaction.
        def update(newValue: valueType): Unit =
        {
            currentTransaction.get match
            {
                case tx: AbstractTransaction => tx.write(this, newValue)

                case NoTransaction => throw new NoTransactionInProgressException("write")
            }
        }


        // committedValue might be read at the same time as another transaction is being committed
        @volatile protected[stmlib] var committedValue: valueType

        // time of the last commit to this variable; None if it has never been committed to
        protected[stmlib] var commitTime: Option[Timestamp]

        // Has this variable been committed to at or after startTime?
        // The two times are compared through their difference.
        protected[stmlib] def invalidated(startTime: Timestamp): Boolean = commitTime match
        {
            case Some(committedAt) => committedAt - startTime >= 0
            case None => false
        }

        // Publishes a new committed value. The timestamp is stored first and the
        // volatile value second; this order is kept exactly as it was.
        protected[stmlib] def commit(time: Timestamp, value: valueType) =
        {
            commitTime = Some(time)
            committedValue = value
        }

        // transactions waiting for this variable to be committed to
        private val watchers = new HashSet[OuterTransaction]

        // clients must claim watcherLock before using the following

        protected[stmlib] def addWatcher(watcher: OuterTransaction): Unit = { watchers += watcher }

        protected[stmlib] def removeWatcher(watcher: OuterTransaction): Unit = { watchers -= watcher }

        // returns the live watcher set itself, not a copy
        protected[stmlib] def getWatchers: Iterable[OuterTransaction] = watchers
    }
    
    object TVar
    {
        // Constructs a TVar whose committed value is initialValue and which has no
        // commit time yet. May be called inside or outside a transaction.
        def apply[_valueType](initialValue: _valueType) =
        {
            new TVar {
                type valueType = _valueType
                var commitTime: Option[Timestamp] = None
                @volatile var committedValue = initialValue
            }
        }
    }
}
 
code/software-transactional-memory.txt · Last modified: 2010/02/11 09:10
 
Recent changes RSS feed Valid XHTML 1.0 Driven by DokuWiki