Hi,

this was a ride, but I managed to get it working. I'd like to discuss two points, though:

 a) I had to push Java coders to ModelCoders for Java (which makes sense to me, but is that correct?). See [1]. It is needed so that the Read transform (executed directly in the TaskManager) can correctly communicate with the Java SDK harness using custom coders (which is tested here [2]).

 b) I'd strongly prefer if we moved the handling of use_deprecated_read from outside the Read PTransform directly into its expand method, see [3]. Though this is not needed for Read on Flink to work, it seems cleaner.
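To illustrate what I mean in b): instead of an outside rewrite replacing the transform, the expand method itself would consult the experiment and pick the expansion. A minimal standalone sketch of the pattern (class and method names here are illustrative, not Beam's actual API):

```java
import java.util.List;

public class ReadSketch {
  interface Expansion { String name(); }
  static class SdfExpansion implements Expansion {
    public String name() { return "SDF"; }
  }
  static class PrimitiveReadExpansion implements Expansion {
    public String name() { return "PrimitiveRead"; }
  }

  // Analogous to Read.expand() consulting the pipeline's experiments
  // instead of an external pass rewriting the transform afterwards.
  static Expansion expand(List<String> experiments) {
    if (experiments.contains("use_deprecated_read")) {
      return new PrimitiveReadExpansion();
    }
    return new SdfExpansion();
  }

  public static void main(String[] args) {
    System.out.println(expand(List.of("use_deprecated_read")).name());
    System.out.println(expand(List.of()).name());
  }
}
```

The advantage is that the decision lives next to the transform it affects, so every runner and the expansion service get the same behavior for free.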

WDYT?

 Jan

[1] https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375

[2] https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201

[3] https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb

On 7/18/21 6:29 PM, Jan Lukavský wrote:

Hi,

I was debugging the issue and it relates to pipeline fusion - it seems that the primitive Read transform gets fused and is then 'missing' as a source. I'm a little lost in the code, but the strangest parts are:

 a) I tried to reject fusion of the primitive Read by adding GreedyPCollectionFusers::cannotFuse for PTransformTranslation.READ_TRANSFORM_URN to GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but that didn't change the exception

 b) I tried adding Reshuffle.viaRandomKey between the Read and the PAssert, but that didn't change anything either

 c) when I run the portable pipeline with use_deprecated_read on Flink, it actually runs (it fails once it reads any data, but with an empty input the job completes), so it does not hit the same issue, which is a mystery to me

If anyone has any pointers that I can investigate, I'd be really grateful.

Thanks in advance,

 Jan


On 7/16/21 2:00 PM, Jan Lukavský wrote:

Hi,

I hit another issue with the portable Flink runner. Long story short: reading from Kafka does not work in portable Flink. I first had to solve issues with the expansion service configuration (the ability to add the use_deprecated_read option), because the portable Flink runner has issues with SDF [1], [2]. After being able to inject use_deprecated_read into the expansion service, I got an execution DAG that has the UnboundedSource, but then more and more issues appeared (probably related to a missing LengthPrefixCoder somewhere - maybe at the output of the primitive Read).

I wanted to create a test for this and found out that there actually is a ReadSourcePortableTest in FlinkRunner, but _it tests nothing_. The problem is that the Read is transformed to SDF, so this test exercises the SDF, not the Read transform. As a result, the Read transform does not work.
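On the suspected missing LengthPrefixCoder: the point of length-prefixing is that the runner can transport (and skip over) elements produced by a coder it does not understand, as long as every element is preceded by its byte length; only coders both sides know (the model coders) can go over the wire unwrapped. A standalone illustration of the idea - this is not Beam's actual wire format, just the principle:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class LengthPrefixSketch {
  // Write the payload preceded by its length, so a reader that does not
  // know the payload's coder can still read (or skip) the exact bytes.
  static byte[] encodeWithPrefix(byte[] payload) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(payload.length);
    out.write(payload);
    return bos.toByteArray();
  }

  static byte[] decodeWithPrefix(byte[] bytes) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
    int len = in.readInt();
    byte[] payload = new byte[len];
    in.readFully(payload);
    return payload;
  }

  public static void main(String[] args) throws IOException {
    byte[] encoded = encodeWithPrefix("hello".getBytes("UTF-8"));
    System.out.println(new String(decodeWithPrefix(encoded), "UTF-8")); // prints "hello"
  }
}
```

Without the prefix, a runner holding opaque custom-coded bytes has no way to find the element boundary, which is consistent with the symptoms I'm seeing at the output of the primitive Read.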

I tried using convertReadBasedSplittableDoFnsToPrimitiveReads so that I could make the test fail and debug it, but I ran into

java.lang.IllegalArgumentException: PCollectionNodes 
[PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
 PCollection=unique_name: 
"PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
coder_id: "IterableCoder"
is_bounded: BOUNDED
windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
}] were consumed but never produced


which gave me the last knock-out. :)

My current impression is that, starting from Beam 2.25.0, the portable FlinkRunner is not able to read from Kafka. Could someone give me a hint about what is wrong with using convertReadBasedSplittableDoFnsToPrimitiveReads in the test [3]?

 Jan

[1] https://issues.apache.org/jira/browse/BEAM-11991

[2] https://issues.apache.org/jira/browse/BEAM-11998

[3] https://github.com/apache/beam/pull/15181
