OK, I replaced:
            .apply(KafkaIO.<String, String>read()
                .withBootstrapServers(servers)
                .withTopic(readTopic)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata()

with:
            .apply(KafkaIO.<String, String>read()
                .withBootstrapServers(servers)
                .withTopic(readTopic)
                .withKeyDeserializerAndCoder(StringDeserializer.class, StringUtf8Coder.of())
                .withValueDeserializerAndCoder(StringDeserializer.class, StringUtf8Coder.of())
                .withoutMetadata()

and now it seems to "work", in the sense that it no longer crashes.

I'm testing with kafka-console-consumer and kafka-console-producer, but I
cannot see any output on the writeTopic.

The same behavior occurs with this Python example:

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline |
            ReadFromKafka(
                consumer_config={'bootstrap.servers': servers},
                topics=read_topics,
            ).with_output_types(Tuple[bytes, bytes]) |
            Map(lambda x: x[1].decode('utf-8')) |
            'Split' >> FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)).with_output_types(str) |
            'PairWithOne' >> Map(lambda x: (x, 1)) |
            'GroupAndSumWindow' >> WindowInto(FixedWindows(10)) |
            'GroupAndSum' >> CombinePerKey(sum) |
            'Format' >> Map(format_result) |
            WriteToKafka(
                producer_config={'bootstrap.servers': servers},
                topic=write_topic,
            ).with_input_types(Tuple[bytes, bytes])
        )
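
For reference, format_result is not shown above. Below is a minimal sketch of
the per-element logic outside of Beam, assuming format_result should turn each
(word, count) pair into a (key, value) pair of UTF-8 bytes, which is what the
Tuple[bytes, bytes] input type on WriteToKafka suggests. The helper names
split_words and count_words are made up for illustration only, and the
windowing step is not modelled here.

```python
import re
from collections import Counter
from typing import Iterable, List, Tuple

# Same pattern as the 'Split' step in the pipeline above.
WORD_RE = re.compile(r"[A-Za-z']+")


def split_words(line: str) -> List[str]:
    """Hypothetical helper: what the 'Split' FlatMap does to one line."""
    return WORD_RE.findall(line)


def format_result(word_count: Tuple[str, int]) -> Tuple[bytes, bytes]:
    """Assumed shape of format_result: encode (word, count) as bytes."""
    word, count = word_count
    return word.encode('utf-8'), str(count).encode('utf-8')


def count_words(lines: Iterable[str]) -> List[Tuple[bytes, bytes]]:
    """Hypothetical helper: Split, PairWithOne, sum and Format in plain
    Python, without Beam and without any windowing."""
    counts = Counter(word for line in lines for word in split_words(line))
    # Sort by word so the output order is deterministic.
    return [format_result(item) for item in sorted(counts.items())]


if __name__ == '__main__':
    for key, value in count_words(["to be or not to be"]):
        print(key, value)
```

Running this locally only checks the element-level logic. It does not say
anything about why nothing shows up on the write topic.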

On Tue, Jul 20, 2021 at 3:23 PM Ignacio Taranto <
ignacio.tara...@eclypsium.com> wrote:

> Hello,
>
> I'm trying to set up an Apache Beam Java pipeline that:
>
>    - Reads messages from Kafka
>    - Calls an external Python transform
>    - Writes the output to Kafka
>
> Before this, I tried simple pipelines without Kafka: for example, using
> "Create" to generate some test values in Java and then passing those to a
> dummy Python transform. That worked fine.
>
> This is an extract of the pipeline's code:
>
>  public static void main(String[] args) {
>         // ...
>         pipeline
>             .apply(KafkaIO.<String, String>read()
>                 .withBootstrapServers(servers)
>                 .withTopic(readTopic)
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class)
>                 .withoutMetadata()
>             )
>             .apply(new CrossLanguageTransform(options.getExpansionServiceURL()))
>             .apply(KafkaIO.<String, String>write()
>                 .withBootstrapServers(servers)
>                 .withTopic(writeTopic)
>                 .withKeySerializer(StringSerializer.class)
>                 .withValueSerializer(StringSerializer.class)
>             );
>
>         pipeline.run().waitUntilFinish();
>     }
>
> And this is the external transform wrapper:
>
> public class CrossLanguageTransform extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {
>     private static final String URN = "beam:transforms:xlang:pythonwordcount";
>
>     private final String expansionAddress;
>
>     public CrossLanguageTransform(String expansionAddress) {
>         this.expansionAddress = expansionAddress;
>     }
>
>     @Override
>     public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> pcoll) {
>         return pcoll.apply(
>             "PythonWordCount",
>             External.of(URN, new byte [] {}, expansionAddress)
>         );
>     }
> }
>
> After running with:
> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.CrossLanguagePipeline 
> -Pportable-runner -Dexec.args="--runner=PortableRunner 
> --jobEndpoint=localhost:8099 --useExternal=true 
> --expansionServiceURL=localhost:9097 --experiments=beam_fn_api"
>
> I get:
> KeyError: 'beam:coders:javasdk:0.1'
>
> It's not even reaching my Python transform's code. I can reproduce this
> without registering the Python transform, for example:
> python -m apache_beam.runners.portability.expansion_service_test -p 9097
>
> What am I doing wrong here?