-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/
-----------------------------------------------------------
(Updated May 12, 2016, 12:32 a.m.)
Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina
Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
Bugs: SAMZA-948
https://issues.apache.org/jira/browse/SAMZA-948
Repository: samza
Description
-------
SAMZA-948 CoordinatorStreamSystemConsumer is not threadsafe
See the stack traces in the JIRA for more context. Essentially, the consumer
can be bootstrapped concurrently from multiple code paths (AM UI, RM Client
callbacks, etc.), and with the remove() logic that was added in SAMZA-913, this
can result in ConcurrentModificationExceptions.
Fix:
* Use an AtomicReference to swap in the updated messages when they are ready
* In bootstrap()
  * Acquire a lock
  * Make a copy of the messages
  * Append the new messages
  * Set the atomic reference to the copy
  * Release the lock
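The steps above can be sketched roughly as follows. This is a minimal
illustration of the copy-and-swap approach, not the actual patch: the class
name, the lock field, the use of String as the message type, and the
getMessages() accessor are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the consumer; not the real Samza class. */
class BootstrappedMessagesSketch {
    // Guards the copy/append/swap sequence so that concurrent bootstraps
    // cannot overwrite each other's updates.
    private final Object bootstrapLock = new Object();

    // Readers only ever see a fully built, read-only snapshot.
    private final AtomicReference<List<String>> messages =
        new AtomicReference<>(Collections.<String>emptyList());

    /** Appends newMessages (assumed to be already read from the stream). */
    public void bootstrap(List<String> newMessages) {
        synchronized (bootstrapLock) {                          // acquire the lock
            List<String> copy = new ArrayList<>(messages.get()); // copy current messages
            copy.addAll(newMessages);                            // append the new messages
            messages.set(Collections.unmodifiableList(copy));    // swap in the copy
        }                                                        // lock released here
    }

    /** Returns the current snapshot; safe to iterate without locking. */
    public List<String> getMessages() {
        return messages.get();
    }
}
```

Because a published list is never mutated after the swap, a caller that is
iterating over an older snapshot is unaffected by a concurrent bootstrap().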
Also sneaking in a log message fix for JobCoordinator. It previously didn't
include the task names.
Diffs (updated)
-----
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
8e1057b4d055159acb49d2cc60d3acad7665a532
samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
03f48db7f42b2617995b14cf51248b82b6cc2636
samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
949902770124b4223b1cca307a66f75d7e8ebdaf
Diff: https://reviews.apache.org/r/47197/diff/
Testing
-------
./gradlew build
Thanks,
Jake Maes