[jira] [Comment Edited] (BEAM-9046) Kafka connector for Python throws ClassCastException when reading KafkaRecord
[ https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17019670#comment-17019670 ]

Berkay Öztürk edited comment on BEAM-9046 at 1/20/20 6:18 PM:

[~mxm] I think I'm getting there. I have built the Docker containers and downgraded to apache_beam 2.13.0:
{code:java}
Traceback (most recent call last):
  File "test.py", line 27, in
    p.run()
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 419, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 186, in run_pipeline
    portable_options))
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 650, in to_runner_api
    root_transform_id = context.transforms.get_id(self._root_transform())
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 84, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in to_runner_api
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 84, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in to_runner_api
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 871, in
    for part in self.parts],
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 84, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/pipeline.py", line 856, in to_runner_api
    return self.transform.to_runner_api_transform(context, self.full_label)
  File "/home/USER/beam213/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 180, in to_runner_api_transform
    id, context.coders._id_to_proto[id], proto))
RuntimeError: Re-used coder id: ref_Coder_TupleCoder_1
spec {
  spec {
    urn: "beam:coder:kv:v1"
  }
  environment_id: "ref_Environment_default_environment_1"
}
component_coder_ids: "ref_Coder_BytesCoder_2"
component_coder_ids: "ref_Coder_BytesCoder_2"

spec {
  spec {
    urn: "beam:coder:kv:v1"
  }
}
component_coder_ids: "ref_Coder_StrUtf8Coder_2"
component_coder_ids: "ref_Coder_StrUtf8Coder_2"
{code}
Code (taken from your Beam Summit NA 2019 presentation):
{code:python}
import apache_beam as beam
# Cross-language Kafka transforms (module path as in apache_beam/io/external/kafka.py).
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.typehints import KV

# Portable runner submitting to the job server listening on localhost:8099.
pipeline_options = PipelineOptions(['--job_name=Cross-Language-Demo',
                                    '--runner=PortableRunner',
                                    '--job_endpoint=localhost:8099',
                                    '--parallelism=1'])
p = beam.Pipeline(options=pipeline_options)


def to_upper(kv):
    k, v = kv
    return str(k).upper(), str(v).upper()


(p
 | ReadFromKafka(consumer_config={'bootstrap.servers': 'localhost:9092',
                                  'auto.offset.reset': 'latest'},
                 topics=['test'])
 | beam.Map(lambda kv: to_upper(kv)).with_output_types(KV[str, str])
 | WriteToKafka(producer_config={'bootstrap.servers': 'localhost:9092'},
                topic='test-out')
)

p.run()
{code}

was (Author: berkayozturk):
[~mxm] I think I'm getting there
[ https://issues.apache.org/jira/browse/BEAM-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017933#comment-17017933 ]

Berkay Öztürk edited comment on BEAM-9046 at 1/18/20 7:28 PM:

[~mxm] This is what I have done:
* {code:java}git clone https://github.com/mxm/beam.git beam-mxm{code}
* {code:java}cd beam-mxm{code}
* {code:java}git reset --hard b31cf99c75{code}
* {code:java}./gradlew build -p runners/flink/1.8/job-server{code}
* {code:java}java -jar runners/flink/1.8/job-server/build/libs/beam-runners-flink-1.8-job-server-2.13.0-SNAPSHOT.jar{code}
* Run the Python pipeline with PortableRunner. It raises the error below:

{code:java}
Traceback (most recent call last):
  File "main.py", line 49, in
    run()
  File "main.py", line 32, in run
    topics=['test']
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/ptransform.py", line 905, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/ptransform.py", line 514, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pipeline.py", line 481, in apply
    return self.apply(transform, pvalueish)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pipeline.py", line 517, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 175, in apply
    return m(transform, input, options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 181, in apply_PTransform
    return transform.expand(input)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/io/external/kafka.py", line 119, in expand
    self.expansion_service))
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pvalue.py", line 110, in apply
    return self.pipeline.apply(*arglist, **kwargs)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pipeline.py", line 517, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 175, in apply
    return m(transform, input, options)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/runner.py", line 181, in apply_PTransform
    return transform.expand(input)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/external.py", line 142, in expand
    for tag, pcoll_id in self._expanded_transform.outputs.items()
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/transforms/external.py", line 142, in
    for tag, pcoll_id in self._expanded_transform.outputs.items()
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/pipeline_context.py", line 94, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/pvalue.py", line 178, in from_runner_api
    eleme