Re: Review Request 47197: SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

2016-05-11 Thread Jake Maes

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/
---

(Updated May 11, 2016, 11:55 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-948
https://issues.apache.org/jira/browse/SAMZA-948


Repository: samza


Description
---

SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

See the stack traces in the JIRA for more context. In short, the consumer can be
bootstrapped concurrently from multiple code paths (AM UI, RM Client callbacks,
etc.), and combined with the remove() logic added in SAMZA-913, concurrent
bootstraps can throw ConcurrentModificationExceptions.
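For context on the exception itself: java.util collections such as LinkedHashSet have fail-fast iterators that throw ConcurrentModificationException when the collection is structurally modified during iteration, even from a single thread. A minimal sketch, assuming messages are modelled as plain Strings and a hypothetical "delete:" prefix stands in for the SAMZA-913 remove() logic (this is not the actual consumer code):

```java
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration; not code from CoordinatorStreamSystemConsumer.
class CmeDemo {
  // Removing from the set inside a for-each loop invalidates the loop's
  // fail-fast iterator, so the next call to next() throws.
  static boolean removeDuringIteration(Set<String> messages) {
    try {
      for (String m : messages) {
        if (m.startsWith("delete:")) {
          messages.remove(m);
        }
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  // Iterator.remove() is the supported way to delete while iterating.
  // It only fixes the single-threaded case; it does not make the set thread-safe.
  static void removeWithIterator(Set<String> messages) {
    Iterator<String> it = messages.iterator();
    while (it.hasNext()) {
      if (it.next().startsWith("delete:")) {
        it.remove();
      }
    }
  }

  public static void main(String[] args) {
    Set<String> a = new LinkedHashSet<>(Arrays.asList("set:x", "delete:y", "set:z"));
    System.out.println(removeDuringIteration(a)); // true
    Set<String> b = new LinkedHashSet<>(Arrays.asList("set:x", "delete:y", "set:z"));
    removeWithIterator(b);
    System.out.println(b); // [set:x, set:z]
  }
}
```

Iterator.remove() is enough when one thread owns the set; once several threads bootstrap at the same time, the shared set itself still has to be guarded.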

Fix:
* Use an AtomicReference to swap in the updated messages when they are ready
* In bootstrap():
  * Acquire a lock
  * Make a copy of the messages
  * Append the new messages
  * Set the atomic reference to the copy
  * Release lock
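The steps above amount to copy-on-write publication: writers serialize on a lock and build a fresh set, while readers simply read the current reference without locking. A minimal sketch, assuming String messages and a hypothetical BootstrappedMessages class (the real consumer stores coordinator stream messages and has a different API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the consumer's bootstrapped-message state.
class BootstrappedMessages {
  // Readers always see a complete set; writers swap in a new one.
  private final AtomicReference<Set<String>> messages =
      new AtomicReference<>(Collections.<String>emptySet());
  private final ReentrantLock bootstrapLock = new ReentrantLock();

  void bootstrap(List<String> newMessages) {
    bootstrapLock.lock();                                     // acquire a lock
    try {
      Set<String> copy = new LinkedHashSet<>(messages.get()); // make a copy of the messages
      copy.addAll(newMessages);                               // append the new messages
      messages.set(copy);                                     // set the atomic reference to the copy
    } finally {
      bootstrapLock.unlock();                                 // release the lock
    }
  }

  // Lock-free read of the current snapshot.
  Set<String> get() {
    return messages.get();
  }

  // Bootstraps from several threads at once and returns the final message count.
  static int concurrentBootstrap(int threads, int perThread) {
    BootstrappedMessages store = new BootstrappedMessages();
    List<Thread> workers = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      final int id = t;
      Thread worker = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          store.bootstrap(Collections.singletonList("t" + id + "-m" + i));
        }
      });
      workers.add(worker);
      worker.start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return store.get().size();
  }

  public static void main(String[] args) {
    System.out.println(concurrentBootstrap(4, 250)); // 1000
  }
}
```

Each bootstrap copies the whole set, so the cost of an append grows with the number of stored messages; that is the usual copy-on-write trade-off in exchange for lock-free reads. Note that get() hands out the stored set itself, so nothing in this sketch stops a caller from mutating it.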

Also sneaking in a log message fix for JobCoordinator. It previously didn't 
include the task names.


Diffs (updated)
-

  
  samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 8e1057b4d055159acb49d2cc60d3acad7665a532 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 03f48db7f42b2617995b14cf51248b82b6cc2636 

Diff: https://reviews.apache.org/r/47197/diff/


Testing
---

./gradlew build


Thanks,

Jake Maes



Re: Review Request 47197: SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

2016-05-11 Thread Jake Maes

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/
---

(Updated May 11, 2016, 11:49 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-948
https://issues.apache.org/jira/browse/SAMZA-948


Repository: samza


Description
---

SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

See the stack traces in the JIRA for more context. In short, the consumer can be
bootstrapped concurrently from multiple code paths (AM UI, RM Client callbacks,
etc.), and combined with the remove() logic added in SAMZA-913, concurrent
bootstraps can throw ConcurrentModificationExceptions.

Fix:
* Use an AtomicReference to swap in the updated messages when they are ready
* In bootstrap():
  * Acquire a lock
  * Make a copy of the messages
  * Append the new messages
  * Set the atomic reference to the copy
  * Release lock

Also sneaking in a log message fix for JobCoordinator. It previously didn't 
include the task names.


Diffs (updated)
-

  
  samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 8e1057b4d055159acb49d2cc60d3acad7665a532 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 03f48db7f42b2617995b14cf51248b82b6cc2636 

Diff: https://reviews.apache.org/r/47197/diff/


Testing
---

./gradlew build


Thanks,

Jake Maes



Re: Review Request 47197: SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

2016-05-11 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/#review132785
---


Ship it!





samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
 (line 66)


You actually don't need to wrap emptySet because it's already immutable.
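A quick check of that point: Collections.emptySet() already throws UnsupportedOperationException on mutation, so wrapping it in Collections.unmodifiableSet adds no protection. A small illustrative sketch (EmptySetDemo and rejectsAdd are hypothetical names):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class EmptySetDemo {
  // Returns true if the set rejects mutation with UnsupportedOperationException.
  static boolean rejectsAdd(Set<String> set) {
    try {
      set.add("x");
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(rejectsAdd(Collections.<String>emptySet()));                                // true
    System.out.println(rejectsAdd(Collections.unmodifiableSet(Collections.<String>emptySet())));   // true
    System.out.println(rejectsAdd(new HashSet<>()));                                               // false
  }
}
```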


- Chris Pettitt


On May 11, 2016, 8:13 p.m., Jake Maes wrote:



Re: Review Request 47197: SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

2016-05-11 Thread Yi Pan (Data Infrastructure)

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/#review132742
---


Ship it!




lgtm.


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
 (line 146)


nit: you can check isBootstraped again here to avoid running another round of
bootstrap when two threads are contending on bootstrapLock().
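The suggested double check is the classic double-checked pattern: test the flag before taking the lock, then test it again after acquiring it. A minimal sketch, assuming a one-shot bootstrap guarded by a volatile flag; the class, the isBootstrapped spelling, and the run counter are hypothetical and not taken from the patch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical one-shot bootstrap; not the actual consumer code.
class DoubleCheckedBootstrap {
  private volatile boolean isBootstrapped = false;
  private final ReentrantLock bootstrapLock = new ReentrantLock();
  private final AtomicInteger bootstrapRuns = new AtomicInteger();

  void bootstrap() {
    if (isBootstrapped) {
      return; // fast path: already done, no locking
    }
    bootstrapLock.lock();
    try {
      // Second check: another thread may have finished while we waited on the lock.
      if (isBootstrapped) {
        return;
      }
      bootstrapRuns.incrementAndGet(); // stands in for reading the coordinator stream
      isBootstrapped = true;
    } finally {
      bootstrapLock.unlock();
    }
  }

  int runs() {
    return bootstrapRuns.get();
  }

  // Calls bootstrap() from several threads at once and reports how often it really ran.
  static int contendedRuns(int threads) {
    DoubleCheckedBootstrap consumer = new DoubleCheckedBootstrap();
    List<Thread> workers = new ArrayList<>();
    for (int i = 0; i < threads; i++) {
      Thread t = new Thread(consumer::bootstrap);
      workers.add(t);
      t.start();
    }
    for (Thread t : workers) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return consumer.runs();
  }

  public static void main(String[] args) {
    System.out.println(contendedRuns(8)); // 1
  }
}
```

The unlocked first check is only safe because the flag is volatile; with a plain field, a thread could skip the lock without also seeing the state written by the bootstrapping thread.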


- Yi Pan (Data Infrastructure)


On May 11, 2016, 8:13 p.m., Jake Maes wrote:



Re: Review Request 47197: SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

2016-05-11 Thread Jake Maes


> On May 11, 2016, 6:28 p.m., Chris Pettitt wrote:
> >

Yes, and yes. Thanks!


- Jake


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/#review132710
---


On May 11, 2016, 8:13 p.m., Jake Maes wrote:



Re: Review Request 47197: SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

2016-05-11 Thread Jake Maes

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/
---

(Updated May 11, 2016, 8:13 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-948
https://issues.apache.org/jira/browse/SAMZA-948


Repository: samza


Description
---

SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

See the stack traces in the JIRA for more context. In short, the consumer can be
bootstrapped concurrently from multiple code paths (AM UI, RM Client callbacks,
etc.), and combined with the remove() logic added in SAMZA-913, concurrent
bootstraps can throw ConcurrentModificationExceptions.

Fix:
* Use an AtomicReference to swap in the updated messages when they are ready
* In bootstrap():
  * Acquire a lock
  * Make a copy of the messages
  * Append the new messages
  * Set the atomic reference to the copy
  * Release lock

Also sneaking in a log message fix for JobCoordinator. It previously didn't 
include the task names.


Diffs (updated)
-

  
  samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 8e1057b4d055159acb49d2cc60d3acad7665a532 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 03f48db7f42b2617995b14cf51248b82b6cc2636 

Diff: https://reviews.apache.org/r/47197/diff/


Testing
---

./gradlew build


Thanks,

Jake Maes



Re: Review Request 47197: SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

2016-05-11 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/#review132710
---


Fix it, then Ship it!





samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
 (line 62)


If this is really code that can be run from multiple threads, as opposed to 
code that was blowing up due to ConcurrentModificationException (which is 
sometimes a misleading name), then this needs to be volatile.

isStarted might also need to be volatile, but I didn't look at how it was 
being used.
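The visibility concern can be illustrated with a flag that publishes a plain field: a volatile write followed by a volatile read of the same field establishes a happens-before edge, so the reader also sees everything written before the flag was set. A minimal sketch with hypothetical names (VolatilePublishDemo, bootstrappedCount), not code from the patch:

```java
// Hypothetical illustration of publishing state through a volatile flag.
class VolatilePublishDemo {
  private int bootstrappedCount = 0;               // plain field
  private volatile boolean isBootstrapped = false; // publishes the field above

  void finishBootstrap(int count) {
    bootstrappedCount = count; // 1. ordinary write
    isBootstrapped = true;     // 2. volatile write makes step 1 visible to readers of the flag
  }

  int awaitBootstrap() {
    // Without volatile, the JIT could hoist this read out of the loop and spin forever.
    while (!isBootstrapped) {
      Thread.yield();
    }
    return bootstrappedCount; // happens-before guarantees the value from step 1
  }

  // Starts a reader thread, publishes a value, and returns what the reader saw.
  static int publishAndRead(int value) {
    VolatilePublishDemo demo = new VolatilePublishDemo();
    int[] seen = new int[1];
    Thread reader = new Thread(() -> seen[0] = demo.awaitBootstrap());
    reader.start();
    demo.finishBootstrap(value);
    try {
      reader.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen[0];
  }

  public static void main(String[] args) {
    System.out.println(publishAndRead(42)); // 42
  }
}
```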



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
 (line 65)


You only need volatile here (vs. AtomicReference) since you're not using 
any CAS operation.

For full safety, you need to wrap the set in an unmodifiable wrapper. 
Otherwise it would be possible to modify the set via "read only" methods like 
getBootstrappedStream.
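Both suggestions together might look like this: a volatile field replaces the AtomicReference, since the only operations are a plain read and a plain write under a lock (AtomicReference.get/set have the same volatile memory semantics), and each published set is wrapped with Collections.unmodifiableSet. A minimal sketch with String messages; VolatileSnapshotStore is hypothetical, and getBootstrappedStream here borrows the name from the review but uses a simplified, assumed signature:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not the actual CoordinatorStreamSystemConsumer.
class VolatileSnapshotStore {
  // volatile is enough: writes happen under the lock and readers only need the
  // latest reference, so no compare-and-set is required.
  // Collections.emptySet() is already immutable, so it is not wrapped.
  private volatile Set<String> messages = Collections.emptySet();
  private final Object bootstrapLock = new Object();

  void bootstrap(List<String> newMessages) {
    synchronized (bootstrapLock) {
      Set<String> copy = new LinkedHashSet<>(messages);
      copy.addAll(newMessages);
      // Wrapping stops callers of the read-only accessor from mutating shared state.
      messages = Collections.unmodifiableSet(copy);
    }
  }

  // Simplified signature (assumption); readers take no lock.
  Set<String> getBootstrappedStream() {
    return messages;
  }

  public static void main(String[] args) {
    VolatileSnapshotStore store = new VolatileSnapshotStore();
    store.bootstrap(Arrays.asList("set-config:a", "set-config:b"));
    Set<String> snapshot = store.getBootstrappedStream();
    try {
      snapshot.clear();
    } catch (UnsupportedOperationException e) {
      System.out.println("read-only view rejected clear()");
    }
    store.bootstrap(Arrays.asList("set-config:c"));
    System.out.println(snapshot.size() + " " + store.getBootstrappedStream().size()); // 2 3
  }
}
```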


- Chris Pettitt


On May 10, 2016, 11:07 p.m., Jake Maes wrote:



Review Request 47197: SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

2016-05-10 Thread Jake Maes

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/
---

Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-948
https://issues.apache.org/jira/browse/SAMZA-948


Repository: samza


Description
---

SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

See the stack traces in the JIRA for more context. In short, the consumer can be
bootstrapped concurrently from multiple code paths (AM UI, RM Client callbacks,
etc.), and combined with the remove() logic added in SAMZA-913, concurrent
bootstraps can throw ConcurrentModificationExceptions.

Fix:
* Use an AtomicReference to swap in the updated messages when they are ready
* In bootstrap():
  * Acquire a lock
  * Make a copy of the messages
  * Append the new messages
  * Set the atomic reference to the copy
  * Release lock

Also sneaking in a log message fix for JobCoordinator. It previously didn't 
include the task names.


Diffs
-

  
  samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 8e1057b4d055159acb49d2cc60d3acad7665a532 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 03f48db7f42b2617995b14cf51248b82b6cc2636 

Diff: https://reviews.apache.org/r/47197/diff/


Testing
---

./gradlew build


Thanks,

Jake Maes