Hi,
I hit an issue when using a cross-language (x-lang) Python pipeline (with
ReadFromKafka) followed by a WindowInto with
trigger=Repeatedly(AfterCount(1)). The pipeline looks as follows:
  (p | ReadFromKafka(
          consumer_config={'bootstrap.servers': bootstrapServer},
          topics=[inputTopic],
          expansion_service=get_expansion_service())
     | "Tokenize" >> beam.FlatMap(
          lambda line: re.findall(r'[A-Za-z\']+', line))
     | beam.WindowInto(
          window.GlobalWindows(),
          trigger=trigger.Repeatedly(trigger.AfterCount(1)),
          accumulation_mode=trigger.AccumulationMode.DISCARDING,
          allowed_lateness=window.Duration.of(0))
     ...
The error is:
Caused by: java.lang.IllegalArgumentException: Unknown Coder URN
beam:coder:pickled_python:v1. Known URNs: [beam:coder:avro:generic:v1,
beam:coder:bytes:v1, beam:coder:bool:v1, beam:coder:string_utf8:v1,
beam:coder:kv:v1, beam:coder:varint:v1, beam:coder:interval_window:v1,
beam:coder:iterable:v1, beam:coder:timer:v1,
beam:coder:length_prefix:v1, beam:coder:global_window:v1,
beam:coder:windowed_value:v1, beam:coder:param_windowed_value:v1,
beam:coder:double:v1, beam:coder:row:v1, beam:coder:sharded_key:v1]
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
  at org.apache.beam.runners.core.construction.CoderTranslation.fromKnownCoder(CoderTranslation.java:165)
  at org.apache.beam.runners.core.construction.CoderTranslation.fromProto(CoderTranslation.java:145)
  at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:87)
  at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:82)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
  ... 50 more
This looks like a missing coder, but what I do not understand is why the
coder is passed to the ExpansionService in the first place. This
happens even if I place a (map) transform between the ReadFromKafka and
the WindowInto transform. I think the expansion service should not need
to know about what happens later in the pipeline. With the default
trigger, the pipeline is able to run.
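To make the failure mode concrete, here is a toy Python sketch of how I read the check that throws. It is not Beam's actual code: the function name check_known_coder is made up for illustration, and the URN set is simply copied from the error message above. The point is only that the Java side looks up each coder URN in a fixed set of known coders and rejects anything else, such as the Python-only pickled coder.

```python
# Toy model of a "known coder URN" check. This is NOT Beam's implementation;
# check_known_coder is a hypothetical name used only for illustration.

# URNs copied from the "Known URNs" list in the error message.
KNOWN_CODER_URNS = frozenset({
    "beam:coder:avro:generic:v1",
    "beam:coder:bytes:v1",
    "beam:coder:bool:v1",
    "beam:coder:string_utf8:v1",
    "beam:coder:kv:v1",
    "beam:coder:varint:v1",
    "beam:coder:interval_window:v1",
    "beam:coder:iterable:v1",
    "beam:coder:timer:v1",
    "beam:coder:length_prefix:v1",
    "beam:coder:global_window:v1",
    "beam:coder:windowed_value:v1",
    "beam:coder:param_windowed_value:v1",
    "beam:coder:double:v1",
    "beam:coder:row:v1",
    "beam:coder:sharded_key:v1",
})


def check_known_coder(urn):
    """Return the URN if it is in the known set, otherwise raise ValueError."""
    if urn not in KNOWN_CODER_URNS:
        raise ValueError(
            "Unknown Coder URN %s. Known URNs: %s"
            % (urn, sorted(KNOWN_CODER_URNS)))
    return urn
```

In this sketch, check_known_coder("beam:coder:bytes:v1") passes, while check_known_coder("beam:coder:pickled_python:v1") raises, which matches the shape of the exception I am seeing.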
Any ideas?
Jan