Sorry, not sure what you mean?

From: Jaehyeon Kim <[email protected]>
Sent: Thursday, October 3, 2024 4:10 PM
To: [email protected]
Subject: Re: How to write to a Kafka topic?
Nice! Seems to be working. I will post a HowTo for the next Python developer.

    | "temp convert" >> beam.Map(temp_convert).with_output_types(typing.Tuple[bytes, bytes])

On Fri, 4 Oct 2024 at 09:02, Henry Tremblay via user <[email protected]> wrote:

So I changed temp_convert to return a tuple of bytes, and am now getting:

    raise RuntimeError(result.error)
    RuntimeError: org.apache.beam.sdk.coders.CoderException: `UnknownCoderWrapper` was used to perform an actual decoding in the Java SDK. Potentially a Java transform is being followed by a cross-language transform that uses a coder that is not available in the Java SDK. Please make sure that Python transforms at the multi-language boundary use Beam portable coders.

I am using the DirectRunner.

Thanks!

From: Jaehyeon Kim <[email protected]>
Sent: Thursday, October 3, 2024 3:57 PM
To: [email protected]
Subject: Re: How to write to a Kafka topic?

Hello,

It doesn't seem that the following transform outputs a tuple of bytes.

    | "temp convert" >> beam.Map(temp_convert)

You should convert the output into something like ('key'.encode('utf-8'), 'value'.encode('utf-8')).

I wrote a simple post about Kafka I/O read/write on the Flink Runner, and hope it helps - https://jaehyeon.me/blog/2024-04-18-beam-local-dev-3/

Cheers,
Jaehyeon

On Fri, 4 Oct 2024 at 08:21, Henry Tremblay via user <[email protected]> wrote:

I am creating a simple pipeline to read and then write to Kafka.
Here is my code:

    with Pipeline(options=pipeline_options) as pipeline:
        main = (
            pipeline
            | ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': configs['bootstrap_servers'],
                    'group.id': 'my-group',
                    'isolation.level': 'read_uncommitted',
                },
                topics=configs['topics'],
                max_num_records=max_num_records,
                commit_offset_in_finalize=True,
                with_metadata=True)
            | "temp convert" >> beam.Map(temp_convert)
            | "Write to Kafka" >> WriteToKafka(
                producer_config={'bootstrap.servers': configs['bootstrap_servers']},
                topic=configs['out_topic'],
            )
        )

This is giving me:

    RuntimeError: java.lang.ClassCastException: class org.apache.beam.sdk.util.construction.UnknownCoderWrapper cannot be cast to class org.apache.beam.sdk.coders.KvCoder (org.apache.beam.sdk.util.construction.UnknownCoderWrapper and org.apache.beam.sdk.coders.KvCoder are in unnamed module of loader 'app')

Can someone point me to a simple snippet for writing to Kafka in Python? I have looked in vain on the web.

Thanks!
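
For anyone landing on this thread later, here is a rough sketch that pulls the two suggestions above together: the map step returns a (key, value) tuple of bytes, and the step is annotated with with_output_types(typing.Tuple[bytes, bytes]). Several things here are assumptions and not from the thread: the body of temp_convert is never shown, so the Celsius-to-Fahrenheit logic is only a placeholder; the wrapper function run is a hypothetical name; and the sketch reads without with_metadata, so that each element arrives as a (key, value) pair of bytes. With with_metadata=True the element shape differs and the converter would need adjusting.

```python
import typing


def temp_convert(element: typing.Tuple[typing.Optional[bytes], bytes]) -> typing.Tuple[bytes, bytes]:
    """Hypothetical converter; the real temp_convert is not shown in the thread.

    Assumes each element is a (key, value) pair of bytes and that the value
    is a Celsius reading encoded as UTF-8 text.
    """
    key, value = element
    celsius = float(value.decode("utf-8"))
    fahrenheit = celsius * 9 / 5 + 32
    # WriteToKafka needs a (bytes, bytes) pair, so encode the result again
    # and substitute an empty key if none was set.
    return (key if key is not None else b"", str(fahrenheit).encode("utf-8"))


def run(configs, pipeline_options, max_num_records=None):
    """Hypothetical wrapper around the pipeline from the original question."""
    # Imported here so temp_convert can be tried out without Beam installed.
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka, WriteToKafka

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": configs["bootstrap_servers"],
                    "group.id": "my-group",
                },
                topics=configs["topics"],
                max_num_records=max_num_records,
            )
            # The explicit output type is what the thread reports as the fix.
            | "temp convert" >> beam.Map(temp_convert).with_output_types(
                typing.Tuple[bytes, bytes]
            )
            | "Write to Kafka" >> WriteToKafka(
                producer_config={"bootstrap.servers": configs["bootstrap_servers"]},
                topic=configs["out_topic"],
            )
        )
```

The converter can be checked on its own before wiring it into the pipeline, for example by calling temp_convert((b"sensor-1", b"100")) and confirming that both items of the result are bytes.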
