Hi,
I hit another issue with the portable Flink runner. Long story short:
reading from Kafka does not work in portable Flink. First I had to solve
the issues with the expansion service configuration (the ability to add
the use_deprecated_read option), because the portable Flink runner has
issues with SDF [1], [2]. Once I was able to inject use_deprecated_read
into the expansion service, I got an execution DAG that contains the
UnboundedSource, but then more and more issues appeared (probably
related to a missing LengthPrefixCoder somewhere, maybe at the output
of the primitive Read). I wanted to create a test for it and found out
that there actually is a ReadSourcePortableTest in FlinkRunner, but
_it tests nothing_. The problem is that Read is transformed to SDF, so
this test exercises the SDF, not the primitive Read transform. As a
result, the Read transform is effectively untested, and it does not work.
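On the LengthPrefixCoder suspicion: in the portable model, a runner that
cannot decode an SDK-specific coder usually sees it wrapped in a length
prefix (a var-int byte count followed by the encoded element), so it can
move elements around as opaque bytes. The toy below is plain Java, not
Beam code; `LengthPrefixSketch` and its methods are hypothetical names.
It shows the framing and how a side that expects a prefix misparses
bytes written without one, which is one way a missing LengthPrefixCoder
can surface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Toy model of length-prefix framing (hypothetical names, not Beam's LengthPrefixCoder). */
public class LengthPrefixSketch {

  /** Writes an unsigned var-int: 7 bits per byte, high bit set means "more bytes follow". */
  static void writeVarInt(ByteArrayOutputStream out, int value) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  /** Reads an unsigned var-int written by writeVarInt. */
  static int readVarInt(ByteArrayInputStream in) {
    int result = 0;
    for (int shift = 0; shift <= 28; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new IllegalArgumentException("unexpected end of stream");
      }
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IllegalArgumentException("var-int too long");
  }

  /** Frames an opaque payload as <var-int length><payload bytes>. */
  static byte[] encode(byte[] payload) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeVarInt(out, payload.length);
    out.write(payload, 0, payload.length);
    return out.toByteArray();
  }

  /** Reads one framed element without knowing anything about its inner format. */
  static byte[] decode(ByteArrayInputStream in) {
    int length = readVarInt(in);
    byte[] payload = new byte[length];
    int read = length == 0 ? 0 : in.read(payload, 0, length);
    if (read != length) {
      throw new IllegalArgumentException("truncated element: expected " + length + " bytes");
    }
    return payload;
  }

  public static void main(String[] args) {
    byte[] element = "kafka-record".getBytes(StandardCharsets.UTF_8);

    // Both sides agree on the framing: the element round-trips exactly.
    byte[] roundTrip = decode(new ByteArrayInputStream(encode(element)));
    System.out.println(Arrays.equals(element, roundTrip)); // true

    // Mismatch: raw bytes handed to a reader that expects a length prefix.
    // The first payload byte ('k' = 107) is misread as a length of 107.
    try {
      decode(new ByteArrayInputStream(element));
    } catch (IllegalArgumentException e) {
      System.out.println("mismatch: " + e.getMessage());
    }
  }
}
```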
I tried using convertReadBasedSplittableDoFnsToPrimitiveReads so that I
could make the test fail and debug it, but I ran into
java.lang.IllegalArgumentException: PCollectionNodes
[PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
PCollection=unique_name:
"PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
coder_id: "IterableCoder"
is_bounded: BOUNDED
windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
}] were consumed but never produced
which was the final knock-out. :)
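For readers unfamiliar with the error: pipeline validation complains when
a transform lists a PCollection as input that no transform in the graph
lists as output. A minimal sketch of that kind of check, assuming a
simplified graph model (the `ConsumedNeverProducedSketch` class and its
`Transform` type are hypothetical, not Beam's validation code), shows how
a rewrite that replaces a producer without preserving its output id
leaves a dangling consumer. It only illustrates the error message; it
makes no claim about what convertReadBasedSplittableDoFnsToPrimitiveReads
actually does.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy "consumed but never produced" check (hypothetical, not Beam's validation code). */
public class ConsumedNeverProducedSketch {

  /** A transform node with the ids of the PCollections it reads and writes. */
  static final class Transform {
    final String name;
    final List<String> inputs;
    final List<String> outputs;

    Transform(String name, List<String> inputs, List<String> outputs) {
      this.name = name;
      this.inputs = inputs;
      this.outputs = outputs;
    }
  }

  /** Returns PCollection ids that some transform consumes but no transform produces. */
  static Set<String> consumedButNeverProduced(List<Transform> transforms) {
    Set<String> produced = new LinkedHashSet<>();
    for (Transform t : transforms) {
      produced.addAll(t.outputs);
    }
    Set<String> missing = new LinkedHashSet<>();
    for (Transform t : transforms) {
      for (String input : t.inputs) {
        if (!produced.contains(input)) {
          missing.add(input);
        }
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    List<Transform> graph = new ArrayList<>(List.of(
        new Transform("Read", List.of(), List.of("read.out")),
        new Transform("ToSingletonIterables", List.of("read.out"), List.of("iterables.out")),
        new Transform("AssertThat", List.of("iterables.out"), List.of())));
    System.out.println(consumedButNeverProduced(graph)); // []

    // Simulate a rewrite that replaces the middle transform but gives its
    // output a new id: the downstream consumer now points at nothing.
    graph.set(1, new Transform("Replaced", List.of("read.out"), List.of("replaced.out")));
    System.out.println(consumedButNeverProduced(graph)); // [iterables.out]
  }
}
```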
My current impression is that, starting with Beam 2.25.0, the portable
FlinkRunner is not able to read from Kafka. Could someone give me a hint
about what is wrong with using
convertReadBasedSplittableDoFnsToPrimitiveReads in the test [3]?
Jan
[1] https://issues.apache.org/jira/browse/BEAM-11991
[2] https://issues.apache.org/jira/browse/BEAM-11998
[3] https://github.com/apache/beam/pull/15181