Motivation
Per the stream error handling docs:
The operators that support supervision strategies are explicitly documented to do so, if there is nothing in the documentation of an operator saying that it adheres to the supervision strategy it means it fails rather than applies supervision.
Several stream operators accept user-provided functions that can throw exceptions, yet do not consult the SupervisionStrategy decider. As a result, if the user function throws, the stream fails unconditionally, regardless of the configured supervision strategy. For consistency and usability, all operators that apply user functions to stream elements should support the supervision strategy.
Related: #3101 (Throttle-specific issue)
Operators missing supervision strategy support
The following operators accept user-provided functions but do not implement supervision. Each entry includes the function parameter, the source location, and the specific code path where the function is called without try-catch.
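1. throttle (with costCalculation)
- Function: costCalculation: Out => Int
- Implementation: Throttle.scala:73, where costCalculation(elem) is called in onPush() without try-catch
- Impact: If costCalculation throws (e.g. for malformed input), the stream fails even with Supervision.Resume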
2. expand
- Function: expander: Out => Iterator[U]
- Implementation: Ops.scala:1210, where extrapolate(grab(in)) is called in onPush() without try-catch
- Doc: Explicitly states "Expand does not support Supervision.Restart and Supervision.Resume. Exceptions from the seed function will complete the stream with failure."
- Impact: Should be enhanced to support supervision, or the limitation should remain clearly documented if the semantics prevent it
3. groupedWeighted
- Function: costFn: Out => Long
- Implementation: Ops.scala:816, where costFn(elem) is called in GroupedWeighted.onPush() without try-catch
- Impact: A negative cost is handled, but a throwing costFn is not supervised
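To make the proposed Resume/Restart semantics concrete for groupedWeighted, the following is a stdlib-only Scala model (it does not use Pekko) that replays the chunking over a finite list with costFn placed under a decider. Directive, Stop, Resume and Restart are hypothetical stand-ins for Supervision.Directive and its cases. Two choices are assumptions, not existing behavior: Restart also discards the partially built group, and a negative cost still fails the stage unconditionally.

```scala
import scala.util.control.NonFatal

object GroupedWeightedSketch {
  // Hypothetical stand-ins for Supervision.Directive so the model runs without Pekko.
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive

  /** Models groupedWeighted(minWeight)(costFn) over a finite input, with costFn supervised. */
  def groupedWeighted[A](elems: List[A], minWeight: Long, decider: Throwable => Directive)(
      costFn: A => Long): Either[Throwable, List[List[A]]] = {
    val groups = List.newBuilder[List[A]]
    var current = Vector.empty[A]
    var weight = 0L
    var failure: Option[Throwable] = None
    val it = elems.iterator
    while (failure.isEmpty && it.hasNext) {
      val elem = it.next()
      // Only the user function is wrapped; the rest of the stage logic stays outside the try.
      val cost: Either[Throwable, Long] =
        try Right(costFn(elem))
        catch { case NonFatal(ex) => Left(ex) }
      cost match {
        case Right(c) if c < 0 =>
          // Assumed to stay unconditional, like the existing negative-cost check.
          failure = Some(new IllegalArgumentException(s"Negative weight [$c] for element [$elem]"))
        case Right(c) =>
          current :+= elem
          weight += c
          if (weight >= minWeight) {
            groups += current.toList
            current = Vector.empty
            weight = 0L
          }
        case Left(ex) =>
          decider(ex) match {
            case Stop    => failure = Some(ex) // current behavior: failStage(ex)
            case Resume  => ()                 // drop only the offending element
            case Restart => current = Vector.empty; weight = 0L // also drop the partial group
          }
      }
    }
    failure match {
      case Some(ex) => Left(ex)
      case None =>
        if (current.nonEmpty) groups += current.toList // the last group may be lighter than minWeight
        Right(groups.result())
    }
  }
}
```

With Resume only the element whose cost could not be computed is lost; with Restart the in-progress group goes with it, which is the "reset any accumulated state" step of the proposed approach.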
4. groupedWeightedWithin
- Function: costFn: Out => Long
- Implementation: Ops.scala:1797, where costFn(elem) is called in GroupedWeightedWithin.nextElement() without try-catch
- Impact: Same as groupedWeighted
5. groupedAdjacentBy / groupedAdjacentByWeighted
- Functions: f: Out => T and costFn: Out => Long
- Implementation: GroupedAdjacentByWeighted.scala:60,67, where both costFn(elem) and f(elem) are called without try-catch
- Impact: Neither the key function nor the cost function is supervised
6. aggregateWithBoundary
- Functions: allocate: () => Agg, aggregate: (Agg, In) => (Agg, Boolean), harvest: Agg => Out, emitOnTimer: Option[(Agg => Boolean, FiniteDuration)]
- Implementation: AggregateWithBoundary.scala, where all user functions are called without try-catch
- Impact: Any exception in the allocation, aggregation, harvest, or timer functions fails the stream unconditionally
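One possible reading of uniform supervision for aggregateWithBoundary is sketched below, again as a stdlib-only Scala model with hypothetical stand-ins for the directive types. It leaves out emitOnTimer, puts allocate, aggregate and harvest under one decider, and assumes Agg is immutable, so that "keep the aggregate" on Resume really means the value from before the failing element. Flushing the remaining aggregate on completion is assumed to mirror the operator.

```scala
import scala.util.control.NonFatal

object AggregateWithBoundarySketch {
  // Hypothetical stand-ins for Supervision.Directive so the model runs without Pekko.
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive

  /** Models aggregateWithBoundary without emitOnTimer, supervising all three functions uniformly. */
  def run[In, Agg, Out](elems: List[In], decider: Throwable => Directive)(
      allocate: () => Agg,
      aggregate: (Agg, In) => (Agg, Boolean),
      harvest: Agg => Out): Either[Throwable, List[Out]] = {
    val out = List.newBuilder[Out]
    var agg: Option[Agg] = None // None until the first element of a new aggregate arrives
    var failure: Option[Throwable] = None
    val it = elems.iterator
    while (failure.isEmpty && it.hasNext) {
      val elem = it.next()
      try {
        val (next, boundary) = aggregate(agg.getOrElse(allocate()), elem)
        if (boundary) {
          val harvested = harvest(next) // harvest before clearing, so a failure leaves agg untouched
          agg = None
          out += harvested
        } else agg = Some(next)
      } catch {
        case NonFatal(ex) =>
          decider(ex) match {
            case Stop    => failure = Some(ex)
            case Resume  => ()         // drop the element, keep the pre-element aggregate
            case Restart => agg = None // drop the element and the aggregate; allocate() runs again
          }
      }
    }
    (failure, agg) match {
      case (Some(ex), _) => Left(ex)
      case (None, None)  => Right(out.result())
      case (None, Some(last)) => // flush the remaining aggregate on completion
        try Right(out.result() :+ harvest(last))
        catch { case NonFatal(ex) => if (decider(ex) == Stop) Left(ex) else Right(out.result()) }
    }
  }
}
```

Whether the timer predicate should follow the same decider, or always fail, is left open here, matching the question raised in the notes on semantics.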
7. doOnFirst
- Function: f: Out => Unit
- Implementation: DoOnFirst.scala:48, where f(elem) is called without try-catch
- Impact: A failure in the side-effect function fails the stream
8. delay (with a custom DelayStrategy)
- Function: DelayStrategy.nextDelay(element) (via delayStrategySupplier)
- Implementation: Ops.scala:1994, where delayStrategy.nextDelay(element) is called in grabAndPull() without try-catch
- Impact: If the delay strategy throws for a particular element, the stream fails
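For delay, the open question is what Restart should do with a stateful strategy. One option, sketched below as a stdlib-only Scala model, is to invoke the supplier again; whether re-invoking delayStrategySupplier is acceptable for user-supplied strategies is an assumption. DelayStrategy here is a hypothetical stand-in that only models nextDelay, the directive types are stand-ins too, and Resume is assumed to drop the element rather than emit it without delay.

```scala
import scala.concurrent.duration._
import scala.util.control.NonFatal

object DelaySketch {
  // Hypothetical stand-ins for Supervision.Directive so the model runs without Pekko.
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive

  // Hypothetical stand-in for the operator's DelayStrategy: only nextDelay is modelled.
  trait DelayStrategy[T] { def nextDelay(elem: T): FiniteDuration }

  /** Returns the delay chosen for each element, or the failure that stopped the stage. */
  def scheduleDelays[T](
      elems: List[T],
      supplier: () => DelayStrategy[T],
      decider: Throwable => Directive): Either[Throwable, List[(T, FiniteDuration)]] = {
    var strategy = supplier() // created once per materialization
    val out = List.newBuilder[(T, FiniteDuration)]
    var failure: Option[Throwable] = None
    val it = elems.iterator
    while (failure.isEmpty && it.hasNext) {
      val elem = it.next()
      try out += (elem -> strategy.nextDelay(elem))
      catch {
        case NonFatal(ex) =>
          decider(ex) match {
            case Stop    => failure = Some(ex)
            case Resume  => ()                    // drop the element, keep the strategy's state
            case Restart => strategy = supplier() // drop the element, start from a fresh strategy
          }
      }
    }
    failure.toLeft(out.result())
  }
}
```

With a strategy that grows its delay per element, Resume continues the progression while Restart starts it over, which is exactly the difference that matters for stateful strategies.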
9. zipWith / zipWithN
- Function: combine / zipper function
- Implementation: ZipWithApply.scala.template:51, where zipper(...) is called in pushAll() without try-catch; Graph.scala:1241 has the same pattern in ZipWithN
- Impact: Exceptions from the combine function fail the stream unconditionally
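Because zipWith keeps no state between pushes, the directives collapse. The stdlib-only Scala model below (hypothetical directive stand-ins, finite inputs) shows that Resume and Restart both drop the pair that was already grabbed from the two inputs, which is the element loss discussed in the notes on semantics.

```scala
import scala.util.control.NonFatal

object ZipWithSketch {
  // Hypothetical stand-ins for Supervision.Directive so the model runs without Pekko.
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive

  /** Models zipWith over two finite inputs with a supervised combine function. */
  def zipWith[A, B, O](as: List[A], bs: List[B], decider: Throwable => Directive)(
      combine: (A, B) => O): Either[Throwable, List[O]] = {
    val out = List.newBuilder[O]
    // Stops at the shorter input, like zipWith completing when any upstream completes.
    val pairs = as.iterator.zip(bs.iterator)
    var failure: Option[Throwable] = None
    while (failure.isEmpty && pairs.hasNext) {
      val (a, b) = pairs.next() // both elements are "grabbed" before combine runs
      try out += combine(a, b)
      catch {
        case NonFatal(ex) =>
          decider(ex) match {
            case Stop => failure = Some(ex)
            // No state survives between pushes, so Resume and Restart both drop the grabbed pair.
            case Resume | Restart => ()
          }
      }
    }
    failure.toLeft(out.result())
  }
}
```

The same reasoning carries over to zipWithN, where a single failing zipper call discards one element from every input.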
10. batch (partial gap: costFn only)
- Function: costFn: In => Long
- Implementation: Ops.scala:1108, where costFn(elem) is called outside the try-catch block in onPush(), while the seed and aggregate calls are properly supervised
- Impact: seed and aggregate are supervised, but costFn is not, which is inconsistent within the same operator
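For batch the fix is mostly a matter of where the try begins. The simplified model below is stdlib-only Scala and ignores backpressure and the pending-element path; BatchModel and the directive types are hypothetical stand-ins. It moves costFn(elem) into the same supervised block as seed and aggregate.

```scala
import scala.util.control.NonFatal

object BatchCostFnSketch {
  // Hypothetical stand-ins for Supervision.Directive so the model runs without Pekko.
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive

  /** Simplified model of the batch onPush() path (no backpressure or pending element). */
  final class BatchModel[In, Out](
      max: Long,
      costFn: In => Long,
      seed: In => Out,
      aggregate: (Out, In) => Out,
      decider: Throwable => Directive) {
    private var agg: Option[Out] = None
    private var left: Long = max

    /** Returns Left(ex) when the stage should fail, Right(()) otherwise. */
    def onPush(elem: In): Either[Throwable, Unit] =
      try {
        val cost = costFn(elem) // now inside the supervised block, next to seed and aggregate
        agg = Some(agg.fold(seed(elem))(aggregate(_, elem)))
        left -= cost
        Right(())
      } catch {
        case NonFatal(ex) =>
          decider(ex) match {
            case Stop    => Left(ex)
            case Resume  => Right(()) // drop the element, keep the batch
            case Restart => agg = None; left = max; Right(()) // drop the element and the batch
          }
      }

    /** Hands the current batch downstream and resets the remaining capacity. */
    def flush(): Option[Out] = {
      val batch = agg
      agg = None
      left = max
      batch
    }

    def remaining: Long = left
  }
}
```

Computing the cost first also means a throwing costFn never leaves the batch half-updated, since agg and left are only touched after it returns.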
Operators that already support supervision (for reference)
The following operators correctly implement supervision: map, filter, filterNot, mapOption, takeWhile, dropWhile, collect, collectWhile, collectFirst, scan, scanAsync, fold, foldAsync, reduce, limitWeighted, batch (seed/aggregate only), mapConcat, statefulMapConcat, statefulMap, mapAsync, mapAsyncUnordered, mapAsyncPartitioned, log, logWithMarker, conflate/conflateWithSeed, batchWeighted, groupBy, splitWhen/splitAfter, partition, dropRepeated, mapWithResource, unfoldResource, unfoldResourceAsync, iteratorSource, iterableSource, iterableConcat
Proposed approach
For each operator listed above:
- Add private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider in createLogic
- Wrap user function calls in try-catch with NonFatal matching
- Consult the decider on exception:
  - Stop → failStage(ex) (current behavior)
  - Resume → skip the element and continue processing (e.g. pull(in))
  - Restart → reset any accumulated state, then continue
- Mix in StageLogging where needed for error logging on resume/restart
- Add tests verifying that all three supervision directives work correctly
- Update the scaladoc to include "Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute."
- For expand: if supervision semantics are genuinely incompatible (e.g. the extrapolation iterator state cannot be meaningfully reset), keep the explicit documentation but consider supporting at least Resume
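The decider step above can be sketched, independently of Pekko's GraphStage API, as a pure Scala model of one stateful stage. Directive and its cases stand in for Supervision.Directive, and run/step are hypothetical names; the comments map each branch to the call that real stage logic would make (failStage, pull).

```scala
import scala.util.control.NonFatal

object SupervisedStageSketch {
  // Hypothetical stand-ins for Supervision.Directive so the model runs without Pekko.
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive

  type Decider = Throwable => Directive

  /**
   * Models a stateful stage whose onPush() applies a user function `step` to the current
   * state and the grabbed element, optionally emitting one output.
   */
  def run[S, In, Out](inputs: List[In], initial: S, decider: Decider)(
      step: (S, In) => (S, Option[Out])): Either[Throwable, List[Out]] = {
    @annotation.tailrec
    def loop(rest: List[In], state: S, acc: List[Out]): Either[Throwable, List[Out]] =
      rest match {
        case Nil => Right(acc.reverse)
        case elem :: tail =>
          // Only the user function call sits inside the try, mirroring the proposed pattern.
          val outcome: Either[Throwable, (S, Option[Out])] =
            try Right(step(state, elem))
            catch { case NonFatal(ex) => Left(ex) }
          outcome match {
            case Right((next, emitted)) => loop(tail, next, emitted.toList ::: acc)
            case Left(ex) =>
              decider(ex) match {
                case Stop    => Left(ex)                 // failStage(ex)
                case Resume  => loop(tail, state, acc)   // pull(in), state kept
                case Restart => loop(tail, initial, acc) // reset state, then pull(in)
              }
          }
      }
    loop(inputs, initial, Nil)
  }
}
```

Keeping the try around the user function alone, rather than the whole handler, avoids accidentally supervising the stage's own invariant checks.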
Notes on semantics
Some operators may need special consideration:
- expand: The extrapolation iterator is stateful; Restart would need to reset it. Resume semantics are unclear when the expander itself fails mid-iteration.
- zipWith/zipWithN: When the combine function fails, the elements from all upstreams have already been grabbed. On Resume, those elements would be lost, which could be surprising but is consistent with how map handles it.
- delay: The delay strategy is created once at materialization. Restart semantics for a stateful DelayStrategy would need to re-create it, which may not be feasible without a factory.
- aggregateWithBoundary: With four user functions, each may need different supervision semantics. Consider whether all should be supervised uniformly.
- Source operators (unfold, unfoldAsync): Supervision for sources is less clear since there is no upstream element to "skip". These are lower priority.
References