Hello all! Has anyone run into a similar problem with PAssert on Flink when Flink is set up as a standalone cluster rather than an embedded one? Take this simple test:
    PCollection<Integer> res = pipeline.apply("Generate 5", Create.of(5));
    PAssert.thatSingleton(res).isEqualTo(5);

Only about 1 in 10 attempts (a rough estimate) succeeds. The rest fail with:

    java.lang.AssertionError: Expected 1 successful assertions, but found 0.
    Expected: is <1L>
         but: was <0L>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
        at org.apache.beam.sdk.testing.TestPipeline.verifyPAssertsSucceeded(TestPipeline.java:540)
        at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:351)
        at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
        …

A short primer on PAsserts: a PAssert performs an assertion on all elements of a PCollection. If the assertion holds, a Counter metric holding the number of successful PAsserts is incremented; if it fails, an analogous failure counter is incremented. TestPipeline then verifies the pipeline by counting the PAsserts in the pipeline definition and comparing that number with the success counter.

In this case the success counter is never saved in the first place, which is why the assertion error reads "expected 1, was 0". I stepped through the metrics reporting in the debugger and found no trace of the counter being saved: no metric accumulators were reported back from Flink.

Interestingly, with ‘--streaming=true’ everything works fine.
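To make the failure mode concrete, here is a minimal, hypothetical model of the bookkeeping described above, in plain Java with no Beam dependency. `PAssertCounterModel`, its methods and the `metricsReachedClient` flag are made up for illustration; plain `long` fields stand in for Beam's Counter metrics, and `verify` only mirrors the idea behind `TestPipeline.verifyPAssertsSucceeded`, not its actual code. The point it shows: the assertion can hold inside the pipeline, yet if the counter never makes it back to the client, the check sees 0 and fails with exactly "expected 1, found 0".

```java
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical, simplified model of PAssert bookkeeping (NOT Beam's implementation).
 * Plain long fields stand in for the success/failure Counter metrics.
 */
public class PAssertCounterModel {
    private long successCounter = 0;    // stand-in for the success Counter metric
    private long failureCounter = 0;    // stand-in for the failure Counter metric
    private int declaredAssertions = 0; // number of PAsserts in the "pipeline definition"

    /** Declares one PAssert-like check over all elements and bumps exactly one counter. */
    public <T> void assertThat(List<T> elements, Predicate<List<T>> check) {
        declaredAssertions++;
        if (check.test(elements)) {
            successCounter++;
        } else {
            failureCounter++;
        }
    }

    /** Success count as seen by the client; 0 if the runner never reported the metric back. */
    public long reportedSuccesses(boolean metricsReachedClient) {
        return metricsReachedClient ? successCounter : 0L;
    }

    public long failures() {
        return failureCounter;
    }

    /** Mirrors the idea of TestPipeline's check: reported successes must equal declared PAsserts. */
    public void verify(long reportedSuccesses) {
        if (reportedSuccesses != declaredAssertions) {
            throw new AssertionError("Expected " + declaredAssertions
                    + " successful assertions, but found " + reportedSuccesses + ".");
        }
    }

    public static void main(String[] args) {
        PAssertCounterModel model = new PAssertCounterModel();
        // Rough equivalent of PAssert.thatSingleton(Create.of(5)).isEqualTo(5)
        model.assertThat(List.of(5), xs -> xs.size() == 1 && xs.get(0) == 5);

        model.verify(model.reportedSuccesses(true)); // counter reported: passes
        try {
            model.verify(model.reportedSuccesses(false)); // counter lost on the way back
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `Expected 1 successful assertions, but found 0.` for the second check, even though the assertion itself succeeded and the failure counter stayed at 0.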
Then I ran PAssertTest against a local Flink instance, slightly modifying the ValidatesRunner setup:
- I created my own category interface:
      public interface ValidatesRunner_custom extends NeedsRunner {}
- I replaced all occurrences of ValidatesRunner.class with ValidatesRunner_custom.class in PAssertTest, so that only those tests run.
- I modified flink_runner.gradle a bit:
      // flink_runner.gradle, line 188
      def pipelineOptions = JsonOutput.toJson(
          ["--runner=TestFlinkRunner",
           "--flinkMaster=localhost:8081",
           "--streaming=${config.streaming}",
           "--parallelism=1",
          ])
      // flink_runner.gradle, line 202
      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner_custom'
      excludeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
- I invoked it with:
      ./gradlew validatesRunner -p runners/flink/1.9 --stacktrace --info

The result was the same.

This might be a clue: our runner validation sets up an embedded Flink instance using FlinkMiniClusterEntryPoint, which uses Flink's MiniCluster. On the other hand, if I launch a cluster according to the docs (https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/local_setup.html#start-a-local-flink-cluster), it uses StandaloneSessionClusterEntrypoint to start the instance. I tried to find something that might explain the difference in behavior but, well, I am far from familiar with the Flink code. And, of course, it may be a false lead.

Let me know what you think.

Thanks
Pawel