I believe the ParDos are being invoked in parallel. I'm not sure of the exact semantics, but I believe Beam will execute separate keys on separate threads when it processes different bundles for those keys. I logged the thread IDs in this test to verify that different threads are invoking this code. With my fix applied, the test passed 400/400 runs.
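For reference, the logging approach I used can be sketched roughly like this. This is a minimal standalone sketch, not the actual test code; the class name, pool size, and key count are all illustrative:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Each simulated processElement call appends an entry to a
// ConcurrentLinkedQueue (safe for concurrent writers), and everything is
// printed once the pool has drained, showing which thread handled which key.
public class ThreadIdLogger {
  static final ConcurrentLinkedQueue<String> LOG = new ConcurrentLinkedQueue<>();

  static void processElement(int key) {
    LOG.add("processElement " + key + " threadId: " + Thread.currentThread().getName());
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int key = 0; key < 16; key++) {
      final int k = key;
      pool.submit(() -> processElement(k));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    // Entries interleave across pool threads, like the log below.
    LOG.forEach(System.out::println);
  }
}
```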
I talked to Luke, and he suggested using PAssert, which is the most thread-safe/standard way to verify pipeline results. It also simplifies the code a little, removing the last unnecessary DoFn. PTAL at this PR; I recommend committing it to remove the concurrency issue in collecting test results and to remove the flakiness in this test: https://github.com/apache/beam/pull/7214/files Please also try to follow this pattern in any other tests you may have.

FWIW, here are the logged thread IDs that I saw. I appended log entries to a ConcurrentLinkedQueue and printed them at the end of the test, so this shows the separate threads and how they interleave:

processElement collectResults 26 : 5000 results: 1601591000 threadId: pool-32-thread-15
processElement collectResults 34 : 4093 results: 1360449464 threadId: pool-32-thread-14
processElement collectResults 47 : 4093 results: 323962224 threadId: pool-32-thread-19
processElement collectResults 19 : 4093 results: 323962224 threadId: pool-32-thread-19
processElement collectResults 45 : 4093 results: 167183883 threadId: pool-32-thread-18
processElement collectResults 0 : 4093 results: 167183883 threadId: pool-32-thread-18
processElement collectResults 2 : 4093 results: 167183883 threadId: pool-32-thread-18
processElement collectResults 30 : 4093 results: 865903006 threadId: pool-32-thread-21
processElement collectResults 11 : 4093 results: 865903006 threadId: pool-32-thread-21
processElement collectResults 1 : 4093 results: 865903006 threadId: pool-32-thread-21
processElement collectResults 41 : 4093 results: 1183940089 threadId: pool-32-thread-23
processElement collectResults 7 : 4093 results: 1183940089 threadId: pool-32-thread-23
processElement collectResults 13 : 4093 results: 1183940089 threadId: pool-32-thread-23
processElement collectResults 36 : 4093 results: 1183940089 threadId: pool-32-thread-23
processElement collectResults 21 : 4093 results: 907415986 threadId: pool-32-thread-17
processElement collectResults 32 : 4093 results: 907415986 threadId: pool-32-thread-17
processElement collectResults 10 : 4093 results: 907415986 threadId: pool-32-thread-17
processElement collectResults 20 : 4093 results: 907415986 threadId: pool-32-thread-17
processElement collectResults 14 : 4093 results: 907415986 threadId: pool-32-thread-17
processElement collectResults 24 : 4093 results: 1391785351 threadId: pool-32-thread-15
processElement collectResults 46 : 4093 results: 1391785351 threadId: pool-32-thread-15
processElement collectResults 17 : 4093 results: 1391785351 threadId: pool-32-thread-15

On Wed, Dec 5, 2018 at 6:49 AM Maximilian Michels <m...@apache.org> wrote:

> Thank you for looking into the test. For me the flakiness was solely caused
> by the non-thread-safe GrpcStateService. I have since closed the JIRA issue
> because I haven't seen another failure since the fix.
>
> Your fixes are valid, but they won't fix flakiness (if present) in the
> current testing pipeline. Why? The results are only ever written by one
> worker, because the testing source uses Impulse, which generates a signal
> received by only a single worker. So the shared results list is not a
> problem for this test.
>
> Let me quickly comment on the changes you mentioned:
>
> 1) Yes, if we had a parallel source, the List should be a concurrent or
> synchronized list.
>
> 2) Using a static list should be fine for testing purposes. There are no
> other tests accessing this map. Tests are not run in parallel at the class
> level. Besides, there is only one test in this class.
>
> 3) If you make the results object transient, then it won't be serialized,
> so it will be null after being deserialized.
>
> Thank you for spending the time to look into the test. Do you think it
> would make sense to address changes to the test separately from your PR?
>
> I believe the test can be further improved, e.g. to run with more parallelism.
> Also, if you see any flakiness after the merged fix, please post the build
> log in the JIRA issue.
>
> Thanks,
> Max
>
> On 05.12.18 03:18, Alex Amato wrote:
> > Well, here is my hacky solution.
> > You can see the changes I make to PortableTimersExecutionTest:
> > https://github.com/apache/beam/pull/6786/files
> >
> > I don't really understand why the pipeline never starts running when I
> > make the results object transient in PortableTimersExecutionTest.
> >
> > So I instead continue to access a static object, but key it with the test
> > parameter, to prevent tests from interfering with each other.
> >
> > I am not too sure how to proceed. I don't really want to check in this
> > hacky solution, but I am not sure what else would solve the problems.
> > Please let me know if you have any suggestions.
> >
> > On Tue, Dec 4, 2018 at 5:26 PM Alex Amato <ajam...@google.com> wrote:
> >
> > Thanks for letting me know, Maximilian.
> >
> > Btw, I've been looking at this test the last few days as well. I have
> > found a few other concurrency issues that I hope to send a PR out for.
> >
> > * The PortableTimersExecutionTest result variable is using a static
> >   ArrayList, but it can be written to concurrently (by multiple threads
> >   AND multiple parameterized test instances), which causes flakiness.
> > * But just using a ConcurrentLinkedQueue and a non-static variable isn't
> >   sufficient, as the results object will be copied during DoFn
> >   serialization. That makes all the assertions fail, since nothing gets
> >   written to the same results object the test is using.
> >   o So it should be made private transient final. However, after trying
> >     this I am seeing the test time out, and I am not sure why. Continuing
> >     to debug this.
> >
> > I think that my PR was increasing flakiness, which is why I saw more of
> > these issues.
> > Just wanted to point these out in the meantime; hopefully it helps with
> > debugging for you too.
> >
> > On Fri, Nov 30, 2018 at 7:49 AM Maximilian Michels <m...@apache.org> wrote:
> >
> > This turned out to be a tricky bug. Robert and I had a joint debugging
> > session and managed to find the culprit.
> >
> > PR pending: https://github.com/apache/beam/pull/7171
> >
> > On 27.11.18 19:35, Kenneth Knowles wrote:
> > > I actually didn't look at this one. I filed a bunch more adjacent flake
> > > bugs. I didn't find your bug, but I do see that test flaking at the same
> > > time as the others. FWIW, here is the list of flakes and sickbayed tests:
> > > https://issues.apache.org/jira/issues/?filter=12343195
> > >
> > > Kenn
> > >
> > > On Tue, Nov 27, 2018 at 10:25 AM Alex Amato <ajam...@google.com> wrote:
> > >
> > > +Ken,
> > >
> > > Did you happen to look into this test? I heard that you may have
> > > been looking into this.
> > >
> > > On Mon, Nov 26, 2018 at 3:36 PM Maximilian Michels <m...@apache.org> wrote:
> > >
> > > Hi Alex,
> > >
> > > Thanks for your help! I'm quite used to debugging concurrent/distributed
> > > problems. But this one is quite tricky, especially with regard to gRPC
> > > threads. I'll try to provide more information in the following.
> > >
> > > There are two observations:
> > >
> > > 1) The problem is specifically related to how the cleanup is performed
> > > for the EmbeddedEnvironmentFactory. The environment is shut down when
> > > the SDK Harness exits, but the gRPC threads continue to linger for some
> > > time and may stall state processing on the next test.
> > >
> > > If you do _not_ close DefaultJobBundleFactory, which happens during
> > > close() or dispose() in the FlinkExecutableStageFunction or
> > > ExecutableStageDoFnOperator respectively, the tests run just fine. I ran
> > > 1000 test runs without a single failure.
> > >
> > > The EmbeddedEnvironment uses direct channels, which are marked
> > > experimental in gRPC. We may have to convert them to regular socket
> > > communication.
> > >
> > > 2) Try setting a conditional breakpoint in GrpcStateService which will
> > > never break, e.g. "false". Set it here:
> > >
> > > https://github.com/apache/beam/blob/6da9aa5594f96c0201d497f6dce4797c4984a2fd/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java#L134
> > >
> > > The tests will never fail. The SDK harness is always shut down correctly
> > > at the end of the test.
> > >
> > > Thanks,
> > > Max
> > >
> > > On 26.11.18 19:15, Alex Amato wrote:
> > > > Thanks Maximilian, let me know if you need any help. Usually I debug
> > > > this sort of thing by pausing the IntelliJ debugger to see all the
> > > > different threads that are waiting on various conditions. If you find
> > > > any insights from that, please post them here and we can try to figure
> > > > out the source of the stuckness. Perhaps it may be some concurrency
> > > > issue leading to deadlock?
> > > >
> > > > On Thu, Nov 22, 2018 at 12:57 PM Maximilian Michels <m...@apache.org> wrote:
> > > >
> > > > I couldn't fix it thus far. The issue does not seem to be in the Flink
> > > > Runner but in the way the test utilizes the EMBEDDED environment to
> > > > run multiple portable jobs in a row.
> > > >
> > > > When it gets stuck, it is in RemoteBundle#close, and it is independent
> > > > of the test type (batch and streaming have different implementations).
> > > >
> > > > Will give it another look tomorrow.
> > > >
> > > > Thanks,
> > > > Max
> > > >
> > > > On 22.11.18 13:07, Maximilian Michels wrote:
> > > > > Hi Alex,
> > > > >
> > > > > The test seems to have gotten flaky after we merged support for
> > > > > portable timers in Flink's batch mode.
> > > > >
> > > > > Looking into this now.
> > > > >
> > > > > Thanks,
> > > > > Max
> > > > >
> > > > > On 21.11.18 23:56, Alex Amato wrote:
> > > > >> Hello, I have noticed that
> > > > >> org.apache.beam.runners.flink.PortableTimersExecutionTest is very
> > > > >> flaky, and I repro'd this test timeout on the master branch in
> > > > >> 40/50 runs.
> > > > >>
> > > > >> I filed a JIRA issue: BEAM-6111
> > > > >> <https://issues.apache.org/jira/browse/BEAM-6111>. I was just
> > > > >> wondering if anyone knew why this may be occurring, and wanted to
> > > > >> check if anyone else has been experiencing this.
> > > > >>
> > > > >> Thanks,
> > > > >> Alex
> > > > >
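Since the thread keeps coming back to two serialization pitfalls (a non-static results list being copied when the DoFn is serialized, and a transient field coming back as null), here is a small standalone illustration using plain Java serialization as a stand-in for DoFn serialization. All names are illustrative; this is not the Beam test code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Demonstrates both pitfalls: a serialized field deserializes as a *copy*
// (so worker-side writes never reach the original), while a transient field
// is skipped entirely and deserializes as null.
public class SerializationPitfalls {
  static class Fn implements Serializable {
    List<String> results = new ArrayList<>();                     // round-trips as a copy
    transient List<String> transientResults = new ArrayList<>();  // round-trips as null
  }

  static Fn roundTrip(Fn fn) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(fn);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
      return (Fn) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Fn original = new Fn();
    Fn copy = roundTrip(original);

    copy.results.add("written on the 'worker' copy");
    System.out.println("original sees: " + original.results);                   // []
    System.out.println("transient after round trip: " + copy.transientResults); // null
  }
}
```

This is why a static field "works" for a single-JVM test (it is shared via the class, not via serialized state), and why assertions against an instance field fail: the pipeline writes to a deserialized copy the test never sees.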