methodmissing opened a new pull request #12794:
URL: https://github.com/apache/beam/pull/12794
TLDR Let KafkaIO support the deserializer API with headers
References mailing list posts:
* Original
https://lists.apache.org/thread.html/rdac09a286cab86c237cf17ed35cdc592b4079d116fc682a6f797d68b%40%3Cdev.beam.apache.org%3E
* Reply from Luke
https://lists.apache.org/thread.html/rfde58381e9c34da7894b2dd5325c02944411539235f2668adea5bf24%40%3Cdev.beam.apache.org%3E
### Design decisions
SpEL is used because compiling against a kafka-clients dependency older than
2.1.0 fails with:
```
required: String,byte[]
found: String,Headers,byte[]
reason: actual and formal argument lists differ in length
where T is a type-variable:
```
Because the headers default API only landed in 2.1.0 via
https://github.com/apache/kafka/commit/f1f719211e5f28fe5163e65dba899b1da796a8e0#diff-a4f4aee88ce5091db576139f6c610ced
I opted for `ConsumerSpEL#deserializeKey` and
`ConsumerSpEL#deserializeValue` as API to ensure forward looking consistency
for both `KafkaUnboundedReader` and `ReadFromKafkaDoFn` as both already
depended on an instance thereof.
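To illustrate the kind of runtime dispatch this implies, here is a minimal sketch that uses plain reflection rather than SpEL. It is not the code in this PR: `HeaderAwareDispatch`, `LegacyDeserializer` and `HeaderDeserializer` are hypothetical stand-ins (not Kafka classes), and the headers argument is typed as `Object` so the sketch compiles without kafka-clients on the classpath.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

final class HeaderAwareDispatch {
  /** Hypothetical stand-in for a pre-2.1.0 style deserializer: two-argument method only. */
  public static class LegacyDeserializer {
    public String deserialize(String topic, byte[] data) {
      return "legacy:" + new String(data, StandardCharsets.UTF_8);
    }
  }

  /** Hypothetical stand-in for a 2.1.0+ style deserializer that also accepts headers. */
  public static class HeaderDeserializer extends LegacyDeserializer {
    public String deserialize(String topic, Object headers, byte[] data) {
      return "headers:" + new String(data, StandardCharsets.UTF_8);
    }
  }

  /** Calls the headers-aware overload if the runtime class has one, else the two-argument one. */
  static Object deserialize(Object deserializer, String topic, Object headers, byte[] data) {
    Class<?> type = deserializer.getClass();
    try {
      try {
        // Probe for the three-argument overload that a newer client would expose.
        Method withHeaders =
            type.getMethod("deserialize", String.class, Object.class, byte[].class);
        return withHeaders.invoke(deserializer, topic, headers, data);
      } catch (NoSuchMethodException e) {
        // Older API: fall back to the two-argument method and drop the headers.
        Method plain = type.getMethod("deserialize", String.class, byte[].class);
        return plain.invoke(deserializer, topic, data);
      }
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("deserialize could not be invoked", e);
    }
  }

  public static void main(String[] args) {
    byte[] payload = "a".getBytes(StandardCharsets.UTF_8);
    System.out.println(deserialize(new HeaderDeserializer(), "topic", null, payload));
    System.out.println(deserialize(new LegacyDeserializer(), "topic", null, payload));
  }
}
```

Because the overload is looked up by name at runtime, the calling code compiles against either API generation, which is the property the SpEL expressions provide here.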
### Not so great things
Using SpEL for kafka-clients API 2.1.0 onwards effectively turns the
deserialization path into a more expensive indirection, because the
deserializer methods are called using reflection (twice per record: once for
the key and once for the value):
```
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58) <<<<<<<<<<<
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) <<<<<<<<<<<
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) <<<<<<<<<<<
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) <<<<<<<<<<<
at java.base/java.lang.reflect.Method.invoke(Method.java:566) <<<<<<<<<<<
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:117)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders(ConsumerSpEL.java:134)
at org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue(ConsumerSpEL.java:174)
at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:195)
at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:137)
```
And effectively this penalizes the more recent Kafka API versions in favor
of the older ones. I have not measured the overhead thereof, yet.
### Other avenues explored
For runtime deserialization:
* Naively tried conditional compile options, but the compiler cannot know
which kafka-clients version will be used at runtime.
For regression tests (so that we don't stop passing headers in the future):
* I tried Mockito and Powermock implementations on both
`LocalDeserializerProvider` and the Integer and Long serializers in tests, but
found the stack to be too deep and backed out of that.
* Ditto for attempting to spy on `ConsumerRecord#headers()` (expecting it to be
called twice as often with the newer API), but again the stack is deep and hard
to assert on. Only the call itself is interesting, because the `ConsumerRecord`
constructor used in the tests is not the one that sets headers, presumably for
client API compatibility too.
* Evaluated `ExtendedDeserializer`'s
[wrapper](https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/serialization/ExtendedDeserializer.Wrapper.html),
but `ExtendedDeserializer` is a deprecated API and there is no point in bringing
that in as a dependency.
### How the current regression test works
Since this feature is about deserialization, and the whole test suite depends
on the `Integer` (for keys) and `Long` (for values) deserializers, I figured it
makes sense to implement a key and a value deserializer that can assert the
behaviour. Herein also lies somewhat of a problem: the test case is a bit weak,
as I relied on stack frames to infer the caller of the `deserialize` method
(the wide array of supported client versions makes anything else super
complex). Unfortunately, stack frames only provide class and method name
context, with no argument count (3 would be expected) or argument types to
assert on.
Kafka client API 1.0.0 :
```
Frame 0: java.lang.Thread.getStackTrace
Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 3: org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey
```
For clients before 2.1.0, frame 3 is `ConsumerSpEL#deserializeKey`, meaning
it was called directly and not via a default or actual implementation on
`Deserializer`. Frames 1 and 2 being equal is because of the
`super.deserialize` call.
Kafka client API 2.1.0+ :
```
Frame 0: java.lang.Thread.getStackTrace
Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 3: org.apache.kafka.common.serialization.Deserializer#deserialize
```
For clients 2.1.0 and beyond, frame 3 is
`org.apache.kafka.common.serialization.Deserializer#deserialize`. This is true
for the bundled deserializers used in the tests because they delegate the call
to the implementation on `Deserializer`. In practice this may refer to an
actual override implementation.
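For readers unfamiliar with the technique, the stack-frame inspection can be sketched roughly as follows. This is a simplified illustration, not the assertor from `KafkaIOTest`: `CallerRecordingDeserializer` and `DirectCaller` are hypothetical names, and the sketch skips over consecutive `deserialize` frames to find the first frame outside the deserializer, which corresponds to frame 3 in the listings above.

```java
final class CallerRecordingDeserializer {
  /** "Class#method" of the most recent external caller, or null if none was found. */
  String lastCaller;

  int deserialize(byte[] data) {
    lastCaller = externalCaller(Thread.currentThread().getStackTrace());
    return data.length;
  }

  private static boolean isDeserializeFrame(StackTraceElement frame) {
    return frame.getClassName().equals(CallerRecordingDeserializer.class.getName())
        && frame.getMethodName().equals("deserialize");
  }

  private static String externalCaller(StackTraceElement[] stack) {
    int i = 0;
    // Skip Thread.getStackTrace and any other frames above the deserialize call.
    while (i < stack.length && !isDeserializeFrame(stack[i])) {
      i++;
    }
    // Skip the deserialize frames themselves (there can be more than one with overloads).
    while (i < stack.length && isDeserializeFrame(stack[i])) {
      i++;
    }
    if (i >= stack.length) {
      return null;
    }
    // Only class and method names are available here, not argument counts or types.
    return stack[i].getClassName() + "#" + stack[i].getMethodName();
  }
}

final class DirectCaller {
  static int callDirectly(CallerRecordingDeserializer deserializer, byte[] data) {
    return deserializer.deserialize(data);
  }
}
```

A test can then assert on `lastCaller` after a read, which shows the same limitation described above: the frame identifies who called `deserialize`, but not which overload was used.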
### Feedback items for me
* Any alternatives for the SpEL evaluation for this hot path API?
`consumer.seekToEnd` and `consumer.assign` are once off / periodic APIs and not
called as often as twice per record.
* Ideas for a better way to test for regressions?
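One possible direction for the hot-path question, offered only as a sketch under stated assumptions: resolve the headers-aware overload once with `java.lang.invoke.MethodHandle` and reuse the handle per record, instead of evaluating an expression each time. `CachedHandleDispatch` and its nested `HeaderDeserializer` are hypothetical stand-ins, the headers argument is typed as `Object` to avoid a kafka-clients dependency, and whether this is measurably cheaper than SpEL would still need benchmarking.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.charset.StandardCharsets;

final class CachedHandleDispatch {
  /** Hypothetical stand-in for a deserializer that exposes a headers-aware overload. */
  public static class HeaderDeserializer {
    public String deserialize(String topic, Object headers, byte[] data) {
      return topic + ":" + new String(data, StandardCharsets.UTF_8);
    }
  }

  /** Resolved once at construction; null when the overload does not exist. */
  private final MethodHandle withHeaders;

  CachedHandleDispatch(Class<?> deserializerType) {
    MethodHandle handle = null;
    try {
      handle =
          MethodHandles.lookup()
              .findVirtual(
                  deserializerType,
                  "deserialize",
                  MethodType.methodType(
                      String.class, String.class, Object.class, byte[].class));
    } catch (NoSuchMethodException | IllegalAccessException e) {
      // Older API shape: no headers-aware overload available.
    }
    this.withHeaders = handle;
  }

  boolean supportsHeaders() {
    return withHeaders != null;
  }

  String deserialize(Object deserializer, String topic, Object headers, byte[] data) {
    if (withHeaders == null) {
      throw new IllegalStateException("No headers-aware deserialize overload was found");
    }
    try {
      // The lookup cost was paid in the constructor; only the invocation happens per record.
      return (String) withHeaders.invoke(deserializer, topic, headers, data);
    } catch (Throwable t) {
      throw new IllegalStateException("deserialize failed", t);
    }
  }
}
```

A caller would build one `CachedHandleDispatch` per deserializer class at setup time and check `supportsHeaders()` to decide between this path and the plain two-argument call.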
### Questions
* Would it make sense to consider raising the minimum supported client API
in order to
* If this implementation (and very likely iterations thereof :-)) is accepted,
would support for the same API on serialization be appreciated as well?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]