[ https://issues.apache.org/jira/browse/BEAM-11862?focusedWorklogId=572742&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-572742 ]

ASF GitHub Bot logged work on BEAM-11862:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Mar/21 16:46
            Start Date: 26/Mar/21 16:46
    Worklog Time Spent: 10m 
      Work Description: kennknowles commented on pull request #14306:
URL: https://github.com/apache/beam/pull/14306#issuecomment-808365266


   Cutting the branch leaves the SNAPSHOT version in place but does update the Dataflow v1 container (though I could go either way on this): https://github.com/apache/beam/blob/85e93ca3a01990e8da1e10af7826662012c06d74/runners/google-cloud-dataflow-java/build.gradle#L48
   
   I don't know much about where the other containers are specified. I am very open to adjusting the scripts further if it improves the stability of releases and still works well with the idea of long-lived release branches.
   
   I'll take another look on Monday.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 572742)
    Time Spent: 4h 10m  (was: 4h)

> Write To Kafka does not work
> ----------------------------
>
>                 Key: BEAM-11862
>                 URL: https://issues.apache.org/jira/browse/BEAM-11862
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language, io-py-kafka
>    Affects Versions: 2.28.0
>            Reporter: Dénes Bartha
>            Assignee: Chamikara Madhusanka Jayalath
>            Priority: P1
>             Fix For: 2.29.0
>
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> I am trying to send data to a Kafka topic from Python with {{WriteToKafka}} in Apache Beam, using Dataflow as the runner.
> Running the following script:
> {code:python}
> with beam.Pipeline(options=beam_options) as p:
>     (p
>      | beam.Impulse()
>      | beam.Map(lambda input: (1, input))
>      | WriteToKafka(
>          producer_config={
>              'bootstrap.servers': 'ip:9092',
>          },
>          topic='testclient',
>          key_serializer='org.apache.kafka.common.serialization.LongSerializer',
>          value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>      ))
> {code}
> I am getting this error:
>  
> {code}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
>     (p
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
>     self.result = self.run()
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
>     return Pipeline.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
>     self._id_to_obj[id] = self._obj_type.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
>     transform = ptransform.PTransform.from_runner_api(proto, context)
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
>     return constructor(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
>     DoFnInfo.from_runner_api(
>   File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
>     raise ValueError('Unexpected DoFn type: %s' % spec.urn)
> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
> {code}
>  
> Unless I am mistaken, the problem is with the serialization methods. I have tried every combination of serializers listed on [this|https://kafka.apache.org/26/javadoc/org/apache/kafka/common/serialization/] page.
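(Editorial aside.) The stock serializers on that Javadoc page write fixed, well-documented byte layouts, so a Python-side sketch of the equivalent encodings can help check what bytes Kafka would actually receive. The helper names below are illustrative only; they are not part of the apache-beam API:

```python
import struct

# Byte layouts produced by Kafka's stock Java serializers
# (org.apache.kafka.common.serialization.*). Helper names are
# illustrative; they are not part of apache-beam or kafka-python.

def encode_long(n: int) -> bytes:
    # LongSerializer: 8 bytes, big-endian two's complement.
    return struct.pack(">q", n)

def encode_int(n: int) -> bytes:
    # IntegerSerializer: 4 bytes, big-endian two's complement.
    return struct.pack(">i", n)

def encode_string(s: str) -> bytes:
    # StringSerializer: UTF-8 bytes (its default encoding).
    return s.encode("utf-8")
```

For example, the key {{1}} serialized by {{LongSerializer}} is the 8 bytes {{00 00 00 00 00 00 00 01}}.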
> When I do not specify the serializers, I get this {{RuntimeError}}:
> {code}
> Traceback (most recent call last):
>   File "/home/denes/data-science/try_write_to_kafka.py", line 48, in <module>
>     run_pipeline(beam_options)
>   File "/home/denes/data-science/try_write_to_kafka.py", line 14, in run_pipeline
>     WriteToKafka(
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pvalue.py", line 141, in __or__
>     return self.pipeline.apply(ptransform, self)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 689, in apply
>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 188, in apply
>     return m(transform, input, options)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
>     return transform.expand(input)
>   File "/home/denes/beam_kafka/venv/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 318, in expand
>     raise RuntimeError(response.error)
> RuntimeError: java.lang.ClassCastException: class org.apache.beam.sdk.coders.VarLongCoder cannot be cast to class org.apache.beam.sdk.coders.KvCoder (org.apache.beam.sdk.coders.VarLongCoder and org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2295)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:2088)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
>     at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:360)
>     at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:436)
>     at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:491)
>     at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> {code}
> Note that I have installed the latest apache-beam version via {{pip install apache-beam}}:
> apache-beam==2.28.0
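(Editorial aside.) The {{ClassCastException}} in the second traceback shows that {{KafkaIO.Write}} received a {{VarLongCoder}} where it expected a {{KvCoder}}, i.e. the element type reaching the Java expansion service was not recognized as a key-value pair. A possible workaround sketch, assuming the problem is type inference: the {{to_kv}} helper below is hypothetical, and declaring {{Tuple[bytes, bytes]}} via {{with_output_types}} is a guess at a mitigation, not a confirmed resolution of this ticket:

```python
import struct
import typing

def to_kv(value: bytes) -> typing.Tuple[bytes, bytes]:
    # Pair each element with a fixed 8-byte big-endian key: the
    # same bytes LongSerializer would produce for the long 1.
    return (struct.pack(">q", 1), value)

# In the pipeline, the tuple type would then be declared explicitly so
# the expansion service sees a KV coder rather than a scalar coder, e.g.:
#   | beam.Map(to_kv).with_output_types(typing.Tuple[bytes, bytes])
#   | WriteToKafka(producer_config={'bootstrap.servers': 'ip:9092'},
#                  topic='testclient')
```

With both key and value already encoded as bytes, no Java-side {{key_serializer}}/{{value_serializer}} beyond the default {{ByteArraySerializer}} should be needed.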



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
