-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/#review132710
-----------------------------------------------------------


Fix it, then Ship it!





samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java (line 62)
<https://reviews.apache.org/r/47197/#comment196973>

    If this code really can run on multiple threads (as opposed to code that
    was blowing up with ConcurrentModificationException, a name that is
    sometimes misleading because single-threaded code can throw it too), then
    this field needs to be volatile.

    isStarted might also need to be volatile, but I didn't look at how it is
    used.
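
    To illustrate why the exception name can mislead, here is a minimal
    sketch, not taken from the patch. CmeDemo and its method are hypothetical
    names. It removes an element from a list while iterating over it on a
    single thread, which is enough to trigger the exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical demo class; not part of Samza.
class CmeDemo {
    // Returns true if modifying the list during for-each iteration
    // throws ConcurrentModificationException on a single thread.
    static boolean throwsOnSingleThread() {
        List<String> messages = new ArrayList<>(Arrays.asList("a", "b", "c"));
        try {
            for (String m : messages) {
                if (m.equals("a")) {
                    // Structural change while the iterator is live.
                    messages.remove(m);
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsOnSingleThread());
    }
}
```

    If the stack traces come from a pattern like this, volatile is not the
    fix. If several threads really do touch the field, visibility matters and
    volatile (or equivalent) is needed.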



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java (line 65)
<https://reviews.apache.org/r/47197/#comment196969>

    You only need volatile here (vs. AtomicReference) since you're not using
    any CAS operations.

    For full safety, wrap the set in an unmodifiable wrapper. Otherwise
    callers could modify the set through "read only" methods like
    getBootstrappedStream.
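
    A rough sketch of what I mean, under assumptions: BootstrapHolder,
    bootstrapLock, and bootstrappedMessages are hypothetical names, and
    String stands in for the real message type. Only getBootstrappedStream
    is a name from the actual class. Writers copy under a lock and publish an
    unmodifiable set through a volatile field; readers just read the field.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for the consumer; String replaces the real message type.
class BootstrapHolder {
    private final Object bootstrapLock = new Object();

    // volatile suffices: the reference is only read or overwritten, never CAS'd.
    private volatile Set<String> bootstrappedMessages = Collections.emptySet();

    void bootstrap(Set<String> newMessages) {
        synchronized (bootstrapLock) {
            // Copy the current messages, append the new ones, then publish.
            Set<String> copy = new LinkedHashSet<>(bootstrappedMessages);
            copy.addAll(newMessages);
            bootstrappedMessages = Collections.unmodifiableSet(copy);
        }
    }

    // Callers receive a read-only snapshot and cannot mutate shared state.
    Set<String> getBootstrappedStream() {
        return bootstrappedMessages;
    }

    public static void main(String[] args) {
        BootstrapHolder holder = new BootstrapHolder();
        holder.bootstrap(new LinkedHashSet<>(Arrays.asList("m1", "m2")));
        System.out.println(holder.getBootstrappedStream().size());
    }
}
```

    Each published set is never modified after the write to the volatile
    field, so a reader iterating over an older snapshot is unaffected by a
    later bootstrap().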


- Chris Pettitt


On May 10, 2016, 11:07 p.m., Jake Maes wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47197/
> -----------------------------------------------------------
> 
> (Updated May 10, 2016, 11:07 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-948
>     https://issues.apache.org/jira/browse/SAMZA-948
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe
> 
> See the stack traces in the JIRA for more context. Essentially the consumer 
> can bootstrap concurrently from multiple code paths (AM UI, RM Client 
> callbacks, etc) and with the remove() logic that was added in SAMZA-913, we 
> can get ConcurrentModificationExceptions. 
> 
> Fix:
> * Use an AtomicReference to swap in the updated messages when they are ready 
> * In bootstrap()
>     * Acquire a lock
>     * Make a copy of the messages
>     * Append the new messages
>     * Set the atomic reference to the copy
>     * Release lock
> 
> Also sneaking in a log message fix for JobCoordinator. It previously didn't 
> include the task names.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 8e1057b4d055159acb49d2cc60d3acad7665a532 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 03f48db7f42b2617995b14cf51248b82b6cc2636 
> 
> Diff: https://reviews.apache.org/r/47197/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew build
> 
> 
> Thanks,
> 
> Jake Maes
> 
>
