// A prototype implementation of the subsystems mechanism for provably safe exception handling
// Bart Jacobs and Frank Piessens , 2008
// http://www.cs.kuleuven.be/~bartj/subsystems/
// Last update: 2008-03-20

package subsystems

import scala.util.DynamicVariable
import scala.ref._

/**
 * Small internal helpers shared by the classes in this file.
 */
private object Util {
  /**
   * Null-coalescing helper: returns x unless it is null, in which case the
   * by-name argument y is evaluated and returned. y is not evaluated when x is non-null.
   */
  def or[T](x: T, y: => T): T = if (x == null) y else x
}

import Util._

/**
 * The object returned by method doTry of object Subsystem; calling doCatch on it
 * actually runs the try body. See method doTry of object Subsystem.
 */
abstract class TryBlock[T] {
  /**
   * Supplies the catch handler and runs the doTry-doCatch expression.
   * See method doTry of object Subsystem.
   */
  def doCatch(body: Throwable => T): T
}

/**
 * A resource whose dispose method can be registered as a compensation action.
 * See method allocate of class Subsystem.
 */
trait Disposable {
  /**
   * Releases the resource. Called from ResourceScopeEntry.fail when the
   * compensation stack that holds this object is unwound.
   * See method allocate of class Subsystem.
   */
  def dispose(): unit
}

/**
 * A node in a singly linked chain of block entries (linked through next).
 * The base implementation of fail does nothing; subclasses override it.
 */
private class BlockEntry {
  // The entry nested directly inside this one, or null if there is none.
  @volatile var next: BlockEntry = null

  /**
   * Fails every entry that follows this one in the chain, deepest entry first:
   * it recurses into next, then calls next.fail(cause), then unlinks next.
   * The calling thread's interrupted flag is cleared afterwards, whether or not
   * there was a successor.
   * If next.fail (or the recursive call) throws, the link is NOT cleared, so a later
   * call on this entry will visit the same successor again.
   */
  def markTailAsFailed(cause: Throwable): unit = {
    val next = this.next
    if (next != null) {
      next.markTailAsFailed(cause)
      next.fail(cause)
      this.next = null
    }
    Thread.interrupted() // Clear interrupted flag.
  }

  /**
   * Hook invoked by markTailAsFailed on the predecessor. Does nothing here.
   */
  def fail(cause: Throwable): unit = {}
}

/**
 * Records one activation of a subsystem. Besides being a BlockEntry, each instance is a
 * node of a circular doubly linked list (succ/pred); a freshly created node is linked to itself.
 */
private class SubsystemEntry(val subsystem: Subsystem) extends BlockEntry {
  // The thread that constructed this entry (currentThread is presumably the Predef alias
  // for Thread.currentThread -- its definition is not visible in this file).
  val thread = currentThread

  // Failing this entry fails the subsystem it belongs to.
  // NOTE(review): the sentinel entry of a Subsystem is created with subsystem == null, so
  // fail must never be called on a sentinel.
  override def fail(cause: Throwable) = subsystem.fail(cause)

  // Set and cleared by Subsystem.leave and Subsystem#enterCore; Subsystem#fail only
  // interrupts threads whose entry has isTop set.
  var isTop: boolean = false // Protected by lock of subsystem
  var succ: SubsystemEntry = this // Protected by lock of subsystem // succ.subsystem == subsystem
  var pred: SubsystemEntry = this // Protected by lock of subsystem // pred.subsystem == subsystem

  /**
   * Inserts entry immediately before this node, i.e. between the current pred and this.
   */
  def add(entry: SubsystemEntry): unit = { // Caller holds lock of subsystem
    entry.pred = pred
    entry.succ = this
    pred.succ = entry
    pred = entry
  }

  /**
   * Unlinks this node from its neighbours. This node's own succ/pred fields are left untouched.
   */
  def remove(): unit = { // Caller holds lock of subsystem
    pred.succ = succ
    succ.pred = pred
  }
}

/**
 * One element of a compensation stack: the action to dispose, the subsystem that
 * registered it, and the rest of the stack (next).
 */
private class Compensation(val subsystem: Subsystem, val action: Disposable, val next: Compensation)

/**
 * The block entry created by Subsystem.rscope. It owns the compensation stack of the
 * resource scope; its fail method is what actually runs the compensation actions.
 */
private class ResourceScopeEntry extends BlockEntry {
  // Accesses of this variable are well-synchronized:
  // - This variable is accessed only by a thread where this ResourceScopeEntry is current
  // - If there are two accesses A and B in different threads, then there must be a chain of Monitor releases and acquires leading from A to B.
  var compensationStack: Compensation = null

  // Head of the block-entry chain used while the cleanup actions themselves are running.
  val cleanupEntry = new BlockEntry

  /**
   * Unwinds the compensation stack: pops each Compensation and calls its dispose method
   * inside the subsystem that registered it. The cause argument is not used.
   * An exception thrown by a dispose call is handed to the catch handler of enterCore and
   * remembered; the loop continues, and the most recently remembered exception is rethrown
   * once the stack is empty.
   * Runs while holding this object's lock.
   */
  override def fail(cause: Throwable): unit = {
    synchronized {
      // First deal with aborted earlier cleanup actions
      cleanupEntry.markTailAsFailed(null)
      val info = Subsystem.perThreadInfo
      info.blockEntry = cleanupEntry
      info.resourceScopeEntry = this // use this resource scope when the cleanup actions themselves allocate resources
      var exception: Throwable = null
      while (compensationStack != null) {
        val c = compensationStack
        c.subsystem.enterCore({
          // Pop before disposing, so an action that throws is not run a second time.
          compensationStack = c.next
          c.action.dispose()
        }, e => exception = e)
        // Ideally, each Throwable would have a getOverriddenException() in addition to a getCause(). Then, here, we would set the overriddenException.
      }
      if (exception != null) throw exception
    }
  }
}

/**
 * The per-thread bookkeeping of the subsystems library: the current subsystem, the current
 * block entry, the current subsystem entry and the current resource scope of one thread.
 */
private class PerThreadInfo(val thread: Thread) {
  // All fields of this object are accessed only by thread thread.
  var subsystem: Subsystem = null
  var blockEntry: BlockEntry = null
  var subsystemEntry: SubsystemEntry = null
  var resourceScopeEntry: ResourceScopeEntry = null
}

/**
 * Subsystems-related operations.
 */
object Subsystem {
  /**
   * For testing this library.
   * When set, Subsystem#enterCore throws a RuntimeException("Weird failure.") from its
   * exception handler before that handler does anything else.
   */
  var failInWeirdPlaces: boolean = false

  // Holds the PerThreadInfo of the current thread. fork explicitly resets it to null in the
  // thread it starts (DynamicVariable values are otherwise inherited by child threads).
  private val _perThreadInfo = new DynamicVariable[PerThreadInfo](null)

  /**
   * Returns the PerThreadInfo of the calling thread, creating and storing a fresh one
   * on first use. Never returns null.
   */
  private[subsystems] def perThreadInfo: PerThreadInfo = {
    val info = _perThreadInfo.value
    if (info == null) {
      val info = new PerThreadInfo(currentThread)
      _perThreadInfo.value = info
      info
    } else info
  }

  /**
   * The current subsystem.
   * Returns null if the calling thread has no per-thread info yet or is not in any subsystem.
   * Unlike perThreadInfo, this does not create the per-thread info.
   */
  def current = {
    val info = _perThreadInfo.value;
    if (info == null) null else info.subsystem
  }

  // prevents this thread from being interrupted during execution of body
  //
  // If the thread is not in any subsystem, body is simply evaluated. Otherwise the current
  // subsystem entry's isTop flag is cleared (and the interrupted flag reset) under the
  // subsystem's lock before body runs; Subsystem#fail only interrupts threads whose entry
  // has isTop set. After body completes normally, the subsystem is checked for failure
  // (throwing SubsystemException if it failed) and isTop is set again.
  // If body throws, isTop is left false.
  private[subsystems] def leave[T](body: => T): T = {
    val info = perThreadInfo
    val s = info.subsystem
    if (s == null)
      body
    else {
      s synchronized {
        Thread.interrupted()
        info.subsystemEntry.isTop = false
      }
      val result = body
      // Do this only on the normal path; if an exception occurs, there's no need to interrupt the thread anyway
      s synchronized {
        s.checkNotFailed()
        info.subsystemEntry.isTop = true
      }
      result
    }
  }

  /**
   * The expression (doTry tryBlock doCatch catchBlock) acts like (try tryBlock catch catchBlock),
   * except that if when tryBlock completes, the current subsystem has failed, the doTry-doCatch expression
   * completes abruptly with a SubsystemException. Also, tryBlock is evaluated in a fresh child subsystem of
   * the current subsystem.
   * Note that a doTry-doCatch expression catches all exceptions. For example, if tryBlock completes abruptly
   * with an exception E and catchBlock is a pattern matching expression that is not defined at E, then
   * the doTry-doCatch expression completes abruptly with a MatchError.
   *
   * Nothing is evaluated until doCatch is called on the returned TryBlock.
   */
  def doTry[T](tryBody: => T): TryBlock[T] = {
    new TryBlock[T] {
      def doCatch(catchBody: Throwable => T): T = {
        var ex: Throwable = null
        var result: Option[T] = None
        // Snapshot the per-thread state so that it can be restored before the handler runs.
        val info = Subsystem.perThreadInfo
        val currentSubsystem = info.subsystem
        val currentBlockEntry = info.blockEntry
        val currentSubsystemEntry = info.subsystemEntry
        val currentResourceScopeEntry = info.resourceScopeEntry
        Subsystem.leave {
          // The exception handler only records the exception; catchBody is run below,
          // after the per-thread state has been restored.
          (new Subsystem).enterCore({
            result = Some(tryBody)
          }, e => ex = e
          )
        }
        info.subsystem = currentSubsystem
        info.blockEntry = currentBlockEntry
        info.subsystemEntry = currentSubsystemEntry
        info.resourceScopeEntry = currentResourceScopeEntry
        if (ex != null)
          catchBody(ex)
        else
          result.get // result is a Some here: the try body completed without an exception
      }
    }
  }

  /**
   * Creates a new child subsystem of the current subsystem. Then starts a new thread
   * which enters the new subsystem and executes body in it.
   * The child subsystem is created in the calling thread, before the new thread is started.
   */
  def fork(body: => unit): unit = {
    val s = new Subsystem
    new Thread() {
      override def run(): unit = {
        // Start with fresh per-thread info instead of the value inherited from the parent thread.
        Subsystem._perThreadInfo.value = null
        s enter { body }
      }
    }.start()
  }

  /**
   * Creates a new compensation stack and executes body with the
   * new stack as the current compensation stack. When body completes
   * (normally or abruptly), the compensation stack's compensation
   * actions are popped and their dispose methods are called in the subsystems that registered them,
   * in LIFO order.
   * If an action completes abruptly, the corresponding subsystem
   * is marked as failed as per method fail.
   * All actions are executed, even if some of them complete abruptly.
   * If all actions complete normally, the rscope expression completes
   * in the same way as body. If any action completes abruptly, the
   * rscope expression completes in the same way as the last action
   * executed that completed abruptly.
   */
  def rscope[T](body: => T): T = {
    val info = Subsystem.perThreadInfo
    val savedBlockEntry = info.blockEntry
    // If the thread has no current block entry, a temporary one serves as the chain head.
    val currentBlockEntry = or(savedBlockEntry, new BlockEntry)
    val currentResourceScopeEntry = info.resourceScopeEntry
    val entry = new ResourceScopeEntry
    currentBlockEntry.next = entry
    info.blockEntry = entry
    info.resourceScopeEntry = entry
    try {
      try {
        val result = body
        // Normal completion: markTailAsFailed calls entry.fail, which runs the compensation actions.
        Subsystem.leave { currentBlockEntry.markTailAsFailed(null) }
        result
      } catch {case e =>
        // Abrupt completion (of body or of the cleanup above): fail whatever is still linked, then rethrow.
        currentBlockEntry.markTailAsFailed(e)
        throw e
      }
    } finally {
      info.blockEntry = savedBlockEntry
      info.resourceScopeEntry = currentResourceScopeEntry
    }
  }
}

/**
 * An immutable list that holds its elements through soft references, so the elements
 * can be reclaimed by the garbage collector.
 */
private class SoftList[T <: AnyRef](val items: List[SoftReference[T]]) {
  /** Creates an empty list. */
  def this() = this(Nil)

  /**
   * Applies body to every element whose reference has not been cleared.
   */
  def foreach(body: T => unit): unit = {
    for (r <- items) {
      r.get match {
        case None => ()
        case Some(o) => body(o)
      }
    }
  }

  /**
   * Returns a new list with x prepended; references that have already been cleared are dropped
   * from the tail of the new list. This list is not modified.
   */
  def ::(x: T): SoftList[T] = new SoftList[T](new SoftReference(x) :: items.filter(r => r.get != None))
}

/**
 * This exception is thrown when the current thread attempts to enter a subsystem that
has failed,
 * or when it attempts to create a child subsystem of a subsystem that has failed.
 * The cause, when available, is the exception recorded as the cause of failure of the
 * failed subsystem.
 */
class SubsystemException(message: String, cause: Throwable) extends RuntimeException(message, cause)

/**
 * An object of class Subsystem reflects a subsystem.
 * Subsystems can be used to write programs that preserve safety
 * properties even in the presence of asynchronous exceptions
 * and other unchecked exceptions.
 * Specifically, if a computation B depends on a computation A,
 * run A in some subsystem S and run B in a descendant subsystem
 * of S to ensure that either A completes normally or B is
 * prevented from running. The subsystems mechanism ensures that
 * if A completes abruptly, subsystem S and its descendants are
 * marked as failed. As a result, an attempt to enter a
 * descendant of S to execute B will throw a
 * SubsystemException.
 *
 * Constructing a Subsystem with a non-null parent registers the new subsystem as a child
 * of that parent; if the parent has already failed, construction throws a SubsystemException.
 */
class Subsystem(val parent: Subsystem) {
  private var children = new SoftList[Subsystem] // Protected by this Subsystem's lock
  @volatile private var failed: boolean = false
  @volatile private var exception: Throwable = null
  // Head of the circular list of entries of threads that are inside this subsystem.
  private val entries = new SubsystemEntry(null) // sentinel element // Protected by this Subsystem's lock

  /**
   * Returns true if and only if this subsystem has failed.
   * However, this method might not detect asynchronous failures in
   * other threads.
   */
  def hasFailed: boolean = failed

  /**
   * Returns the exception that caused this subsystem to fail.
   * If this subsystem has not failed, or if the cause could not be
   * recorded due to failures within the subsystems implementation,
   * this method returns null.
   * To test if this subsystem has failed, use hasFailed instead of
   * this method.
   */
  def causeOfFailure: Throwable = exception

  // Constructor body: link this subsystem into its parent's list of children.
  // (The blank line after causeOfFailure is significant: without it the brace block below
  // would be parsed as an argument to the expression `exception`.)
  {
    if (parent != null) {
      parent synchronized {
        // Report the failure with the exception type documented on SubsystemException,
        // and carry along the parent's recorded cause of failure (which may be null).
        if (parent.failed)
          throw new SubsystemException("The program attempted to create a child subsystem of a failed subsystem.", parent.exception)
        parent.children = this :: parent.children
      }
    }
  }

  /**
   * Creates a child subsystem of the current subsystem.
   * If the calling thread is not in any subsystem, the new subsystem has no parent.
   */
  def this() = this(Subsystem.current)

  /**
   * Marks this subsystem as failed with cause cause.
   * Also marks all descendant subsystems as failed.
   * Attempts to send an interrupt signal (see Thread.interrupt)
   * to each thread that is executing in a descendant
   * subsystem of this subsystem.
   * This attempt might fail in case an exception
   * occurs within the subsystems implementation.
   * Therefore, to ensure liveness, in threads that
   * wait for other threads, use asyncCheckNotFailed.
   *
   * Calling fail on a subsystem that has already failed does nothing.
   * The calling thread itself is never interrupted.
   */
  def fail(cause: Throwable): unit = synchronized {
    if (!failed) {
      // Keep the first recorded cause.
      if (exception == null) exception = cause
      for (child <- children) child.fail(cause)
      val ct = currentThread
      var entry = entries.succ
      while (entry != entries) {
        // Only entries marked isTop are interrupted; see Subsystem.leave.
        if (entry.isTop && entry.thread != ct) entry.thread.interrupt()
        entry = entry.succ
      }
      failed = true // Must happen last.
    }
  }

  /**
   * If this subsystem has failed, throws a SubsystemException.
   * Otherwise, does nothing.
   * Is not guaranteed to detect asynchronous failures (i.e.
   * failures in other threads that did not happen-before this call)
   * even after a long time.
   */
  def checkNotFailed() = if (failed) throw new SubsystemException("The program attempted to enter a failed subsystem.", exception)

  /**
   * If this subsystem has failed, throws a SubsystemException.
   * Otherwise, does nothing.
   * This method has much worse performance than method checkNotFailed, but it is guaranteed to eventually detect even asynchronous failures (i.e.
   * failures in other threads that did not happen-before this call).
   * Use this method to ensure liveness of threads that wait for signals from other threads.
   *
   * The check is performed on all ancestors first. A subsystem that still has an entry of a
   * thread that is no longer alive is marked as failed (with a null cause) by this method.
   */
  def asyncCheckNotFailed(): unit = {
    if (parent != null) parent.asyncCheckNotFailed()
    synchronized {
      checkNotFailed()
      var entry = entries.succ
      while (entry != entries) {
        if (!entry.thread.isAlive()) {
          // The thread died without removing its entry.
          fail(null)
          checkNotFailed()
        }
        entry = entry.succ
      }
    }
  }

  /**
   * Enters this subsystem, executes body in it, and then leaves this subsystem, entering back into the current subsystem.
   * If body completes abruptly, this subsystem is marked as failed, as per method fail, and the enter expression
   * completes abruptly with the same exception.
   */
  def enter[T](body: => T): T = Subsystem.leave { enterCore(body, e => throw e) }

  /** Runs body in this subsystem. If an exception occurs in body, this subsystem is marked as failed but unlike enter, the reenter expression completes normally.
   * Note that the exception is not lost since it is recorded as the cause of failure of the subsystem.
   * It's up to the user to retrieve and report the cause of failure in the appropriate places.
   */
  def reenter[T](body: => unit): unit = Subsystem.leave { enterCore(body, e => {}) }

  // Use enterCore instead of an enclosing try-catch block if you want to be able to
  // assume in the catchBlock that the subsystem has been marked as failed (except perhaps if body completed successfully).
  //
  // Throws SubsystemException (before body is evaluated) if this subsystem has already failed.
  // The per-thread state (block entry, subsystem entry, current subsystem) is restored before
  // catchBlock runs. On the exceptional path the entry is not removed from the entries list.
  private[subsystems] def enterCore[T](body: => T, catchBlock: Throwable => T): T = {
    val info = Subsystem.perThreadInfo
    val savedBlockEntry = info.blockEntry
    // If the thread has no current block entry, a temporary one serves as the chain head.
    val currentBlockEntry = or(savedBlockEntry, new BlockEntry)
    val currentSubsystemEntry = info.subsystemEntry
    val caller = info.subsystem
    val entry = new SubsystemEntry(this)
    currentBlockEntry.next = entry // It's crucial that this happens before the try block.
    synchronized { checkNotFailed(); entry.isTop = true; entries.add(entry) }
    try {
      info.blockEntry = entry
      info.subsystemEntry = entry
      info.subsystem = this
      try {
        val result = body
        synchronized {
          Thread.interrupted() // Clear interrupted flag. Must occur inside this synchronized block, together with remove call
          entry.remove()
        }
        currentBlockEntry.next = null
        result
      } finally {
        info.blockEntry = savedBlockEntry
        info.subsystemEntry = currentSubsystemEntry
        info.subsystem = caller
      }
    } catch {
      case e =>
        if (Subsystem.failInWeirdPlaces) throw new RuntimeException("Weird failure.", e)
        // entry is still linked behind currentBlockEntry, so this fails this subsystem
        // (and anything nested inside it) before the handler runs.
        currentBlockEntry.markTailAsFailed(e)
        catchBlock(e)
    }
  }

  /**
   * Expression (s allocate body) enters s, evaluates body there, and pushes the resulting
   * Disposable onto the compensation stack of the current resource scope, registered for
   * subsystem s. The value of body is returned.
   * Wrap a resource allocation with this method to ensure that either the resource's
   * dispose method is called at the end of the innermost enclosing rscope statement,
   * or this subsystem is marked as failed.
   * NOTE(review): this dereferences the calling thread's current resource scope entry, which
   * appears to be null outside of any rscope -- confirm that callers always use allocate
   * inside rscope.
   */
  def allocate[T <: Disposable](body: => T): T = {
    enter {
      val action = body // evaluate body
      val info = Subsystem.perThreadInfo
      info.resourceScopeEntry.compensationStack = new Compensation(this, action, info.resourceScopeEntry.compensationStack)
      action
    }
  }
}

/**
 * A safe replacement for synchronized expressions.
 * Use this synchronization construct instead of synchronized expressions to ensure that a subsystem failure in a thread that holds
 * a lock is visible to all threads that subsequently acquire the lock.
 * The use of synchronized expressions is also safe, unless an exception occurs within the subsystems implementation. This is
 * highly unlikely but cannot be ruled out based on the Java Virtual Machine Specification.
 */
class Monitor {
  // Non-null while a body is executing inside this monitor, and also after a body completed
  // abruptly (it is reset to null only on normal completion).
  private var entry: BlockEntry = null // Protected by this Monitor's lock

  /**
   * (m enter body) behaves like (m synchronized body), except that the use of this method ensures that a subsystem failure
   * in a thread that holds a lock is visible to all threads that subsequently acquire the lock.
   * If the calling thread already holds this monitor's lock, body is evaluated directly.
   */
  def enter[T](body: => T): T = if (Thread.holdsLock(this)) body else enterCore(body)

  // Acquires the lock, first fails whatever a previous holder left linked behind a leftover
  // entry, then runs body with a fresh block entry as the thread's current block entry.
  private def enterCore[T](body: => T): T = synchronized {
    val info = Subsystem.perThreadInfo
    val savedBlockEntry = info.blockEntry
    val currentBlockEntry = or(savedBlockEntry, new BlockEntry)
    currentBlockEntry.next = entry
    // A leftover entry means that an earlier body did not complete normally.
    if (entry != null) Subsystem.leave { entry.markTailAsFailed(null) }
    entry = new BlockEntry
    currentBlockEntry.next = entry
    info.blockEntry = entry
    try {
      val result = body
      // Normal completion only: unlink and forget the entry.
      currentBlockEntry.next = null
      entry = null
      result
    } finally {
      info.blockEntry = savedBlockEntry
    }
  }
}

/**
 * Utility trait that binds the current subsystem to its instances when they are created.
 */
trait SubsystemBoundObject {
  /**
   * The subsystem that was current when this object was created.
   */
  val subsystem = Subsystem.current

  /**
   * Syntactic sugar for (this.subsystem enter body).
   */
  def inSubsystem[T](body: => T): T = subsystem enter body

  /**
   * Syntactic sugar for (this.subsystem allocate body).
   */
  def allocate[T <: Disposable](body: => T): T = subsystem allocate body
}

/**
 * Collects members that are typically imported by client code.
 */
object SubsystemsPredef {
  /**
   * Syntactic sugar for Subsystem.doTry(body).
   */
  def doTry[T](body: => T): TryBlock[T] = Subsystem.doTry(body)

  /**
   * Syntactic sugar for Subsystem.fork(body).
   */
  def fork(body: => unit): unit = Subsystem.fork(body)

  /**
   * Syntactic sugar for Subsystem.rscope(body).
   */
  def rscope[T](body: => T): T = Subsystem.rscope(body)
}