[ https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17019752#comment-17019752 ]
Maximilian Michels commented on BEAM-9046:
------------------------------------------

Are you running this against a Beam 2.13.0 Expansion / Job server? At this point your setup should be identical to mine :)

> Kafka connector for Python throws ClassCastException when reading KafkaRecord
> -----------------------------------------------------------------------------
>
> Key: BEAM-9046
> URL: https://issues.apache.org/jira/browse/BEAM-9046
> Project: Beam
> Issue Type: Bug
> Components: io-py-kafka
> Affects Versions: 2.16.0
> Reporter: Berkay Öztürk
> Priority: Major
> Labels: KafkaIO, Python
> Fix For: Not applicable
>
>
> I'm trying to read the data streaming from Apache Kafka using the Python SDK
> for Apache Beam with the Flink runner. After running Kafka 2.4.0 and Flink
> 1.8.3, I follow these steps:
> * Compile and run Beam 2.16 with Flink 1.8 runner.
> {code:java}
> git clone --single-branch --branch release-2.16.0 https://github.com/apache/beam.git beam-2.16.0
> cd beam-2.16.0
> nohup ./gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081 &
> {code}
> * Run the Python pipeline.
> {code:python}
> from apache_beam import Pipeline
> from apache_beam.io.external.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
> if __name__ == '__main__':
>     with Pipeline(options=PipelineOptions([
>         '--runner=FlinkRunner',
>         '--flink_version=1.8',
>         '--flink_master_url=localhost:8081',
>         '--environment_type=LOOPBACK',
>         '--streaming'
>     ])) as pipeline:
>         (
>             pipeline
>             | 'read' >> ReadFromKafka({'bootstrap.servers': 'localhost:9092'}, ['test'])  # [BEAM-3788] ???
>         )
>         result = pipeline.run()
>         result.wait_until_finish()
> {code}
> * Publish some data to Kafka.
> {code:java}
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> >{"hello":"world!"}
> {code}
> The Python script throws this error:
> {code:java}
> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-USER-somejob.
> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: xxx)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>     at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:360)
>     at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:310)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:173)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:104)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:80)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:78)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>     ... 13 more
> Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
>     at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
>     at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
>     at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
>     at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:578)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
>     at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.copy(CoderTypeSerializer.java:67)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.emitElement(UnboundedSourceWrapper.java:341)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:283)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
>     at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     ... 1 more
> ERROR:root:java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
> [flink-runner-job-invoker] INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - Manifest at /tmp/artifacts0k1mnin0/somejob/MANIFEST has 0 artifact locations
> [flink-runner-job-invoker] INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService - Removed dir /tmp/artifacts0k1mnin0/job_somejob/
> Traceback (most recent call last):
>   File "main.py", line 40, in <module>
>     run()
>   File "main.py", line 37, in run
>     result.wait_until_finish()
>   File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py", line 439, in wait_until_finish
>     self._job_id, self._state, self._last_error_message()))
> RuntimeError: Pipeline BeamApp-USER-somejob failed in state FAILED: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
> {code}
> I tried other deserializers available in Kafka but they did not work, for
> example:
> {code:java}
> Couldn't infer Coder from class org.apache.kafka.common.serialization.StringDeserializer{code}
> When I pass any coder from
> {code:java}
> org.apache.beam.sdk.coders{code}
> I get this error:
> {code:java}
> java.lang.RuntimeException: Failed to build transform beam:external:java:kafka:read:v1 from spec urn: "beam:external:java:kafka:read:v1" ...
> Caused by: java.lang.RuntimeException: Couldn't resolve coder for Deserializer ...
> {code}
> I also tried applying
> [this patch|https://github.com/mxm/beam/commit/b31cf99c75b3972018180d8ccc7e73d311f4cfed]
> by modifying the source code, but it didn't work:
> {code:java}
> RuntimeError: Pipeline BeamApp-USER-somejob failed in state FAILED: java.lang.ClassNotFoundException: org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayDeserializer
> {code}
> As another solution, I tried cloning that repository from the given commit:
> {code:java}
> git clone https://github.com/mxm/beam.git beam-mxm
> git reset --hard b31cf99c75
> {code}
> But it also did not work:
> {code:java}
> Project 'runners' not found in root project 'beam'
> {code}
> If this is not a bug but a problem on my part, please answer
> [my StackOverflow question|https://stackoverflow.com/q/59501461/12569644]
> and close this issue.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
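
For readers following the trace: the root-cause line, "KafkaRecord cannot be cast to [B", says that a coder expecting a raw byte array (`[B` is the JVM's name for `byte[]`) was handed a whole `KafkaRecord` object instead. The snippet below is only a toy illustration of that kind of mismatch in plain Python. It does not use Beam, the `KafkaRecord` tuple and `ByteArrayCoder` class are hypothetical stand-ins, and it makes no claim about where the mismatch originates in the reporter's setup.

```python
from collections import namedtuple

# Hypothetical stand-in for org.apache.beam.sdk.io.kafka.KafkaRecord.
KafkaRecord = namedtuple("KafkaRecord", ["topic", "key", "value"])


class ByteArrayCoder:
    """Toy coder (not Beam's) that accepts raw bytes only."""

    def encode(self, element):
        if not isinstance(element, bytes):
            # Rough analogue of the Java ClassCastException in the trace.
            raise TypeError(
                "%s cannot be cast to bytes" % type(element).__name__)
        # Prefix the payload with its length as a 4-byte big-endian integer.
        return len(element).to_bytes(4, "big") + element


def try_encode(coder, element):
    """Return the encoded bytes, or an error string if encoding fails."""
    try:
        return coder.encode(element)
    except TypeError as err:
        return "error: %s" % err


record = KafkaRecord("test", b"", b'{"hello":"world!"}')
coder = ByteArrayCoder()

wrong = try_encode(coder, record)        # whole record: rejected
right = try_encode(coder, record.value)  # raw value bytes: accepted

print(wrong)
print(right)
```

In this toy version the fix is to hand the coder the record's value bytes rather than the record itself. Whether the equivalent step in the real pipeline belongs to the connector, the expansion service, or the job server version is exactly what the comment above is trying to narrow down.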