tkaymak opened a new pull request, #37886: URL: https://github.com/apache/beam/pull/37886
Samza's `SdkHarnessDoFnRunner` called `getBundle()` without a `BundleCheckpointHandler`, causing an `UnsupportedOperationException` (issue #26126) when `KafkaSourceConsumerFn` (an SDF) returned `ProcessContinuation.resume()`.

Three changes:

1. `hasSDF()` + `createBundleCheckpointHandler()`: detects whether the executable stage contains an SDF (URN = `SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN`) and, if so, creates a `StateAndTimerBundleCheckpointHandler` that stores SDF residuals in Samza's persistent state/timer internals (using a null-key, non-keyed factory, matching how the stage is already run non-keyed).
2. `startBundle()`: passes the `bundleCheckpointHandler` to `stageBundleFactory.getBundle(...)` so that checkpoint responses from the SDK harness are handled instead of throwing.
3. `onTimer()`: SDF checkpoint timers (IDs prefixed with `sdf_checkpoint:`) are now intercepted. Instead of routing them to a remote bundle timer receiver (which would fail), the runner reads the stored residual from state and re-feeds it via `processElement()` for reprocessing.

With these changes, the `TestDebeziumIO_BasicRead` filter can be removed from `samzaFilters` and the test re-enabled against the Samza runner.
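
For reviewers who want a mental model of change 3, the timer interception can be sketched roughly as below. This is a toy model in plain Java, not the code in this PR and not the Beam or Samza API: the class `SdfCheckpointSketch`, its in-memory `residualState` map, and the methods `onCheckpoint`, `processElement`, and `onTimer` are all hypothetical stand-ins. Only the `sdf_checkpoint:` prefix and the idea of reading the stored residual and re-feeding it come from the description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of SDF checkpoint-timer interception; all names here are hypothetical. */
public class SdfCheckpointSketch {
  // Prefix taken from the PR description; everything else is an assumption.
  static final String SDF_CHECKPOINT_PREFIX = "sdf_checkpoint:";

  // Stand-in for the runner's persistent state: timer ID -> stored residual.
  private final Map<String, String> residualState = new HashMap<>();
  // Elements that were (re)processed, recorded so the behavior can be inspected.
  final List<String> processed = new ArrayList<>();
  // Timers that were not SDF checkpoints and would go to the normal timer path.
  final List<String> forwardedTimers = new ArrayList<>();
  private int nextId = 0;

  /** Stores a residual and returns the ID of the timer that should later resume it. */
  String onCheckpoint(String residual) {
    String timerId = SDF_CHECKPOINT_PREFIX + nextId++;
    residualState.put(timerId, residual);
    return timerId;
  }

  /** Stand-in for sending an element to the SDK harness. */
  void processElement(String element) {
    processed.add(element);
  }

  /** Intercepts SDF checkpoint timers; forwards every other timer unchanged. */
  void onTimer(String timerId) {
    if (timerId.startsWith(SDF_CHECKPOINT_PREFIX)) {
      // Read and clear the stored residual, then re-feed it for reprocessing.
      String residual = residualState.remove(timerId);
      if (residual != null) {
        processElement(residual);
      }
      return;
    }
    forwardedTimers.add(timerId);
  }

  public static void main(String[] args) {
    SdfCheckpointSketch runner = new SdfCheckpointSketch();
    String timerId = runner.onCheckpoint("residual-A");
    runner.onTimer("user-timer"); // ordinary timer: forwarded
    runner.onTimer(timerId);      // checkpoint timer: residual is re-fed
    System.out.println("processed=" + runner.processed);
    System.out.println("forwarded=" + runner.forwardedTimers);
  }
}
```

In this sketch the residual is removed from the map when its timer fires, so a duplicate firing of the same timer is a no-op. Whether the real handler clears state at the same point is not stated in the description.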
