dnamaz opened a new issue, #37035:
URL: https://github.com/apache/beam/issues/37035

   ### What happened?
   
   `[Flink Runner] No translator known for PrimitiveUnboundedRead after SDF-to-primitive-read conversion`
   
   When using the Flink classic runner (non-portable) with unbounded source 
connectors like `KinesisIO.read()`, the pipeline fails with:
   
   ```
   java.lang.IllegalStateException: No translator known for 
org.apache.beam.sdk.util.construction.SplittableParDo$PrimitiveUnboundedRead
   ```
   
   ### Environment
   
   - **Apache Beam version**: 2.60.0+ (tested with 2.69.0)
   - **Flink version**: 1.18, 1.19, 1.20
   - **Runner**: Flink classic runner (non-portable, without `beam_fn_api` 
experiment)
   - **Platform**: AWS Managed Apache Flink
   
   ### Steps to reproduce
   
   1. Create a Beam pipeline using `KinesisIO.read()` (or any 
   `UnboundedSource`-based IO)
   2. Run with FlinkRunner on AWS Managed Flink (or any Flink environment 
   that does not use the portable runner)
   3. The pipeline fails during translation
   
   ```java
   // Imports assume the aws2 Kinesis connector module.
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
   import org.apache.beam.sdk.transforms.ParDo;
   import software.amazon.kinesis.common.InitialPositionInStream;

   Pipeline p = Pipeline.create(options);
   p.apply(KinesisIO.read()
       .withStreamName("my-stream")
       .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON))
     .apply(ParDo.of(new ProcessFn())); // ProcessFn: any user DoFn
   p.run();
   ```
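
   For a reproduction without AWS dependencies, any `UnboundedSource`-backed 
   transform should hit the same code path. A minimal sketch, assuming 
   `GenerateSequence` is backed by an unbounded `CountingSource` when no 
   upper bound is given (and using a hypothetical `LogFn` DoFn):

   ```java
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.GenerateSequence;
   import org.apache.beam.sdk.transforms.ParDo;

   Pipeline p = Pipeline.create(options);
   // With no .to(...) bound, GenerateSequence expands to an unbounded Read,
   // which the classic Flink runner rewrites to PrimitiveUnboundedRead.
   p.apply(GenerateSequence.from(0))
    .apply(ParDo.of(new LogFn())); // LogFn: any user DoFn (hypothetical)
   p.run();
   ```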
   
   ### Root Cause Analysis
   
   The Flink classic runner calls 
`SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary()` 
in `FlinkRunner.run()` when NOT using the `beam_fn_api` experiment:
   
   ```java
   // FlinkRunner.java line 76-79
   // TODO(https://github.com/apache/beam/issues/20530): Use SDF read as
   // default when we address performance issue.
   if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
     SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
   }
   ```
   
   This conversion transforms:
   - `Read.Unbounded` → `SplittableParDo.PrimitiveUnboundedRead`
   - `Read.Bounded` → `SplittableParDo.PrimitiveBoundedRead`
   
   However, `FlinkStreamingTransformTranslators` has no registered translators 
   for these `PrimitiveUnboundedRead` and `PrimitiveBoundedRead` transforms, 
   which causes the "No translator known" error.
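
   For context, the failure surfaces when the streaming pipeline translator 
   visits each primitive transform and looks up a translator for it. A 
   simplified sketch of that lookup follows; the method and field names 
   approximate the structure of `FlinkStreamingPipelineTranslator` and 
   `FlinkStreamingTransformTranslators` rather than quoting Beam verbatim:

   ```java
   // Simplified sketch (names approximate, not verbatim Beam code).
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     PTransform<?, ?> transform = node.getTransform();
     // PrimitiveUnboundedRead has no entry in the translator registry,
     // so this lookup comes back empty ...
     StreamTransformTranslator<?> translator =
         FlinkStreamingTransformTranslators.getTranslator(transform);
     if (translator == null) {
       // ... and translation aborts with the exception reported above.
       throw new IllegalStateException("No translator known for " + transform);
     }
     // Otherwise translation is delegated to the matched translator.
   }
   ```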
   
   ### Related Issues
   
   - Related to #20530 (Use SDF read as default when performance issue is 
addressed)
   
   ### Expected Behavior
   
   The pipeline should successfully translate and execute on the Flink classic 
runner.
   
   ### Actual Behavior
   
   Pipeline fails during translation with `IllegalStateException: No translator 
known for PrimitiveUnboundedRead`.
   
   ### Proposed Fix
   
   Add explicit translators for `PrimitiveUnboundedRead` and 
`PrimitiveBoundedRead` in `FlinkStreamingTransformTranslators.java` that 
delegate to the existing `FlinkUnboundedSource` and `FlinkBoundedSource` 
implementations.
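
   A minimal sketch of what such a translator could look like; the class, 
   method, and helper names are assumptions modeled on the runner's existing 
   `Read` translators, not the exact Beam API:

   ```java
   // Sketch only: names are assumptions, not verbatim Beam code.
   private static class PrimitiveUnboundedReadTranslator<T>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
           SplittableParDo.PrimitiveUnboundedRead<T>> {

     @Override
     public void translateNode(
         SplittableParDo.PrimitiveUnboundedRead<T> transform,
         FlinkStreamingTranslationContext context) {
       // PrimitiveUnboundedRead wraps the original UnboundedSource, so we
       // can reuse the path that already turns Read.Unbounded into a
       // FlinkUnboundedSource. translateUnboundedSource is a hypothetical
       // extraction of that shared logic.
       translateUnboundedSource(transform.getSource(), context);
     }
   }
   ```

   A mirror-image `PrimitiveBoundedReadTranslator` would delegate to the 
   `FlinkBoundedSource` path, and both would be registered in the translator 
   registry alongside the existing `Read` entries.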
   
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [x] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

