On Sat, Jul 18, 2020 at 12:08 PM Chamikara Jayalath <chamik...@google.com>
wrote:

>
>
> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <1705ay...@gmail.com> wrote:
>
>> Thank you guys for the reply. I am really stuck and could not proceed
>> further.
>> Yes, the previously published trial message had a null key.
>> But when I send a key:value pair through the producer using
>>
>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>> mytopic --property "parse.key=true" --property "key.separator=:"
>> > tryKey:tryValue
>>
>> I do not get any error, but Beam does not print the received message. Here
>> is what my pipeline looks like:
>> result = (
>>         pipeline
>>
>>         | "Read from kafka" >> ReadFromKafka(
>>             consumer_config={
>>                 "bootstrap.servers": 'localhost:9092',
>>             },
>>             topics=['mytopic'],
>>             expansion_service='localhost:8097',
>>         )
>>
>>         | "print" >> beam.Map(print)
>>         )
>>
>>
> I suspect DirectRunner in LOOPBACK mode might not be working for
> cross-language transforms today.
>

When running a streaming pipeline, the DirectRunner falls back to the old
runner that does not support cross-language.
https://issues.apache.org/jira/browse/BEAM-7514
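
For reference, here is a minimal sketch of pipeline arguments for trying the
Flink job server with a DOCKER environment, as suggested in the quoted reply
below. It assumes the job server from the original message is still listening
on localhost:8099. The helper name portable_flink_args is made up for
illustration, and this configuration is untested here.

```python
def portable_flink_args(job_endpoint="localhost:8099"):
    """Build pipeline arguments for a portable job server (illustrative helper)."""
    return [
        # Without --runner the Python SDK defaults to the DirectRunner,
        # so the job endpoint would not be used.
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        # DOCKER instead of LOOPBACK, per the suggestion in this thread.
        "--environment_type=DOCKER",
        "--streaming",
    ]


args = portable_flink_args()
print(" ".join(args))
```

The resulting list can be passed to PipelineOptions(args) in place of the
argument list used in the original pipeline.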

> Please note that the cross-language transforms framework is fairly new [1] and
> we are adding support for various runners and environment configurations.
> Can you try with Flink in DOCKER mode ?
>
>
>> If this is not the way we make beam and kafka communicate then please
>> share a working example which showcases how a message published in kafka
>> gets received by beam while streaming.
>>
>
> I'm adding an example but I've only tested this with Dataflow so far. I hope
> to test that example for more runners and add additional instructions
> there.
> https://github.com/apache/beam/pull/12188
>
> Thanks,
> Cham
>
> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>
>>
>> Regards,
>> Ayush Sharma
>>
>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>>> be updated to support this. You should not run into this error if you
>>> publish keys and values that are not null.
>>>
>>>
>>>
>>>
>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> +dev <d...@beam.apache.org>
>>>>
>>>> On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> +Heejong Lee <heej...@google.com> +Chamikara Jayalath
>>>>> <chamik...@google.com>
>>>>>
>>>>> Do you know if your trial record has an empty key or value?
>>>>> If so, then you hit a bug; it seems support for this use case was
>>>>> missed.
>>>>>
>>>>> Heejong and Cham,
>>>>> According to the Javadoc, ByteArrayDeserializer and other
>>>>> Deserializers can return null[1, 2], and we aren't using
>>>>> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
>>>>> non-XLang KafkaIO does this correctly in its regular coder inference
>>>>> logic[4]. I filed BEAM-10529[5].
>>>>>
>>>>> 1:
>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>> 2:
>>>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
>>>>> 3:
>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
>>>>> 4:
>>>>> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
>>>>> 5: https://issues.apache.org/jira/browse/BEAM-10529
>>>>>
>>>>>
>>>>> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <1705ay...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to build a streaming beam pipeline in python which should
>>>>>> capture messages from kafka and then execute further stages of data
>>>>>> fetching from other sources and aggregation. The step-by-step process of
>>>>>> what I have built till now is:
>>>>>>
>>>>>>    1. Running Kafka instance on localhost:9092
>>>>>>
>>>>>>    ./bin/kafka-server-start.sh ./config/server.properties
>>>>>>
>>>>>>    2. Run beam-flink job server using docker
>>>>>>
>>>>>>    docker run --net=host apache/beam_flink1.10_job_server:latest
>>>>>>
>>>>>>    3. Run beam-kafka pipeline
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>>>
>>>>>> if __name__ == '__main__':
>>>>>>     options = PipelineOptions([
>>>>>>         "--job_endpoint=localhost:8099",
>>>>>>         "--environment_type=LOOPBACK",
>>>>>>         "--streaming",
>>>>>>         "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>>>>>>     ])
>>>>>>
>>>>>>     options = options.view_as(StandardOptions)
>>>>>>     options.streaming = True
>>>>>>
>>>>>>     pipeline = beam.Pipeline(options=options)
>>>>>>
>>>>>>     result = (
>>>>>>         pipeline
>>>>>>
>>>>>>         | "Read from kafka" >> ReadFromKafka(
>>>>>>             consumer_config={
>>>>>>                 "bootstrap.servers": 'localhost:9092',
>>>>>>             },
>>>>>>             topics=['mytopic'],
>>>>>>             expansion_service='localhost:8097',
>>>>>>         )
>>>>>>
>>>>>>         | beam.Map(print)
>>>>>>     )
>>>>>>
>>>>>>     pipeline.run()
>>>>>>
>>>>>>
>>>>>>    4. Publish new message using kafka-console-producer.sh
>>>>>>
>>>>>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>>>>>> > tryme
>>>>>>
>>>>>> After publishing this trial message, the beam pipeline receives the
>>>>>> message but crashes with this error:
>>>>>>
>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>>>>>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>>>>>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>>>>>>     at org.apache.beam
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Ayush Sharma.
>>>>>>
>>>>>>
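
To illustrate the null-key failure discussed in this thread (BEAM-10529),
here is a toy model in plain Python of why wrapping a byte-array coder in a
nullable coder avoids "cannot encode a null byte[]". The classes below only
borrow the names of the Java coders; they are simplified stand-ins, not
Beam's implementation, and console_record is a hypothetical helper that
mimics a console producer sending a record with or without a parsed key.

```python
class ByteArrayCoder:
    """Toy length-prefixed bytes coder that cannot represent None."""

    def encode(self, value):
        if value is None:
            raise ValueError("cannot encode a null byte[]")
        return len(value).to_bytes(4, "big") + value

    def decode(self, data):
        length = int.from_bytes(data[:4], "big")
        return data[4:4 + length]


class NullableCoder:
    """Toy wrapper: one marker byte says whether a value is present."""

    def __init__(self, inner):
        self.inner = inner

    def encode(self, value):
        if value is None:
            return b"\x00"
        return b"\x01" + self.inner.encode(value)

    def decode(self, data):
        if data[:1] == b"\x00":
            return None
        return self.inner.decode(data[1:])


def console_record(line, parse_key=False, separator=":"):
    """Hypothetical model: the key is None unless key parsing is enabled."""
    if not parse_key:
        return None, line.encode("utf-8")
    key, _, value = line.partition(separator)
    return key.encode("utf-8"), value.encode("utf-8")


key, value = console_record("tryme")  # no key parsing, so key is None
nullable = NullableCoder(ByteArrayCoder())
try:
    ByteArrayCoder().encode(key)
except ValueError as err:
    print("strict coder failed:", err)
print("nullable round trip:", nullable.decode(nullable.encode(key)))
```

With parse_key=True and the line "tryKey:tryValue", both coders handle the
key, which matches the advice above to publish non-null keys and values
until the expansion is fixed.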
