dnamaz opened a new pull request, #37036:
URL: https://github.com/apache/beam/pull/37036

   # Pull Request: Add PrimitiveUnboundedRead/PrimitiveBoundedRead Translators to Flink Runner
   
   ## Title
   
   `[Flink Runner] Add translators for PrimitiveUnboundedRead and 
PrimitiveBoundedRead`
   
   ## Description
   
   ### What is this PR doing?
   
   This PR adds explicit translators for 
`SplittableParDo.PrimitiveUnboundedRead` and 
`SplittableParDo.PrimitiveBoundedRead` to the Flink streaming transform 
translators. These translators handle the case where `Read.Unbounded` and 
`Read.Bounded` are converted to primitive reads by 
`SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary()`.
   
   ### Why is this needed?
   
   The Flink classic runner (non-portable) calls 
`convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary()` when NOT using 
the `beam_fn_api` experiment. This converts SDF-wrapped reads to 
`PrimitiveUnboundedRead` and `PrimitiveBoundedRead` transforms. However, there 
were no registered translators for these transforms, causing pipelines using 
unbounded sources (like `KinesisIO.read()`) to fail with:
   
   ```
   java.lang.IllegalStateException: No translator known for org.apache.beam.sdk.util.construction.SplittableParDo$PrimitiveUnboundedRead
   ```
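
The failure mode can be illustrated with a small self-contained model. This is only a sketch under stated assumptions: `UrnLookupFailureSketch`, `Transform`, and `Translator` are hypothetical stand-ins, not the real Beam or Flink runner classes, and the registry holds a single illustrative entry. It shows how a lookup keyed purely by URN finds nothing for a transform that carries no URN.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a URN-keyed translator registry; not the real runner code. */
final class UrnLookupFailureSketch {

  /** Minimal transform model: a class name plus an optional URN (null when it has none). */
  static final class Transform {
    final String className;
    final String urn;

    Transform(String className, String urn) {
      this.className = className;
      this.urn = urn;
    }
  }

  /** Stand-in for a transform translator; returns a label instead of building a Flink source. */
  interface Translator {
    String translate(Transform transform);
  }

  // Registry keyed by URN only. The single entry is illustrative.
  private static final Map<String, Translator> TRANSLATORS = new HashMap<>();

  static {
    TRANSLATORS.put("beam:transform:read:v1", transform -> "read-source");
  }

  /** Returns the translator registered for the transform's URN, or null if there is none. */
  static Translator getTranslator(Transform transform) {
    return transform.urn == null ? null : TRANSLATORS.get(transform.urn);
  }

  /** Translates the transform, failing in the same style as the error quoted above. */
  static String translate(Transform transform) {
    Translator translator = getTranslator(transform);
    if (translator == null) {
      throw new IllegalStateException("No translator known for " + transform.className);
    }
    return translator.translate(transform);
  }
}
```

In this model, a transform with the registered URN translates normally, while a transform with a null URN (standing in for `PrimitiveUnboundedRead`) reaches the `IllegalStateException` branch.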
   
   ### How does it work?
   
   The new translators:
   
   1. **`PrimitiveUnboundedReadTranslator`**: Extracts the `UnboundedSource` 
from `PrimitiveUnboundedRead.getSource()` and creates a `FlinkUnboundedSource`, 
following the same pattern as the existing `UnboundedReadSourceTranslator`.
   
   2. **`PrimitiveBoundedReadTranslator`**: Extracts the `BoundedSource` from 
`PrimitiveBoundedRead.getSource()` and creates a `FlinkBoundedSource`, 
following the same pattern as the existing `BoundedReadSourceTranslator`.
   
   The key difference from the existing translators is that the new ones read 
the source directly from the transform (`transform.getSource()`) rather than 
going through `ReadTranslation.unboundedSourceFromTransform()`, since 
`PrimitiveUnboundedRead` and `PrimitiveBoundedRead` are not standard Read 
transforms with URNs.
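
The dispatch order described above can be sketched as follows. This is a simplified model, not the code in this PR: all types in `PrimitiveReadDispatchSketch` are hypothetical stand-ins that merely reuse the names from the description, sources are modeled as plain strings, and each translator returns a label rather than constructing a real `FlinkUnboundedSource` or `FlinkBoundedSource`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of "instance check first, URN lookup second"; not the real runner code. */
final class PrimitiveReadDispatchSketch {

  /** Stand-in for a transform; urn() returns null when the transform has no URN. */
  interface Transform {
    String urn();
  }

  /** Stand-in for SplittableParDo.PrimitiveUnboundedRead; the source is modeled as a string. */
  static final class PrimitiveUnboundedRead implements Transform {
    private final String source;

    PrimitiveUnboundedRead(String source) {
      this.source = source;
    }

    String getSource() {
      return source;
    }

    @Override
    public String urn() {
      return null; // primitive reads carry no URN in this model
    }
  }

  /** Stand-in for SplittableParDo.PrimitiveBoundedRead. */
  static final class PrimitiveBoundedRead implements Transform {
    private final String source;

    PrimitiveBoundedRead(String source) {
      this.source = source;
    }

    String getSource() {
      return source;
    }

    @Override
    public String urn() {
      return null;
    }
  }

  /** Stand-in for a standard read that is identified by URN. */
  static final class UrnRead implements Transform {
    @Override
    public String urn() {
      return "beam:transform:read:v1";
    }
  }

  interface Translator {
    String translate(Transform transform);
  }

  // Existing URN-keyed registry, left untouched by the new instance checks.
  private static final Map<String, Translator> URN_TRANSLATORS = new HashMap<>();

  static {
    URN_TRANSLATORS.put("beam:transform:read:v1", transform -> "ReadSourceTranslator");
  }

  /** Checks for primitive reads before falling back to the URN lookup. */
  static Translator getTranslator(Transform transform) {
    if (transform instanceof PrimitiveUnboundedRead) {
      // Source is taken straight from the transform, as in transform.getSource().
      return t -> "FlinkUnboundedSource(" + ((PrimitiveUnboundedRead) t).getSource() + ")";
    }
    if (transform instanceof PrimitiveBoundedRead) {
      return t -> "FlinkBoundedSource(" + ((PrimitiveBoundedRead) t).getSource() + ")";
    }
    String urn = transform.urn();
    return urn == null ? null : URN_TRANSLATORS.get(urn);
  }
}
```

Because the instance checks run first and match only the two primitive read types, a transform that already resolves by URN takes exactly the same path as before in this model.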
   
   ### Changes
   
   - **`FlinkStreamingTransformTranslators.java`**:
     - Added `PrimitiveUnboundedReadTranslator<T>` class
     - Added `PrimitiveBoundedReadTranslator<T>` class  
     - Modified `getTranslator()` to check for `PrimitiveUnboundedRead` and 
`PrimitiveBoundedRead` instances before URN lookup
   
   - **`FlinkStreamingTransformTranslatorsTest.java`**:
     - Added `getTranslatorReturnsPrimitiveUnboundedReadTranslator()` test
     - Added `getTranslatorReturnsPrimitiveBoundedReadTranslator()` test
     - Added `primitiveUnboundedReadTranslatorProducesCorrectSource()` test
     - Added `primitiveBoundedReadTranslatorProducesCorrectSource()` test
   
   - **`CHANGES.md`**:
     - Added bugfix entry for 2.71.0
   
   ## Issue
   
   Fixes #XXXXX
   
   Created issue: #37035
   Related to #20530 (Use SDF read as default)
   
   ## Checklist
   
   - [x] Code formatted with `./gradlew :runners:flink:1.18:spotlessApply`
   - [x] Unit tests added in `FlinkStreamingTransformTranslatorsTest.java`
   - [x] All Flink runner tests pass (`./gradlew :runners:flink:1.18:test`)
   - [x] `CHANGES.md` updated and formatted with `./gradlew formatChanges`
   - [x] No breaking changes to public API
   
   ## Testing
   
   ### Unit Tests
   
   ```bash
   ./gradlew :runners:flink:1.18:test
   # BUILD SUCCESSFUL - all tests pass
   ```
   
   ### Integration Testing
   
   Tested on AWS Managed Apache Flink with a real pipeline using 
`KinesisIO.read()`:
   
   1. **Before fix**: Pipeline fails during translation with "No translator 
known for PrimitiveUnboundedRead"
   2. **After fix**: Pipeline successfully translates and runs, reading records 
from Kinesis
   
   Test environment:
   - AWS Managed Apache Flink (FLINK-1_18 runtime)
   - Apache Beam 2.71.0-SNAPSHOT (with this fix)
   - KinesisIO.read() connector
   
   ## Backwards Compatibility
   
   This change is **fully backwards compatible**:
   
   1. **No public API changes**: Only internal translator classes are added
   2. **No behavior changes for existing code**: The new translators only 
activate when `PrimitiveUnboundedRead` or `PrimitiveBoundedRead` transforms are 
present (which previously caused failures)
   3. **Existing URN-based translation unchanged**: Standard `Read.Bounded` and 
`Read.Unbounded` with URNs continue to use the existing `ReadSourceTranslator`
   
   ## Performance
   
   No performance impact expected. The new translators use the same 
`FlinkUnboundedSource` and `FlinkBoundedSource` implementations as the existing 
translators.
   
   

