Hello all!

Did any of you encounter a similar problem with PAssert on Flink (set up as
a standalone instance, not an embedded one)? For a simple test:

PCollection<Integer> res = pipeline.apply("Generate 5", Create.of(5));

PAssert.thatSingleton(res).isEqualTo(5);
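
For reference, here is roughly the full test context (a minimal sketch; the class and test names are made up, the rest is the standard TestPipeline JUnit rule setup):

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class PAssertOnFlinkTest {

  // TestPipeline reads the runner configuration from the --beamTestPipelineOptions
  // system property, here assumed to point at TestFlinkRunner with flinkMaster=localhost:8081.
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void testSingletonAssertion() {
    PCollection<Integer> res = pipeline.apply("Generate 5", Create.of(5));
    PAssert.thatSingleton(res).isEqualTo(5);

    // run() executes the pipeline and then verifies the PAssert success counter.
    pipeline.run();
  }
}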

Only about 1 out of 10 attempts (a rough estimate) ends with success; for
the rest I get

java.lang.AssertionError: Expected 1 successful assertions, but found 0.
   Expected: is <1L>
        but: was <0L>
       at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
       at org.apache.beam.sdk.testing.TestPipeline.verifyPAssertsSucceeded(TestPipeline.java:540)
       at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:351)
       at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
…

A short primer on PAsserts:

They work by performing an assertion on the contents of a PCollection. If
the assertion holds, a Counter metric holding the number of successful
PAsserts is incremented; if it fails, an analogous failure counter is
incremented. The TestPipeline class then checks the correctness of the
pipeline by counting the PAsserts in the pipeline definition and comparing
that number with the success counter.
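
Conceptually this is the same mechanism as bumping an ordinary Beam counter inside a DoFn, e.g. (a simplified sketch, not the actual PAssert internals; the namespace and counter names are made up):

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Simplified stand-in for what a PAssert verification step does per element.
class VerifyFn extends DoFn<Integer, Void> {
  private final Counter success = Metrics.counter("MyAsserts", "success");
  private final Counter failure = Metrics.counter("MyAsserts", "failure");

  @ProcessElement
  public void processElement(@Element Integer element) {
    if (element == 5) {
      success.inc();  // reported back to the runner as a metric update
    } else {
      failure.inc();
    }
  }
}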

In this case the success counter is never saved in the first place, which
is why the assertion error reads expected 1, was 0.
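
The check itself boils down to querying the run's metrics for the success counter, roughly like this (a sketch of what TestPipeline.verifyPAssertsSucceeded does, not the exact code):

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.testing.PAssert;

// Count how many PAssert success counters the runner reported back.
static long successfulAssertions(PipelineResult result) {
  MetricQueryResults metrics = result.metrics().queryMetrics(
      MetricsFilter.builder()
          .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
          .build());
  long successes = 0;
  for (MetricResult<Long> counter : metrics.getCounters()) {
    successes += counter.getAttempted();
  }
  return successes;
}

On this setup that query comes back empty, so the count stays at 0.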

I checked the metrics reporting step by step in the debugger and didn’t
find a trace of the counter being saved; no metrics accumulators were
reported back from Flink.

Interestingly, with ‘--streaming=true’ everything works fine.
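
For reference, that flag maps onto the standard StreamingOptions setting; a minimal programmatic sketch (assuming the options are built by hand rather than passed on the command line):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Programmatic equivalent of passing --streaming=true --flinkMaster=localhost:8081.
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setFlinkMaster("localhost:8081");
options.setStreaming(true);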

Then I ran PAssertTest on a local Flink instance (slightly modifying the
ValidatesRunner setup):

   - I’ve created my own interface:

public interface ValidatesRunner_custom extends NeedsRunner {}

   - Replaced all occurrences of ValidatesRunner.class with
     ValidatesRunner_custom.class in PAssertTest, just to run only those
     tests (see the sketch after this list for how a test is re-tagged)
   - Modified flink_runner.gradle a bit:

L[188]:

def pipelineOptions = JsonOutput.toJson(
       ["--runner=TestFlinkRunner",
        "--flinkMaster=localhost:8081",
        "--streaming=${config.streaming}",
        "--parallelism=1",
       ])

L[202]:

includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner_custom'
excludeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'

   - Invoked with:

./gradlew validatesRunner -p runners/flink/1.9 --stacktrace --info
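
For clarity, re-tagging a test just means swapping the JUnit category annotation, roughly like this (an illustrative stand-in, not an actual PAssertTest case):

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner_custom;
import org.apache.beam.sdk.transforms.Create;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class ExampleTaggedTest {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  // Was @Category(ValidatesRunner.class); only tests tagged with the custom
  // category are picked up by the includeCategories filter above.
  @Category(ValidatesRunner_custom.class)
  @Test
  public void testIsEqualTo() {
    PAssert.thatSingleton(pipeline.apply(Create.of(5))).isEqualTo(5);
    pipeline.run();
  }
}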

The result of that run was the same.

This might be a clue: our runner validation sets up an embedded Flink
instance using FlinkMiniClusterEntryPoint, which uses Flink's MiniCluster.
On the other hand, if I launch a cluster according to the docs
https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/local_setup.html#start-a-local-flink-cluster
it uses StandaloneSessionClusterEntrypoint to start the instance. I tried
to find something that might cause the difference in behavior but, well, I
am far from being familiar with the Flink code. And, of course, this could
be a false lead.

Let me know what you think.

Thanks

Pawel
