I believe the ParDos are being invoked in parallel. I'm not sure of the
exact semantics, but I believe Beam executes separate keys on separate
threads when it processes different bundles for those keys.
I logged the thread IDs in this test to verify that different threads are
invoking this code.
With my fix applied, the test passed in 400 out of 400 runs.
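
As a rough illustration of why the shared results collection matters, here
is a plain-JDK sketch (not the Beam test itself). The class name
SharedResultsSketch, the collect helper, and the thread pool standing in
for the threads that process bundles are all hypothetical and only serve
the example.

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: several worker threads append to one shared collection.
final class SharedResultsSketch {

    // Starts one task per key; each task appends perKey entries to results.
    // Returns the number of entries present after all tasks have finished.
    public static int collect(Collection<String> results, int keys, int perKey) {
        ExecutorService pool = Executors.newFixedThreadPool(keys);
        for (int k = 0; k < keys; k++) {
            final int key = k;
            pool.execute(() -> {
                for (int i = 0; i < perKey; i++) {
                    results.add(key + ":" + i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results.size();
    }

    public static void main(String[] args) {
        // A thread-safe collection keeps every element written by every thread.
        int size = collect(new ConcurrentLinkedQueue<>(), 8, 1000);
        System.out.println("entries collected: " + size);
    }
}
```

Passing a plain ArrayList instead is not safe here: concurrent add calls may
lose elements or throw, which is the kind of failure a shared static
ArrayList can produce once elements are processed on several threads.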

I talked to Luke, and he suggested using PAssert, which is the most
thread-safe and standard way to verify pipeline results. It also simplifies
the code a little by removing the last, unnecessary DoFn.

PTAL at this PR. I recommend merging it to remove the concurrency issue in
collecting test results and to remove the flakiness in this test.
https://github.com/apache/beam/pull/7214/files

Please also try to follow this pattern in any other tests you may have.

FWIW, here are the logged thread IDs that I saw. I appended the logs to a
ConcurrentLinkedQueue and printed them at the end of the test, so this
shows the separate threads and how they interleave.
processElement collectResults 26 : 5000 results: 1601591000 threadId: pool-32-thread-15
processElement collectResults 34 : 4093 results: 1360449464 threadId: pool-32-thread-14
processElement collectResults 47 : 4093 results: 323962224 threadId: pool-32-thread-19
processElement collectResults 19 : 4093 results: 323962224 threadId: pool-32-thread-19
processElement collectResults 45 : 4093 results: 167183883 threadId: pool-32-thread-18
processElement collectResults 0 : 4093 results: 167183883 threadId: pool-32-thread-18
processElement collectResults 2 : 4093 results: 167183883 threadId: pool-32-thread-18
processElement collectResults 30 : 4093 results: 865903006 threadId: pool-32-thread-21
processElement collectResults 11 : 4093 results: 865903006 threadId: pool-32-thread-21
processElement collectResults 1 : 4093 results: 865903006 threadId: pool-32-thread-21
processElement collectResults 41 : 4093 results: 1183940089 threadId: pool-32-thread-23
processElement collectResults 7 : 4093 results: 1183940089 threadId: pool-32-thread-23
processElement collectResults 13 : 4093 results: 1183940089 threadId: pool-32-thread-23
processElement collectResults 36 : 4093 results: 1183940089 threadId: pool-32-thread-23
processElement collectResults 21 : 4093 results: 907415986 threadId: pool-32-thread-17
processElement collectResults 32 : 4093 results: 907415986 threadId: pool-32-thread-17
processElement collectResults 10 : 4093 results: 907415986 threadId: pool-32-thread-17
processElement collectResults 20 : 4093 results: 907415986 threadId: pool-32-thread-17
processElement collectResults 14 : 4093 results: 907415986 threadId: pool-32-thread-17
processElement collectResults 24 : 4093 results: 1391785351 threadId: pool-32-thread-15
processElement collectResults 46 : 4093 results: 1391785351 threadId: pool-32-thread-15
processElement collectResults 17 : 4093 results: 1391785351 threadId: pool-32-thread-15
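
The logging technique described above can be sketched in plain Java roughly
as follows. This is a minimal sketch and not the code from the PR: the class
name ThreadLogSketch, the run helper, and the executor that stands in for
the runner's worker threads are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: record which thread handled each element, print at the end.
final class ThreadLogSketch {

    // Processes the given number of elements on a pool with the given number of
    // threads and returns one log line per element, in the order they were appended.
    public static List<String> run(int elements, int threads) {
        // Thread-safe buffer, so that logging itself does not introduce a race.
        Queue<String> log = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int e = 0; e < elements; e++) {
            final int element = e;
            pool.execute(() -> {
                // Stand-in for the body of processElement in the test's DoFn.
                log.add("processElement " + element
                        + " threadId: " + Thread.currentThread().getName());
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // Printing only after all work is done keeps slow I/O out of the hot path.
        for (String line : run(20, 4)) {
            System.out.println(line);
        }
    }
}
```

Buffering the entries and printing them once at the end avoids perturbing
the timing of the code under test more than necessary, while the order of
entries in the queue still shows how the threads interleaved.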



On Wed, Dec 5, 2018 at 6:49 AM Maximilian Michels <m...@apache.org> wrote:

> Thank you for looking into the test. For me the flakiness was solely
> caused by the non-thread-safe GrpcStateService. I have since closed the
> JIRA issue because I didn't see another failure since the fix.
>
> Your fixes are valid, but they won't fix flakiness (if present) in the
> current testing pipeline. Why? The results are only ever written by 1
> worker because the testing source uses Impulse which generates a signal
> only received by a single worker. So the shared results list is not a
> problem for this test.
>
> Let me quickly comment on the changes you mentioned:
>
> 1) Yes, if we had a parallel source, the List should be a concurrent or
> synchronized list.
>
> 2) Using a static list should be fine for testing purposes. There are no
> other tests accessing this map. Tests are not run parallel on a class
> level. Besides, there is only one test in this class.
>
> 3) If you make the results object transient, then it won't be serialized,
> so it will be 'null' after being deserialized.
>
>
> Thank you for spending the time to look into the test. Do you think it
> would make sense to address changes to the test separately of your PR?
>
> I believe the test can be further improved, e.g. to run more parallel.
> Also, if you see any flakiness after the merged fix, please post the build
> log in the JIRA issue.
>
> Thanks,
> Max
>
> On 05.12.18 03:18, Alex Amato wrote:
> > Well, here is my hacky solution.
> > You can see the changes I made to PortableTimersExecutionTest:
> > https://github.com/apache/beam/pull/6786/files
> >
> > I don't really understand why the pipeline never starts running when I
> > make the results object transient in PortableTimersExecutionTest.
> >
> > So I instead continue to access a static object, but key it with the
> > test parameter, to prevent tests from interfering with each other.
> >
> > I am not too sure how to proceed. I don't really want to check in this
> > hacky solution, but I am not sure what else to do to solve the problems.
> > Please let me know if you have any suggestions.
> >
> > On Tue, Dec 4, 2018 at 5:26 PM Alex Amato <ajam...@google.com
> > <mailto:ajam...@google.com>> wrote:
> >
> >     Thanks for letting me know, Maximilian.
> >
> >     Btw, I've been looking at this test the last few days as well. I have
> >     found a few other concurrency issues that I hope to send a PR out for.
> >
> >       * The PortableTimersExecutionTest result variable is using a static
> >         ArrayList, but can be written to concurrently (by multiple threads
> >         AND multiple parameterized test instances), which causes
> >         flakiness.
> >       * But just using a ConcurrentLinkedQueue and a non-static variable
> >         isn't sufficient, as the results object will be copied during
> >         DoFn serialization. That makes all the assertions fail, since
> >         nothing gets written to the same results object the test is using.
> >           o So it should be made private transient final. However, after
> >             trying this I am seeing the test time out, and I am not sure
> >             why. Continuing to debug this.
> >
> >
> >     I think that my PR was increasing flakiness, which is why I saw more
> >     of these issues.
> >     Just wanted to point these out in the meantime; hopefully it helps
> >     with debugging for you too.
> >
> >     On Fri, Nov 30, 2018 at 7:49 AM Maximilian Michels <m...@apache.org
> >     <mailto:m...@apache.org>> wrote:
> >
> >         This turned out to be a tricky bug. Robert and I had a joint
> >         debugging session and managed to find the culprit.
> >
> >         PR pending: https://github.com/apache/beam/pull/7171
> >
> >         On 27.11.18 19:35, Kenneth Knowles wrote:
> >          > I actually didn't look at this one. I filed a bunch more
> adjacent flake
> >          > bugs. I didn't find your bug but I do see that test flaking
> at the same
> >          > time as the others. FWIW here is the list of flakes and
> sickbayed tests:
> >          > https://issues.apache.org/jira/issues/?filter=12343195
> >          >
> >          > Kenn
> >          >
> >          > On Tue, Nov 27, 2018 at 10:25 AM Alex Amato <
> ajam...@google.com
> >         <mailto:ajam...@google.com>
> >          > <mailto:ajam...@google.com <mailto:ajam...@google.com>>>
> wrote:
> >          >
> >          >     +Ken,
> >          >
> >          >     Did you happen to look into this test? I heard that you
> may have
> >          >     been looking into this.
> >          >
> >          >     On Mon, Nov 26, 2018 at 3:36 PM Maximilian Michels
> >         <m...@apache.org <mailto:m...@apache.org>
> >          >     <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> >          >
> >          >         Hi Alex,
> >          >
> >          >         Thanks for your help! I'm quite used to debugging
> >          >         concurrent/distributed
> >          >         problems. But this one is quite tricky, especially
> with regards
> >          >         to GRPC
> >          >         threads. I try to provide more information in the
> following.
> >          >
> >          >         There are two observations:
> >          >
> >          >         1) The problem is specifically related to how the
> >          >         cleanup is performed for the EmbeddedEnvironmentFactory.
> >          >         The environment is shut down when the SDK Harness exits,
> >          >         but the GRPC threads continue to linger for some time
> >          >         and may stall state processing on the next test.
> >          >
> >          >         If you do _not_ close DefaultJobBundleFactory, which
> happens
> >         during
> >          >         close() or dispose() in the
> FlinkExecutableStageFunction or
> >          >         ExecutableStageDoFnOperator respectively, the tests
> run just
> >          >         fine. I ran
> >          >         1000 test runs without a single failure.
> >          >
> >          >         The EmbeddedEnvironment uses direct channels which
> are marked
> >          >         experimental in GRPC. We may have to convert them to
> regular
> >         socket
> >          >         communication.
> >          >
> >          >         2) Try setting a conditional breakpoint in
> GrpcStateService
> >          >         which will
> >          >         never break, e.g. "false". Set it here:
> >          >
> >
> https://github.com/apache/beam/blob/6da9aa5594f96c0201d497f6dce4797c4984a2fd/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java#L134
> >          >
> >          >         The tests will never fail. The SDK harness is always
> shutdown
> >          >         correctly
> >          >         at the end of the test.
> >          >
> >          >         Thanks,
> >          >         Max
> >          >
> >          >         On 26.11.18 19:15, Alex Amato wrote:
> >          >          > Thanks Maximilian, let me know if you need any
> help. Usually
> >          >         I debug
> >          >          > this sort of thing by pausing the IntelliJ
> debugger to see
> >          >         all the
> >          >          > different threads which are waiting on various
> conditions. If
> >          >         you find
> >          >          > any insights from that, please post them here and
> we can try
> >          >         to figure
> >          >          > out the source of the stuckness. Perhaps it may be
> some
> >          >         concurrency
> >          >          > issue leading to deadlock?
> >          >          >
> >          >          > On Thu, Nov 22, 2018 at 12:57 PM Maximilian Michels
> >          >         <m...@apache.org <mailto:m...@apache.org>
> >         <mailto:m...@apache.org <mailto:m...@apache.org>>
> >          >          > <mailto:m...@apache.org <mailto:m...@apache.org>
> >         <mailto:m...@apache.org <mailto:m...@apache.org>>>> wrote:
> >          >          >
> >          >          >     I couldn't fix it thus far. The issue does not
> seem to be
> >          >         in the Flink
> >          >          >     Runner but in the way the tests utilizes the
> EMBEDDED
> >          >         environment to
> >          >          >     run
> >          >          >     multiple portable jobs in a row.
> >          >          >
> >          >          >     When it gets stuck it is in RemoteBundle#close
> and it is
> >          >         independent of
> >          >          >     the test type (batch and streaming have
> different
> >          >         implementations).
> >          >          >
> >          >          >     Will give it another look tomorrow.
> >          >          >
> >          >          >     Thanks,
> >          >          >     Max
> >          >          >
> >          >          >     On 22.11.18 13:07, Maximilian Michels wrote:
> >          >          >      > Hi Alex,
> >          >          >      >
> >          >          >      > The test seems to have gotten flaky after
> we merged
> >          >         support for
> >          >          >     portable
> >          >          >      > timers in Flink's batch mode.
> >          >          >      >
> >          >          >      > Looking into this now.
> >          >          >      >
> >          >          >      > Thanks,
> >          >          >      > Max
> >          >          >      >
> >          >          >      > On 21.11.18 23:56, Alex Amato wrote:
> >          >          >      >> Hello, I have noticed that
> >          >          >      >> org.apache.beam.runners.flink.PortableTimersExecutionTest
> >          >          >      >> is very flakey, and repro'd this test timeout on the
> >          >          >      >> master branch in 40/50 runs.
> >          >          >      >>
> >          >          >      >> I filed a JIRA issue: BEAM-6111
> >          >          >      >> <
> https://issues.apache.org/jira/browse/BEAM-6111>. I
> >          >         was just
> >          >          >      >> wondering if anyone knew why this may be
> occurring,
> >          >         and to check if
> >          >          >      >> anyone else has been experiencing this.
> >          >          >      >>
> >          >          >      >> Thanks,
> >          >          >      >> Alex
> >          >          >
> >          >
> >
>
