Hi,

I hit an issue when using a cross-language (x-lang) Python pipeline that reads with ReadFromKafka and then applies WindowInto with trigger=Repeatedly(AfterCount(1)). The pipeline looks as follows:

  (p | ReadFromKafka(
      consumer_config={'bootstrap.servers': bootstrapServer},
      topics=[inputTopic],
      expansion_service=get_expansion_service())
    | "Tokenize" >> beam.FlatMap(lambda line: re.findall(r'[A-Za-z\']+', line))
    | beam.WindowInto(
          window.GlobalWindows(),
          trigger=trigger.Repeatedly(trigger.AfterCount(1)),
          accumulation_mode=trigger.AccumulationMode.DISCARDING,
          allowed_lateness=window.Duration.of(0))
    ...

The error is

Caused by: java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1. Known URNs: [beam:coder:avro:generic:v1, beam:coder:bytes:v1, beam:coder:bool:v1, beam:coder:string_utf8:v1, beam:coder:kv:v1, beam:coder:varint:v1, beam:coder:interval_window:v1, beam:coder:iterable:v1, beam:coder:timer:v1, beam:coder:length_prefix:v1, beam:coder:global_window:v1, beam:coder:windowed_value:v1, beam:coder:param_windowed_value:v1, beam:coder:double:v1, beam:coder:row:v1, beam:coder:sharded_key:v1]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
    at org.apache.beam.runners.core.construction.CoderTranslation.fromKnownCoder(CoderTranslation.java:165)
    at org.apache.beam.runners.core.construction.CoderTranslation.fromProto(CoderTranslation.java:145)
    at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:87)
    at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:82)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    ... 50 more

This looks like a missing coder, but what I do not understand is why the coder is passed to the ExpansionService in the first place. This happens even if I place a (map) transform between the ReadFromKafka and the WindowInto transforms. I think the expansion service should not need to know about what happens later in the pipeline. With the default trigger, the pipeline is able to run.

Any ideas?

 Jan
