Hi,
this was a ride, but I managed to get it working. I'd like to discuss
two points, though:
a) I had to push the Java coders into ModelCoders for Java (which makes
sense to me, but is that correct?), see [1]. This is needed so that the
Read transform (executed directly in the TaskManager) can correctly
communicate with the Java SDK harness using custom coders (which is
tested here [2]).
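To illustrate why: as far as I understand it, the runner length-prefixes
any coder whose URN is not among the model coders, so if the Java coders
are missing from ModelCoders, the runner and the SDK harness disagree
about the wire format. A minimal sketch of that decision (the method
name isKnownToBothSides is mine, just for illustration):

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ModelCoders;

// Coders whose URN is in ModelCoders.urns() are understood by both the
// runner and the SDK harness; anything else is opaque to the runner and
// has to be wrapped in a length-prefix coder to cross that boundary.
static boolean isKnownToBothSides(RunnerApi.Coder coder) {
  return ModelCoders.urns().contains(coder.getSpec().getUrn());
}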
b) I'd strongly prefer to move the handling of use_deprecated_read
from outside of the Read PTransform directly into its expand method,
see [3]. Though this is not needed for the Read on Flink to work, it
seems cleaner.
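To make (b) concrete, the shape I have in mind is roughly this (a
sketch only - the class is hypothetical, not the actual Read; the point
is just branching on the experiment inside expand):

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical stand-in for Read; the real one lives in
// org.apache.beam.sdk.io and carries a lot more machinery.
abstract class SketchRead<T> extends PTransform<PBegin, PCollection<T>> {
  @Override
  public PCollection<T> expand(PBegin input) {
    boolean deprecatedRead =
        ExperimentalOptions.hasExperiment(
            input.getPipeline().getOptions(), "use_deprecated_read");
    // decide here, instead of rewriting the pipeline from the outside
    // after construction
    return deprecatedRead ? expandPrimitive(input) : expandViaSdf(input);
  }

  // primitive (UnboundedSource-based) expansion
  abstract PCollection<T> expandPrimitive(PBegin input);

  // expansion via the SDF wrapper
  abstract PCollection<T> expandViaSdf(PBegin input);
}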
WDYT?
Jan
[1]
https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375
[2]
https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201
[3]
https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb
On 7/18/21 6:29 PM, Jan Lukavský wrote:
Hi,
I was debugging the issue and it relates to pipeline fusion - it seems
that the primitive Read transform gets fused and is then 'missing' as a
source. I'm a little lost in the code, but the strangest parts are
that:
a) I tried to reject fusion of the primitive Read by adding
GreedyPCollectionFusers::cannotFuse for
PTransformTranslation.READ_TRANSFORM_URN to
GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but that didn't
change the exception
b) I tried adding Reshuffle.viaRandomKey between the Read and the
PAssert (see the sketch after this list), but that didn't change it
either
c) when I run a portable Pipeline with use_deprecated_read on Flink,
it actually runs (it fails as soon as it reads any data, but with
empty input the job runs), so it does not hit the same issue, which
is a mystery to me
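For reference, the Reshuffle attempt from (b) looked roughly like this
(a sketch - readOutput stands for the output PCollection of the Read
under test, and the expected values are made up):

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

// force a shuffle between the Read and the PAssert so the two cannot
// be fused into a single stage
PCollection<Long> shuffled = readOutput.apply(Reshuffle.viaRandomKey());
PAssert.that(shuffled).containsInAnyOrder(0L, 1L, 2L, 3L);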
If anyone has any pointers that I can investigate, I'd be really grateful.
Thanks in advance,
Jan
On 7/16/21 2:00 PM, Jan Lukavský wrote:
Hi,
I hit another issue with the portable Flink runner. Long story short
- reading from Kafka is not working in the portable Flink runner.
First I had to solve issues with the expansion service configuration
(the ability to add the use_deprecated_read option), because the
portable Flink runner has issues with SDF [1], [2]. After being able
to inject use_deprecated_read into the expansion service, I was able
to get an execution DAG that has the UnboundedSource, but then more
and more issues appeared (probably related to a missing
LengthPrefixCoder somewhere - maybe at the output of the primitive
Read). I wanted to create a test for it and found out that there
actually is a ReadSourcePortableTest in FlinkRunner, but _it tests
nothing_. The problem is that Read is transformed to SDF, so the test
exercises the SDF, not the Read transform. As a result, the fact that
the Read transform does not work goes unnoticed.
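Just to be explicit about what I mean by length-prefixing above - a
trivial example of wrapping a coder so that the runner can frame
elements without having to decode them:

import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.VarLongCoder;

// each element is encoded preceded by its length in bytes, so a runner
// that cannot decode the inner coder can still split the byte stream
// into individual elements
LengthPrefixCoder<Long> wireCoder = LengthPrefixCoder.of(VarLongCoder.of());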
I tried using convertReadBasedSplittableDoFnsToPrimitiveReads so that
I could make the test fail and debug it, but I ran into
java.lang.IllegalArgumentException: PCollectionNodes
[PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
PCollection=unique_name:
"PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
coder_id: "IterableCoder"
is_bounded: BOUNDED
windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
}] were consumed but never produced
which gave me the last knock-out. :)
My current impression is that starting from Beam 2.25.0, the portable
FlinkRunner is not able to read from Kafka. Could someone give me a
hint about what is wrong with my use of
convertReadBasedSplittableDoFnsToPrimitiveReads in the test [3]?
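For context, this is roughly how I apply the conversion in the test (a
sketch; pipeline construction and assertions elided, and I'm assuming
the static helper in SplittableParDo from runners-core-construction):

import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.sdk.Pipeline;

// rewrite SDF-based reads back to primitive Read transforms before
// translation, so the test exercises the primitive Read path
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
pipeline.run().waitUntilFinish();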
Jan
[1] https://issues.apache.org/jira/browse/BEAM-11991
[2] https://issues.apache.org/jira/browse/BEAM-11998
[3] https://github.com/apache/beam/pull/15181