dnamaz opened a new pull request, #37036:
URL: https://github.com/apache/beam/pull/37036
# Pull Request: Add PrimitiveUnboundedRead/PrimitiveBoundedRead Translators to Flink Runner
## Title
`[Flink Runner] Add translators for PrimitiveUnboundedRead and PrimitiveBoundedRead`
## Description
### What is this PR doing?
This PR adds explicit translators for
`SplittableParDo.PrimitiveUnboundedRead` and
`SplittableParDo.PrimitiveBoundedRead` to the Flink streaming transform
translators. These translators handle the case where `Read.Unbounded` and
`Read.Bounded` are converted to primitive reads by
`SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary()`.
### Why is this needed?
The Flink classic runner (non-portable) calls
`convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary()` when NOT using
the `beam_fn_api` experiment. This converts SDF-wrapped reads to
`PrimitiveUnboundedRead` and `PrimitiveBoundedRead` transforms. However, there
were no registered translators for these transforms, causing pipelines using
unbounded sources (like `KinesisIO.read()`) to fail with:
```
java.lang.IllegalStateException: No translator known for org.apache.beam.sdk.util.construction.SplittableParDo$PrimitiveUnboundedRead
```
### How does it work?
The new translators:
1. **`PrimitiveUnboundedReadTranslator`**: Extracts the `UnboundedSource`
from `PrimitiveUnboundedRead.getSource()` and creates a `FlinkUnboundedSource`,
following the same pattern as the existing `UnboundedReadSourceTranslator`.
2. **`PrimitiveBoundedReadTranslator`**: Extracts the `BoundedSource` from
`PrimitiveBoundedRead.getSource()` and creates a `FlinkBoundedSource`,
following the same pattern as the existing `BoundedReadSourceTranslator`.
The key difference from the existing translators is that they retrieve the
source directly from the transform (`transform.getSource()`) rather than using
`ReadTranslation.unboundedSourceFromTransform()`, since
`PrimitiveUnboundedRead` and `PrimitiveBoundedRead` are not standard Read
transforms with URNs.
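To make the `transform.getSource()` step concrete, here is a minimal, self-contained sketch of the unbounded translator's data flow. Every type in it is a hypothetical stand-in: the `*Model` classes play the roles of `UnboundedSource`, `PrimitiveUnboundedRead`, and `FlinkUnboundedSource`, and `PrimitiveUnboundedReadTranslatorSketch` plays the translator. The `stepName` and `parallelism` parameters are illustrative only and may not match the real runner signatures.

```java
import java.util.Objects;

// Hypothetical stand-in for Beam's UnboundedSource<T>.
interface UnboundedSourceModel<T> {
  String name();
}

// Hypothetical stand-in for SplittableParDo.PrimitiveUnboundedRead<T>.
final class PrimitiveUnboundedReadModel<T> {
  private final UnboundedSourceModel<T> source;

  PrimitiveUnboundedReadModel(UnboundedSourceModel<T> source) {
    this.source = Objects.requireNonNull(source, "source");
  }

  // The source is a plain field, so it can be read back without decoding a URN payload.
  UnboundedSourceModel<T> getSource() {
    return source;
  }
}

// Hypothetical stand-in for the runner's FlinkUnboundedSource<T> wrapper.
final class FlinkUnboundedSourceModel<T> {
  final String stepName;
  final UnboundedSourceModel<T> beamSource;
  final int parallelism;

  FlinkUnboundedSourceModel(String stepName, UnboundedSourceModel<T> beamSource, int parallelism) {
    this.stepName = stepName;
    this.beamSource = beamSource;
    this.parallelism = parallelism;
  }
}

// Hypothetical translator: takes the source straight off the transform and wraps it for Flink.
final class PrimitiveUnboundedReadTranslatorSketch<T> {
  FlinkUnboundedSourceModel<T> translate(
      String stepName, PrimitiveUnboundedReadModel<T> transform, int parallelism) {
    UnboundedSourceModel<T> source = transform.getSource();
    return new FlinkUnboundedSourceModel<>(stepName, source, parallelism);
  }
}
```

The bounded translator has the same shape, with `BoundedSource` and `FlinkBoundedSource` in place of the unbounded types.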
### Changes
- **`FlinkStreamingTransformTranslators.java`**:
  - Added `PrimitiveUnboundedReadTranslator<T>` class
  - Added `PrimitiveBoundedReadTranslator<T>` class
  - Modified `getTranslator()` to check for `PrimitiveUnboundedRead` and
    `PrimitiveBoundedRead` instances before URN lookup
- **`FlinkStreamingTransformTranslatorsTest.java`**:
  - Added `getTranslatorReturnsPrimitiveUnboundedReadTranslator()` test
  - Added `getTranslatorReturnsPrimitiveBoundedReadTranslator()` test
  - Added `primitiveUnboundedReadTranslatorProducesCorrectSource()` test
  - Added `primitiveBoundedReadTranslatorProducesCorrectSource()` test
- **`CHANGES.md`**:
  - Added bugfix entry for 2.71.0
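The `getTranslator()` ordering change can be modeled in isolation. This sketch assumes, consistent with the `IllegalStateException` quoted in the description, that the failure comes from a URN lookup with no entry for the primitive read classes. All `*Model` classes, the `TranslatorKind` enum, `TranslatorLookupSketch`, and the URN string are hypothetical; the real method works on Beam `PTransform`s and returns translator instances.

```java
import java.util.Map;

// Hypothetical transform classes standing in for Read.Unbounded and the
// SplittableParDo primitive reads.
abstract class TransformModel {}

final class ReadUnboundedModel extends TransformModel {}

final class PrimitiveUnboundedReadModel extends TransformModel {}

final class PrimitiveBoundedReadModel extends TransformModel {}

enum TranslatorKind {
  READ_SOURCE,
  PRIMITIVE_UNBOUNDED_READ,
  PRIMITIVE_BOUNDED_READ
}

final class TranslatorLookupSketch {
  // Illustrative URN string for the standard Read transform.
  private static final String READ_URN = "beam:transform:read:v1";

  // Only classes registered here have a URN in this model.
  private static final Map<Class<? extends TransformModel>, String> URNS =
      Map.of(ReadUnboundedModel.class, READ_URN);

  private static final Map<String, TranslatorKind> TRANSLATORS_BY_URN =
      Map.of(READ_URN, TranslatorKind.READ_SOURCE);

  // Models a URN lookup that fails for transforms with no registered URN.
  static String urnForTransform(TransformModel transform) {
    String urn = URNS.get(transform.getClass());
    if (urn == null) {
      throw new IllegalStateException(
          "No translator known for " + transform.getClass().getName());
    }
    return urn;
  }

  static TranslatorKind getTranslator(TransformModel transform) {
    // Check the primitive reads first so they never reach the URN lookup.
    if (transform instanceof PrimitiveUnboundedReadModel) {
      return TranslatorKind.PRIMITIVE_UNBOUNDED_READ;
    }
    if (transform instanceof PrimitiveBoundedReadModel) {
      return TranslatorKind.PRIMITIVE_BOUNDED_READ;
    }
    // Every other transform keeps the existing URN-based path.
    return TRANSLATORS_BY_URN.get(urnForTransform(transform));
  }
}
```

Because the `instanceof` checks match only the two primitive read types, a standard read (here `ReadUnboundedModel`) still resolves through the URN table, which is the property the Backwards Compatibility section relies on.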
## Issue
Fixes #XXXXX
Created issue: #37035
Related to #20530 (Use SDF read as default)
## Checklist
- [x] Code formatted with `./gradlew :runners:flink:1.18:spotlessApply`
- [x] Unit tests added in `FlinkStreamingTransformTranslatorsTest.java`
- [x] All Flink runner tests pass (`./gradlew :runners:flink:1.18:test`)
- [x] `CHANGES.md` updated and formatted with `./gradlew formatChanges`
- [x] No breaking changes to public API
## Testing
### Unit Tests
```bash
./gradlew :runners:flink:1.18:test
# BUILD SUCCESSFUL - all tests pass
```
### Integration Testing
Tested on AWS Managed Apache Flink with a real pipeline using
`KinesisIO.read()`:
1. **Before fix**: Pipeline fails during translation with "No translator
known for PrimitiveUnboundedRead"
2. **After fix**: Pipeline successfully translates and runs, reading records
from Kinesis
Test environment:
- AWS Managed Apache Flink (FLINK-1_18 runtime)
- Apache Beam 2.71.0-SNAPSHOT (with this fix)
- `KinesisIO.read()` connector
## Backwards Compatibility
This change is **fully backwards compatible**:
1. **No public API changes**: Only internal translator classes are added
2. **No behavior changes for existing code**: The new translators only
activate when `PrimitiveUnboundedRead` or `PrimitiveBoundedRead` transforms are
present (which previously caused failures)
3. **Existing URN-based translation unchanged**: Standard `Read.Bounded` and
`Read.Unbounded` with URNs continue to use the existing `ReadSourceTranslator`
## Performance
No performance impact expected. The new translators use the same
`FlinkUnboundedSource` and `FlinkBoundedSource` implementations as the existing
translators.