Meta: Add supervisor strategy support to stream operators that accept user functions #3110

@He-Pin

Motivation

Per the stream error handling docs:

The operators that support supervision strategies are explicitly documented to do so, if there is nothing in the documentation of an operator saying that it adheres to the supervision strategy it means it fails rather than applies supervision.

Several stream operators accept user-provided functions that can throw exceptions, yet do not consult the SupervisionStrategy decider. If the user function throws, the stream fails unconditionally, regardless of the configured supervision strategy. For consistency and usability, all operators that apply user functions to stream elements should honor the configured supervision strategy.
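
The gap can be illustrated with a small comparison between map, which is listed in this issue as supervised, and zipWith, which is listed as unsupervised. This is a minimal sketch and not taken from the Pekko test suite: the object name SupervisionGapDemo and the helper failOnThree are hypothetical, and the zipWith outcome is what this issue describes for affected versions rather than something verified here.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.{ ActorAttributes, Supervision }
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object SupervisionGapDemo {
  // Hypothetical user function that fails for one specific element.
  def failOnThree(n: Int): Int =
    if (n == 3) throw new IllegalArgumentException("boom") else n

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("supervision-gap-demo")
    val resume = ActorAttributes.supervisionStrategy(Supervision.resumingDecider)

    // map adheres to the supervision strategy: with Resume the failing
    // element is dropped and the remaining elements 1, 2, 4, 5 are emitted.
    val mapped = Source(1 to 5)
      .map(failOnThree)
      .withAttributes(resume)
      .runWith(Sink.seq)
    println(Try(Await.result(mapped, 3.seconds)))

    // zipWith is listed in this issue as not consulting the decider, so on
    // affected versions the same attribute is expected to have no effect
    // and the stream fails with the thrown exception.
    val zipped = Source(1 to 5)
      .zipWith(Source(1 to 5))((a, b) => failOnThree(a) + b)
      .withAttributes(resume)
      .runWith(Sink.seq)
    println(Try(Await.result(zipped, 3.seconds)))

    Await.result(system.terminate(), 10.seconds)
  }
}
```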

Related: #3101 (Throttle-specific issue)

Operators missing supervisor strategy support

The following operators accept user-provided functions but do not implement supervision. Each entry includes the function parameter, the source location, and the specific code path where the function is called without try-catch.

1. throttle (with costCalculation)

2. expand

  • Function: expander: Out => Iterator[U]
  • Implementation: Ops.scala:1210, extrapolate(grab(in)) in onPush() without try-catch
  • Doc: Explicitly states "Expand does not support Supervision.Restart and Supervision.Resume. Exceptions from the seed function will complete the stream with failure."
  • Impact: Should be enhanced to support supervision, or the limitation should remain clearly documented if semantics prevent it

3. groupedWeighted

  • Function: costFn: Out => Long
  • Implementation: Ops.scala:816, costFn(elem) in GroupedWeighted.onPush() without try-catch
  • Impact: Negative cost is handled, but a throwing costFn is not supervised

4. groupedWeightedWithin

  • Function: costFn: Out => Long
  • Implementation: Ops.scala:1797, costFn(elem) in GroupedWeightedWithin.nextElement() without try-catch
  • Impact: Same as groupedWeighted

5. groupedAdjacentBy / groupedAdjacentByWeighted

  • Functions: f: Out => T and costFn: Out => Long
  • Implementation: GroupedAdjacentByWeighted.scala:60,67 — both costFn(elem) and f(elem) called without try-catch
  • Impact: Neither the key function nor the cost function is supervised

6. aggregateWithBoundary

  • Functions: allocate: () => Agg, aggregate: (Agg, In) => (Agg, Boolean), harvest: Agg => Out, emitOnTimer: Option[(Agg => Boolean, FiniteDuration)]
  • Implementation: AggregateWithBoundary.scala — all user functions called without try-catch
  • Impact: Any exception in the aggregation, harvest, or timer functions fails the stream unconditionally

7. doOnFirst

  • Function: f: Out => Unit
  • Implementation: DoOnFirst.scala:48, f(elem) called without try-catch
  • Impact: Side-effect function failure fails the stream

8. delay (with custom DelayStrategy)

  • Function: DelayStrategy.nextDelay(element) (via delayStrategySupplier)
  • Implementation: Ops.scala:1994, delayStrategy.nextDelay(element) in grabAndPull() without try-catch
  • Impact: If the delay strategy throws for a particular element, the stream fails

9. zipWith / zipWithN

  • Function: combine / zipper function
  • Implementation: ZipWithApply.scala.template:51, zipper(...) in pushAll() without try-catch; Graph.scala:1241, same pattern in ZipWithN
  • Impact: Combine function exceptions fail the stream unconditionally

10. batch (partial gap — costFn only)

  • Function: costFn: In => Long
  • Implementation: Ops.scala:1108, costFn(elem) called outside the try-catch block in onPush(), while seed and aggregate calls are properly supervised
  • Impact: seed and aggregate are supervised, but costFn is not — inconsistent within the same operator

Operators that already support supervision (for reference)

The following operators correctly implement supervision: map, filter, filterNot, mapOption, takeWhile, dropWhile, collect, collectWhile, collectFirst, scan, scanAsync, fold, foldAsync, reduce, limitWeighted, batch (seed/aggregate only), mapConcat, statefulMapConcat, statefulMap, mapAsync, mapAsyncUnordered, mapAsyncPartitioned, log, logWithMarker, conflate/conflateWithSeed, batchWeighted, groupBy, splitWhen/splitAfter, partition, dropRepeated, mapWithResource, unfoldResource, unfoldResourceAsync, iteratorSource, iterableSource, iterableConcat

Proposed approach

For each operator listed above:

  1. Add private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider in createLogic
  2. Wrap user function calls in try-catch with NonFatal matching
  3. Consult the decider on exception:
    • Stop → failStage(ex) (current behavior)
    • Resume → skip the element, continue processing (e.g. pull(in))
    • Restart → reset any accumulated state, then continue
  4. Mix in StageLogging where needed for error logging on resume/restart
  5. Add tests verifying all three supervision directives work correctly
  6. Update scaladoc to include "Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute."
  7. For expand: if supervision semantics are genuinely incompatible (e.g. the extrapolation iterator state cannot be meaningfully reset), keep the explicit documentation but consider supporting at least Resume
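
Steps 1 to 3 above can be sketched on a small stand-alone stage. This is an illustration under assumptions, not the implementation of any existing Pekko operator: RunningCost is a hypothetical stage that emits the running sum of a user-supplied costFn, chosen only because it has both a user function and accumulated state to reset on Restart.

```scala
import org.apache.pekko.stream.{ Attributes, FlowShape, Inlet, Outlet, Supervision }
import org.apache.pekko.stream.ActorAttributes.SupervisionStrategy
import org.apache.pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

import scala.util.control.NonFatal

// Hypothetical stage for illustration only; it is not part of Pekko.
final class RunningCost[T](costFn: T => Long) extends GraphStage[FlowShape[T, Long]] {
  private val in = Inlet[T]("RunningCost.in")
  private val out = Outlet[Long]("RunningCost.out")
  override val shape: FlowShape[T, Long] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Step 1: resolve the decider from the inherited attributes.
      private lazy val decider =
        inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

      // Accumulated state that Restart has to reset.
      private var total = 0L

      override def onPush(): Unit =
        try {
          // Step 2: the user function call sits inside the try block.
          // If costFn throws, total is left unchanged.
          total += costFn(grab(in))
          push(out, total)
        } catch {
          case NonFatal(ex) =>
            // Step 3: let the decider choose how to proceed.
            decider(ex) match {
              case Supervision.Stop   => failStage(ex)
              case Supervision.Resume => pull(in) // skip the element, keep state
              case Supervision.Restart =>
                total = 0L // drop accumulated state, then continue
                pull(in)
            }
        }

      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}
```

For inputs 1 to 5 and a costFn that throws on 3, this sketch would be expected to emit 1, 3, 7, 12 under Resume and 1, 3, 4, 9 under Restart, and to fail the stream under Stop. A test per directive along these lines would cover step 5.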

Notes on semantics

Some operators may need special consideration:

  • expand: The extrapolation iterator is stateful; Restart would need to reset it. Resume semantics are unclear when the expander itself fails mid-iteration.
  • zipWith/zipWithN: When the combine function fails, the elements from all upstreams have already been grabbed. On Resume, those elements would be lost, which could be surprising but is consistent with how map handles it.
  • delay: The delay strategy is created once at materialization. Restart semantics for a stateful DelayStrategy would need to re-create it, which may not be feasible without a factory.
  • aggregateWithBoundary: With 4 user functions, each may need different supervision semantics. Consider whether all should be supervised uniformly.
  • Source operators (unfold, unfoldAsync): Supervision for sources is less clear since there's no upstream element to "skip". These are lower priority.
