[ https://issues.apache.org/jira/browse/BEAM-11862?focusedWorklogId=572742&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-572742 ]
ASF GitHub Bot logged work on BEAM-11862:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Mar/21 16:46
            Start Date: 26/Mar/21 16:46
    Worklog Time Spent: 10m

Work Description: kennknowles commented on pull request #14306:
URL: https://github.com/apache/beam/pull/14306#issuecomment-808365266

Cutting the branch leaves the SNAPSHOT version but does update the Dataflow v1 container (though I could go either way on this): https://github.com/apache/beam/blob/85e93ca3a01990e8da1e10af7826662012c06d74/runners/google-cloud-dataflow-java/build.gradle#L48

I don't know much about where other containers are specified. I am very open to adjusting the scripts further if it improves the stability of releases and still works well with the idea of long-lived release branches. I'll take another look on Monday.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id: (was: 572742)
    Time Spent: 4h 10m  (was: 4h)

> Write To Kafka does not work
> ----------------------------
>
>                 Key: BEAM-11862
>                 URL: https://issues.apache.org/jira/browse/BEAM-11862
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language, io-py-kafka
>    Affects Versions: 2.28.0
>              Reporter: Dénes Bartha
>              Assignee: Chamikara Madhusanka Jayalath
>              Priority: P1
>               Fix For: 2.29.0
>
>            Time Spent: 4h 10m
>    Remaining Estimate: 0h
>
> I am trying to send data to a Kafka topic from Python using {{WriteToKafka}} via Apache Beam, with Dataflow as the runner.
> When running the following script:
>
> {code:python}
> with beam.Pipeline(options=beam_options) as p:
>     (p
>      | beam.Impulse()
>      | beam.Map(lambda input: (1, input))
>      | WriteToKafka(
>            producer_config={
>                'bootstrap.servers': 'ip:9092,',
>            },
>            topic='testclient',
>            key_serializer='org.apache.kafka.common.serialization.LongSerializer',
>            value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>        )
>     )
> {code}
>
> I am getting this error:
>
> {code}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
>     (p
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
>     self.result = self.run()
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
>     return Pipeline.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
>     transform = ptransform.PTransform.from_runner_api(proto, context)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
>     return constructor(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
>     DoFnInfo.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
>     raise ValueError('Unexpected DoFn type: %s' % spec.urn)
> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
> {code}
>
> If I am not mistaken, the problem is with the serialization methods. I have tried all sorts of combinations that I have found on [this|https://kafka.apache.org/26/javadoc/org/apache/kafka/common/serialization/] page.
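(Editorial aside, hedged: the ticket does not confirm that the serializers are the root cause, but the combination the reporter chose does constrain the Python element types. A minimal stdlib sketch of what the two named Kafka serializers do on the Java side, so the expected Python-side types are visible; the function names here are hypothetical, not Beam or Kafka APIs.)

```python
import struct

# What org.apache.kafka.common.serialization.LongSerializer produces:
# an 8-byte big-endian signed long.
def long_serialize(value: int) -> bytes:
    return struct.pack(">q", value)

# org.apache.kafka.common.serialization.ByteArraySerializer is a
# pass-through: the value must already be raw bytes.
def byte_array_serialize(value: bytes) -> bytes:
    return value

# The element (1, input) built by beam.Map above therefore has to be an
# (int, bytes) pair for this key/value serializer combination to line up.
key, value = 1, b"payload"
assert long_serialize(key) == b"\x00\x00\x00\x00\x00\x00\x00\x01"
assert byte_array_serialize(value) == b"payload"
```

Since `beam.Impulse()` emits a single bytes element and the key is a Python int, the types in the reported script do appear compatible with this serializer pair, which points away from the serializers as the cause of the `Unexpected DoFn type` error above.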
> When I do not specify the serializers, I get this {{RuntimeError}}:
>
> {code}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 48, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 14, in run_pipeline
>     WriteToKafka(
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pvalue.py", line 141, in __or__
>     return self.pipeline.apply(ptransform, self)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 689, in apply
>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 188, in apply
>     return m(transform, input, options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
>     return transform.expand(input)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 318, in expand
>     raise RuntimeError(response.error)
> RuntimeError: java.lang.ClassCastException: class org.apache.beam.sdk.coders.VarLongCoder cannot be cast to class org.apache.beam.sdk.coders.KvCoder (org.apache.beam.sdk.coders.VarLongCoder and org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')
> 	at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2295)
> 	at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2088)
> 	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
> 	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
> 	at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:360)
> 	at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:436)
> 	at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:491)
> 	at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 	at java.base/java.lang.Thread.run(Thread.java:834)
> {code}
>
> Note that I have installed the latest apache-beam version via {{pip install 'apache-beam'}}:
> apache-beam==2.28.0

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
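(Editorial aside, hedged: the `ClassCastException` above says the Java expansion service received a `VarLongCoder` where `KafkaIO$Write` requires a `KvCoder`, i.e. the elements did not cross the language boundary as key/value pairs. A commonly suggested workaround for this class of cross-language problem, offered here as an assumption rather than the confirmed fix for this ticket, is to declare the output type of the preceding `Map` so the Python SDK infers a KV coder. Sketch below; the Beam calls appear only in comments since running them needs a Kafka broker and an expansion service.)

```python
import typing

# Hypothetical adjustment to the reporter's pipeline (not verified
# against this ticket): declaring the tuple element type lets the
# Python SDK choose a KV coder instead of inferring another coder,
# e.g.
#
#     | beam.Map(lambda x: (1, x)).with_output_types(
#           typing.Tuple[int, bytes])
#     | WriteToKafka(...)
#
# The element shape WriteToKafka needs is a (key, value) pair:
KafkaElement = typing.Tuple[int, bytes]

element: KafkaElement = (1, b"payload")
key, value = element
assert isinstance(key, int) and isinstance(value, bytes)
```

Without such a hint, a plain Python tuple is not guaranteed to be encoded as a KV pair, which would be consistent with the `VarLongCoder` → `KvCoder` cast failure reported here.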