----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/47197/ -----------------------------------------------------------
Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure). Bugs: SAMZA-948 https://issues.apache.org/jira/browse/SAMZA-948 Repository: samza Description ------- SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe See the stack traces in the JIRA for more context. Essentially the consumer can bootstrap concurrently from multiple code paths (AM UI, RM Client callbacks, etc) and with the remove() logic that was added in SAMZA-913, we can get ConcurrentModificationExceptions. Fix: * Use an AtomicReference to swap in the updated messages when they are ready * In bootstrap() * Acquire a lock * Make a copy of the messages * Append the new messages * Set the atomic reference to the copy * Release lock Also sneaking in a log message fix for JobCoordinator. It previously didn't include the task names. Diffs ----- samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 8e1057b4d055159acb49d2cc60d3acad7665a532 samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 03f48db7f42b2617995b14cf51248b82b6cc2636 Diff: https://reviews.apache.org/r/47197/diff/ Testing ------- ./gradlew build Thanks, Jake Maes