> On May 11, 2016, 6:28 p.m., Chris Pettitt wrote:
> >

Yes, and yes. Thanks!


- Jake


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/#review132710
-----------------------------------------------------------


On May 11, 2016, 8:13 p.m., Jake Maes wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47197/
> -----------------------------------------------------------
>
> (Updated May 11, 2016, 8:13 p.m.)
>
>
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
>
>
> Bugs: SAMZA-948
>     https://issues.apache.org/jira/browse/SAMZA-948
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe
>
> See the stack traces in the JIRA for more context. Essentially the consumer
> can bootstrap concurrently from multiple code paths (AM UI, RM Client
> callbacks, etc) and with the remove() logic that was added in SAMZA-913, we
> can get ConcurrentModificationExceptions.
>
> Fix:
> * Use an AtomicReference to swap in the updated messages when they are ready
> * In bootstrap()
>   * Acquire a lock
>   * Make a copy of the messages
>   * Append the new messages
>   * Set the atomic reference to the copy
>   * Release lock
>
> Also sneaking in a log message fix for JobCoordinator. It previously didn't
> include the task names.
>
>
> Diffs
> -----
>
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 8e1057b4d055159acb49d2cc60d3acad7665a532
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 03f48db7f42b2617995b14cf51248b82b6cc2636
>
> Diff: https://reviews.apache.org/r/47197/diff/
>
>
> Testing
> -------
>
> ./gradlew build
>
>
> Thanks,
>
> Jake Maes
>
>
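
For readers following the "Fix" steps in the review description above, here is a minimal sketch of the copy-and-swap pattern they outline. It is not the code from the diff. The class name `BootstrappedMessages`, the accessor `getMessages()`, and the use of `String` as the message type are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the consumer's bootstrapped state (not the Samza class).
final class BootstrappedMessages {
    // Serializes writers so that two bootstraps cannot interleave their copies.
    private final Object bootstrapLock = new Object();

    // Readers only ever see a fully built, read-only snapshot.
    private final AtomicReference<List<String>> messages =
            new AtomicReference<>(Collections.<String>emptyList());

    // May be called from several threads (UI, callbacks, and so on).
    void bootstrap(List<String> newMessages) {
        synchronized (bootstrapLock) {                          // acquire the lock
            List<String> copy = new ArrayList<>(messages.get()); // copy current messages
            copy.addAll(newMessages);                            // append the new messages
            messages.set(Collections.unmodifiableList(copy));    // swap in the copy
        }                                                        // release the lock
    }

    // Lock-free read: returns whichever snapshot was most recently published.
    List<String> getMessages() {
        return messages.get();
    }
}
```

Because each published list is never mutated after the swap, a thread iterating over an older snapshot is unaffected by a concurrent bootstrap, which is the situation that would otherwise raise a ConcurrentModificationException.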