Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-08-20 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review96013
---

Ship it!


I am fine if we are not adding the isBroadcast flag. We can add that if there 
is enough user interest.


samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java (line 53)


Broadcast sounds much better! :)


- Navina Ramesh


On Aug. 20, 2015, 8:13 p.m., Yan Fang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34974/
> ---
> 
> (Updated Aug. 20, 2015, 8:13 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-676
> https://issues.apache.org/jira/browse/SAMZA-676
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> 1. added offsetComparator method in SystemAdmin Interface
> 
> 2. added "task.global.inputs" config
> 
> 3. rewrote Grouper classes using Java; allows to assign global streams during 
> grouping
> 
> 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer 
> to preserve messages order
> 
> 5. added taskNames to the offsets in OffsetManager
> 
> 6. allowed to assign one SSP to multiple taskInstances
> 
> 7. skipped already-processed messages in RunLoop
> 
> 8. unit tests for all changes
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml aaa235a
>   docs/learn/documentation/versioned/container/samza-container.md 9f46414
>   docs/learn/documentation/versioned/jobs/configuration-table.html 8177fe5
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java a920a10
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 63a1666
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 24da35f
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 85b012b
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
>   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
>   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala c29853d
>   samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala daa5eab
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b9d9e73
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 4db6d5c
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala a14169b
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 74daf72
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deb3895
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d47de7d
>   samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1f5751e
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala c18e90d
>   samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties b590e29
>   samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties ab90548
>   samza-hdfs/src/test/resources/samza-hdfs-test-job-text.prope

Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-08-20 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/
---

(Updated Aug. 20, 2015, 8:13 p.m.)


Review request for samza.


Changes
---

fixed malformed javadoc
renamed all "global" stream references to "broadcast"
added missing license


Bugs: SAMZA-676
https://issues.apache.org/jira/browse/SAMZA-676


Repository: samza


Description
---

1. added offsetComparator method in SystemAdmin Interface

2. added "task.global.inputs" config

3. rewrote Grouper classes using Java; allows to assign global streams during 
grouping

4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to 
preserve messages order

5. added taskNames to the offsets in OffsetManager

6. allowed to assign one SSP to multiple taskInstances

7. skipped already-processed messages in RunLoop

8. unit tests for all changes
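
Items 1 and 7 above fit together: a comparator over offsets is what lets a task decide whether a message from a shared partition has already been processed. The following is only a minimal sketch of that idea, not the code in the patch. The class and method names are hypothetical, offsets are assumed to be decimal numbers (as with Kafka), and the starting offset is assumed to be the first offset the task still needs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these names come from the patch.
final class BroadcastSkipSketch {

  // Mirrors the (String, String) -> Integer shape of offsetComparator.
  // Assumes numeric offsets; returns null when either offset is missing.
  static Integer offsetComparator(String offset1, String offset2) {
    if (offset1 == null || offset2 == null) {
      return null;
    }
    return Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
  }

  // Keeps only the delivered offsets at or after the task's starting offset,
  // i.e. skips everything the task has already processed.
  static List<String> offsetsToProcess(List<String> delivered, String startingOffset) {
    List<String> toProcess = new ArrayList<>();
    for (String offset : delivered) {
      Integer cmp = offsetComparator(offset, startingOffset);
      if (cmp != null && cmp >= 0) {
        toProcess.add(offset);
      }
    }
    return toProcess;
  }
}
```

Comparing as numbers rather than strings matters here: as strings, "10" would sort before "9".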


Diffs (updated)
-

  checkstyle/import-control.xml aaa235a
  docs/learn/documentation/versioned/container/samza-container.md 9f46414
  docs/learn/documentation/versioned/jobs/configuration-table.html 8177fe5
  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java a920a10
  samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 63a1666
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 24da35f
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 85b012b
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
  samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
  samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
  samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
  samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala c29853d
  samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala daa5eab
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b9d9e73
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 4db6d5c
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
  samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala a14169b
  samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 74daf72
  samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deb3895
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d47de7d
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1f5751e
  samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala c18e90d
  samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties b590e29
  samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties ab90548
  samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties 9df1397
  samza-hdfs/src/test/resources/samza-hdfs-test-job.properties e69de29
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 4391e24
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala de00320
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 1629035
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 2a84328
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java f23b8f9
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 6f67cf5

Diff: https://reviews.apache.org/r/34974/diff/


Te

Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-08-11 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review94980
---

Ship it!


Ship It!

- Navina Ramesh


On July 29, 2015, 10:49 p.m., Yan Fang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34974/
> ---
> 
> (Updated July 29, 2015, 10:49 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-676
> https://issues.apache.org/jira/browse/SAMZA-676
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> 1. added offsetComparator method in SystemAdmin Interface
> 
> 2. added "task.global.inputs" config
> 
> 3. rewrote Grouper classes using Java; allows to assign global streams during 
> grouping
> 
> 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer 
> to preserve messages order
> 
> 5. added taskNames to the offsets in OffsetManager
> 
> 6. allowed to assign one SSP to multiple taskInstances
> 
> 7. skipped already-processed messages in RunLoop
> 
> 8. unit tests for all changes
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 6654319
>   docs/learn/documentation/versioned/container/samza-container.md 9f46414
>   docs/learn/documentation/versioned/jobs/configuration-table.html ea73b40
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 27b2517
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
>   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
>   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
>   samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c46
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 64a5844
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 84fdeaa
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala a14169b
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 74daf72
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deb3895
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 4097ac7
>   samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1fd5dd3
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f5
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala de00320
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 1629035
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 2a84328
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala

Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-08-11 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review93553
---


Lgtm, overall! I was able to test it as well.
Some feedback: Would it be possible to mark the SSP of the IncomingMessageEnvelope 
with a boolean flag indicating whether it is from a broadcast stream or not? 
When I tried writing some sample code, I realized that the StreamTask has to be 
aware of the exact topic name in order to figure out whether the message came 
from a broadcast system stream. If we had an API such as 
"envelope.getSystemStreamPartition().isBroadcast()", I think it would be more 
convenient. What do you think?
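
To make the inconvenience concrete, here is a rough sketch of the check a task has to carry around when no flag exists. It is an illustration under assumptions, not Samza code: StreamPartition is a hypothetical stand-in for the stream identity attached to a message, and the set of broadcast stream names is something the task author would have to hard-code or load from config.

```java
import java.util.Set;

// Hypothetical sketch; StreamPartition is a stand-in, not the Samza class.
final class BroadcastCheckSketch {

  static final class StreamPartition {
    final String system;
    final String stream;
    final int partition;

    StreamPartition(String system, String stream, int partition) {
      this.system = system;
      this.stream = stream;
      this.partition = partition;
    }
  }

  // Without a flag on the partition, the task must already know the
  // "system.stream" names of every broadcast input.
  static boolean isFromBroadcastStream(StreamPartition ssp, Set<String> broadcastStreams) {
    return broadcastStreams.contains(ssp.system + "." + ssp.stream);
  }
}
```

The isBroadcast() call quoted above is only the reviewer's proposal; a flag like that would replace the name lookup with a single method call on the partition.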


docs/learn/documentation/versioned/jobs/configuration-table.html (line 453)


Rephrase - "This property specifies the partitions that all tasks should 
consume."



samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java (line 73)


malformed html error in javadocs for '<' and '>'
Also, in Line 79


I think the delay in my review caused a few more commits to go through. So, you 
have to rebase and update your patch again. 
I found this error:
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala:27: error: class HdfsSystemAdmin needs to be abstract, since method offsetComparator in trait SystemAdmin of type (x$1: String, x$2: String)Integer is not defined
class HdfsSystemAdmin extends SystemAdmin with Logging {
  ^
one error found

I guess you can just throw an UnsupportedOperationException for now.
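
A stub along those lines could look like the sketch below. It assumes the standard java.lang.UnsupportedOperationException is what is meant, and it uses a cut-down, hypothetical interface that carries only the offsetComparator signature from the compiler error; the real SystemAdmin has other methods that are omitted here.

```java
// Hypothetical sketch; OffsetComparing stands in for the relevant part of SystemAdmin.
final class OffsetComparatorStubSketch {

  interface OffsetComparing {
    Integer offsetComparator(String offset1, String offset2);
  }

  // A system without a meaningful offset ordering can satisfy the compiler
  // by implementing the method and refusing to answer at runtime.
  static final class NoOffsetOrderingAdmin implements OffsetComparing {
    @Override
    public Integer offsetComparator(String offset1, String offset2) {
      throw new UnsupportedOperationException("offsetComparator is not supported by this system");
    }
  }
}
```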

- Navina Ramesh


On July 29, 2015, 10:49 p.m., Yan Fang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34974/
> ---
> 
> (Updated July 29, 2015, 10:49 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-676
> https://issues.apache.org/jira/browse/SAMZA-676
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> 1. added offsetComparator method in SystemAdmin Interface
> 
> 2. added "task.global.inputs" config
> 
> 3. rewrote Grouper classes using Java; allows to assign global streams during 
> grouping
> 
> 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer 
> to preserve messages order
> 
> 5. added taskNames to the offsets in OffsetManager
> 
> 6. allowed to assign one SSP to multiple taskInstances
> 
> 7. skipped already-processed messages in RunLoop
> 
> 8. unit tests for all changes
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 6654319
>   docs/learn/documentation/versioned/container/samza-container.md 9f46414
>   docs/learn/documentation/versioned/jobs/configuration-table.html ea73b40
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 27b2517
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
>   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
>   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
>   samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOff

Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-07-29 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/
---

(Updated July 29, 2015, 10:49 p.m.)


Review request for samza.


Changes
---

removed whitespace
updated to latest master


Bugs: SAMZA-676
https://issues.apache.org/jira/browse/SAMZA-676


Repository: samza


Description
---

1. added offsetComparator method in SystemAdmin Interface

2. added "task.global.inputs" config

3. rewrote Grouper classes using Java; allows to assign global streams during 
grouping

4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to 
preserve messages order

5. added taskNames to the offsets in OffsetManager

6. allowed to assign one SSP to multiple taskInstances

7. skipped already-processed messages in RunLoop

8. unit tests for all changes
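
On item 4 above: the reason a LinkedHashSet preserves message order where a HashSet does not is that it iterates in insertion order while still dropping duplicates. A small self-contained illustration follows; the class name and message strings are made up, and this is not the CoordinatorStreamSystemConsumer code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of an order-preserving, duplicate-free buffer.
final class OrderedBufferSketch {

  // Returns the distinct messages in the order they were first seen.
  static List<String> distinctInArrivalOrder(List<String> arrivals) {
    Set<String> buffer = new LinkedHashSet<>(arrivals);
    return new ArrayList<>(buffer);
  }
}
```

With a plain HashSet the iteration order would depend on hash codes, so replaying the buffered messages could reorder them.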


Diffs (updated)
-

  checkstyle/import-control.xml 6654319
  docs/learn/documentation/versioned/container/samza-container.md 9f46414
  docs/learn/documentation/versioned/jobs/configuration-table.html ea73b40
  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
  samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 27b2517
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
  samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
  samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
  samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
  samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
  samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c46
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 64a5844
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 84fdeaa
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
  samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala a14169b
  samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 74daf72
  samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deb3895
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 4097ac7
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1fd5dd3
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f5
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala de00320
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 1629035
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 2a84328
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b4

Diff: https://reviews.apache.org/r/34974/diff/


Testing
---


Thanks,

Yan Fang



Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-06-21 Thread Yan Fang


> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > Mostly looks good. Have some questions:
> > * Have you tried moving the message "filtering" logic to the container 
> > level instead of the task level? Not sure which is simpler in terms of code 
> > change. Since the container has access to all the task Instances and the 
> > systemAdmins, it seems convenient to have the caughtUp map within 
> > containerContext. I could be wrong :)
> > * I want to test the patch locally before confirming a ship it. Looks 
> > awesome for a first draft!

The Container only initializes the task instances. At the beginning, the 
container knows all the information about the tasks and systems. However, once 
the RunLoop has started, the containerContext is no longer updated, so it is a 
little difficult to inform the task instances when the offsets are caught up.


> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala, 
> > line 412
> > 
> >
> > The exception message is inaccurate. It can also happen when the 
> > taskName is not in startingOffsets map (although I am not sure if such a 
> > case will happen).

If the taskName is not in the startingOffsets map, this exception will not be 
thrown. (It is inside the loop of line 374)


> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, 
> > line 144
> > 
> >
> > Should we have a different metric for the number of messages received by 
> > process() than for the number of messages actually processed?
> > We need to clarify the semantics of all our metrics, perhaps in a 
> > separate RB

yes, fixed.


> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, 
> > line 132
> > 
> >
> > instead of getOrElse(null), try .orNull

fixed


> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala,
> >  line 109
> > 
> >
> > We are registering with the offset in the method invocation in Line 
> > 105. Why do we need to update the topicPartitionsAndOffsets map with the 
> > replaced offset ?
> > 
> > I understand that all tasks within the same container may be at 
> > different offset for broadcast stream ssps. But looks like 
> > consumer.register is being invoked in multiple places - TaskStorageManager 
> > & CoordinatorStreamSystemConsumer . Will the change impact these other 
> > components ?

Why do we need to update the topicPartitionsAndOffsets map with the replaced 
offset ?
  -- Because line 105 only registers the offsets in BlockingEnvelopeMap, which 
is for buffering. It has nothing to do with the starting offsets that the 
consumers will consume from.

 Will the change impact these other components ?
   -- No. If the stream is a changelog, every task has its own changelog 
partition, so no partition is assigned to more than one task and the 
replacement will not happen. The code path is the same as before this patch. 
If the stream is the coordinator stream, we always consume from the beginning, 
so the starting offset is always 0.
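
As a rough illustration of the replacement discussed here: when several tasks in one container register different offsets for the same broadcast partition, the consumer needs a single starting offset for it. The sketch below assumes the earliest registered offset wins, which is an inference from this thread rather than something confirmed by the diff; the class, the string partition keys, and the numeric offsets are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the KafkaSystemConsumer implementation.
final class EarliestOffsetRegistrySketch {

  private final Map<String, String> startingOffsets = new HashMap<>();

  // Records the offset for a partition, replacing the stored one only when
  // the new offset is earlier (offsets assumed to be decimal numbers).
  void register(String partitionKey, String offset) {
    String existing = startingOffsets.get(partitionKey);
    if (existing == null || Long.parseLong(offset) < Long.parseLong(existing)) {
      startingOffsets.put(partitionKey, offset);
    }
  }

  // The single offset the consumer would start fetching from.
  String startingOffset(String partitionKey) {
    return startingOffsets.get(partitionKey);
  }
}
```

A partition that is registered only once, such as a per-task changelog partition, never hits the replacement branch, which matches the answer above.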


- Yan


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review88308
---


On June 22, 2015, 6:07 a.m., Yan Fang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34974/
> ---
> 
> (Updated June 22, 2015, 6:07 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-676
> https://issues.apache.org/jira/browse/SAMZA-676
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> 1. added offsetComparator method in SystemAdmin Interface
> 
> 2. added "task.global.inputs" config
> 
> 3. rewrote Grouper classes using Java; allows to assign global streams during 
> grouping
> 
> 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer 
> to preserve messages order
> 
> 5. added taskNames to the offsets in OffsetManager
> 
> 6. allowed to assign one SSP to multiple taskInstances
> 
> 7. skipped already-processed messages in RunLoop
> 
> 8. unit tests for all changes
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 3374f0c 
>   docs/learn/documentation/versioned/container/samza-container.md 9f46414 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 405e2ce 
>   samza-api

Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-06-21 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/
---

(Updated June 22, 2015, 6:07 a.m.)


Review request for samza.


Changes
---

updated according to Navina's comments. Added docs.


Bugs: SAMZA-676
https://issues.apache.org/jira/browse/SAMZA-676


Repository: samza


Description
---

1. added offsetComparator method in SystemAdmin Interface

2. added "task.global.inputs" config

3. rewrote Grouper classes using Java; allows to assign global streams during 
grouping

4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to 
preserve messages order

5. added taskNames to the offsets in OffsetManager

6. allowed to assign one SSP to multiple taskInstances

7. skipped already-processed messages in RunLoop

8. unit tests for all changes


Diffs (updated)
-

  checkstyle/import-control.xml 3374f0c
  docs/learn/documentation/versioned/container/samza-container.md 9f46414
  docs/learn/documentation/versioned/jobs/configuration-table.html 405e2ce
  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
  samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cbacd18
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
  samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
  samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
  samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
  samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
  samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c46
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 64a5844
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 9fb1aa9
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
  samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala a14169b
  samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 74daf72
  samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deb3895
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f5
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala de00320
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 1629035
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 2a84328
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b4

Diff: https://reviews.apache.org/r/34974/diff/


Testing
---


Thanks,

Yan Fang



Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-06-18 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review88308
---


Mostly looks good. Have some questions:
* Have you tried moving the message "filtering" logic to the container level instead 
of the task level? Not sure which is simpler in terms of code change. Since 
the container has access to all the task Instances and the systemAdmins, it 
seems convenient to have the caughtUp map within containerContext. I could be 
wrong :)
* I want to test the patch locally before confirming a ship it. Looks awesome 
for a first draft!


samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java (line 55)


This still does not handle the case of partition range. Please add the 
range handling or correct the exception message.
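
A minimal sketch of what the requested range handling could look like. It assumes (the thread does not confirm the syntax) that each config entry is written either as system.stream#partition or as system.stream#[start-end] with inclusive bounds. The class BroadcastInputParser and its expand method are hypothetical names, and the result uses plain strings rather than Samza's SystemStreamPartition.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not the TaskConfigJava code under review. */
final class BroadcastInputParser {
  private BroadcastInputParser() { }

  /**
   * Expands one entry, e.g. "kafka.foo#1" or "kafka.foo#[0-2]", into one
   * "system.stream#partition" string per partition (range bounds inclusive).
   */
  static List<String> expand(String entry) {
    String trimmed = entry.trim();
    int hash = trimmed.lastIndexOf('#');
    if (hash <= 0 || hash == trimmed.length() - 1) {
      throw new IllegalArgumentException(describe(entry));
    }
    String systemStream = trimmed.substring(0, hash);
    String partitions = trimmed.substring(hash + 1);

    int start;
    int end;
    try {
      if (partitions.startsWith("[") && partitions.endsWith("]")) {
        // Range form: split the text between the brackets into two bounds.
        String[] bounds = partitions.substring(1, partitions.length() - 1).split("-");
        if (bounds.length != 2) {
          throw new IllegalArgumentException(describe(entry));
        }
        start = Integer.parseInt(bounds[0].trim());
        end = Integer.parseInt(bounds[1].trim());
      } else {
        // Single-partition form.
        start = Integer.parseInt(partitions);
        end = start;
      }
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(describe(entry), e);
    }
    if (start < 0 || start > end) {
      throw new IllegalArgumentException(describe(entry));
    }

    List<String> result = new ArrayList<>();
    for (int p = start; p <= end; p++) {
      result.add(systemStream + "#" + p);
    }
    return result;
  }

  /** Error text that names both accepted forms, so the message matches the behavior. */
  private static String describe(String entry) {
    return "Expected system.stream#partition or system.stream#[start-end], got: " + entry;
  }
}
```

With this sketch, expand("kafka.foo#[0-2]") yields three entries, one per partition, and a reversed range such as "kafka.foo#[2-0]" is rejected with a message that mentions the range form.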



samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
 (line 45)


nit: spacing



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 
405)


The exception message is inaccurate. It can also happen when the taskName 
is not in startingOffsets map (although I am not sure if such a case will 
happen).



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 
131)


instead of getOrElse(null), try .orNull



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 
143)


Should we have a different metric for the number of messages received by 
process() than for the number of messages actually processed?
We need to clarify the semantics of all our metrics, perhaps in a separate 
RB.
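
To make the distinction concrete, here is a small sketch that counts received and processed messages separately while a task skips messages it has already handled. The class CatchUpCounter, its fields, and the use of plain long offsets are all assumptions for illustration; they are not the names or types used in TaskInstance.

```java
/** Hypothetical sketch; offsets are plain longs here as a simplifying assumption. */
final class CatchUpCounter {
  private final long startingOffset; // first offset this task has not processed yet
  private boolean caughtUp = false;
  private long messagesReceived = 0;
  private long messagesProcessed = 0;

  CatchUpCounter(long startingOffset) {
    this.startingOffset = startingOffset;
  }

  /** Returns true if the message was processed, false if it was skipped. */
  boolean process(long offset) {
    messagesReceived++;
    if (!caughtUp) {
      if (offset < startingOffset) {
        return false; // already handled before the restart, so skip it
      }
      caughtUp = true; // from here on every message is processed
    }
    messagesProcessed++;
    return true;
  }

  long getMessagesReceived() {
    return messagesReceived;
  }

  long getMessagesProcessed() {
    return messagesProcessed;
  }
}
```

A task that starts at offset 5 and is handed offsets 3, 4, 5, 6 would report four messages received but only two processed, which is the gap a single counter would hide.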



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 
216)


Looks good. 
nit: Can you change method to checkCaughtUp, instead of checkCatchedUp?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 
(line 397)


Can you add some doc here saying this comparator is used in the context of 
broadcast streams (to detect impedance mismatch between tasks when consuming 
from a broadcast stream)?
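
As an illustration of the kind of comparator being discussed, the sketch below compares two offsets that are stored as strings but represent numeric positions, as Kafka offsets do. The class name LongOffsetComparator and the choice to return null for offsets that cannot be compared are assumptions; the thread does not show the actual method signature.

```java
/** Hypothetical sketch of a numeric offset comparator; not the code under review. */
final class LongOffsetComparator {
  private LongOffsetComparator() { }

  /**
   * Returns a negative number, zero, or a positive number when offset1 is
   * before, equal to, or after offset2. Returns null when either offset is
   * missing or not numeric (an assumed convention for "not comparable").
   */
  static Integer compare(String offset1, String offset2) {
    if (offset1 == null || offset2 == null) {
      return null;
    }
    try {
      // Parse first: comparing the strings directly would order "10" before "5".
      return Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
    } catch (NumberFormatException e) {
      return null;
    }
  }
}
```

Parsing before comparing matters because the offsets are strings: a lexicographic comparison would place "10" before "5", and a task could wrongly appear to be ahead of another.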



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 (line 109)


We are registering with the offset in the method invocation in Line 105. 
Why do we need to update the topicPartitionsAndOffsets map with the replaced 
offset ?

I understand that all tasks within the same container may be at different 
offsets for broadcast stream SSPs. But it looks like consumer.register is being 
invoked in multiple places: TaskStorageManager and 
CoordinatorStreamSystemConsumer. Will the change impact these other components?
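
One plausible reading of why the stored offset gets replaced: when several tasks register the same SSP at different offsets, the consumer has to start from the earliest one so that no task misses messages. The sketch below shows that idea only. EarliestOffsetRegistry is a hypothetical class, SSPs are represented as strings, and offsets as longs; none of this is taken from KafkaSystemConsumer.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: keep the smallest offset registered for each SSP. */
final class EarliestOffsetRegistry {
  private final Map<String, Long> startingOffsets = new HashMap<>();

  /** Records the offset, keeping the earlier one if the SSP is already registered. */
  void register(String ssp, long offset) {
    startingOffsets.merge(ssp, offset, Long::min);
  }

  /** Returns the offset to start reading from, or null if the SSP was never registered. */
  Long startingOffset(String ssp) {
    return startingOffsets.get(ssp);
  }
}
```

A caller that registers an SSP only once, as a non-broadcast component would, sees no change in behavior under this scheme, since the single registered offset is also the minimum.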



samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
 (line 98)


nit: typo 'resiter'


- Navina Ramesh


On June 16, 2015, 9:23 p.m., Yan Fang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34974/
> ---
> 
> (Updated June 16, 2015, 9:23 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-676
> https://issues.apache.org/jira/browse/SAMZA-676
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> 1. added offsetComparator method in SystemAdmin Interface
> 
> 2. added "task.global.inputs" config
> 
> 3. rewrote Grouper classes in Java; allows assigning global streams during 
> grouping
> 
> 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer 
> to preserve message order
> 
> 5. added taskNames to the offsets in OffsetManager
> 
> 6. allowed one SSP to be assigned to multiple taskInstances
> 
> 7. skipped already-processed messages in RunLoop
> 
> 8. unit tests for all changes
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 3374f0c 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb 
>   
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
>  249b8ae 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySy

Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-06-16 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/
---

(Updated June 16, 2015, 9:23 p.m.)


Review request for samza.


Changes
---

latest patch


Bugs: SAMZA-676
https://issues.apache.org/jira/browse/SAMZA-676


Repository: samza


Description
---

1. added offsetComparator method in SystemAdmin Interface

2. added "task.global.inputs" config

3. rewrote Grouper classes in Java; allows assigning global streams during 
grouping

4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to 
preserve message order

5. added taskNames to the offsets in OffsetManager

6. allowed one SSP to be assigned to multiple taskInstances

7. skipped already-processed messages in RunLoop

8. unit tests for all changes
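
Item 4 relies on a standard property of the Java collections: LinkedHashSet iterates in insertion order, while HashSet makes no ordering guarantee. A minimal sketch of that property follows; OrderedMessages is a hypothetical name, and strings stand in for coordinator stream messages.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

/** Hypothetical sketch: de-duplicate messages while keeping first-seen order. */
final class OrderedMessages {
  private OrderedMessages() { }

  static List<String> inInsertionOrder(List<String> messages) {
    // LinkedHashSet drops duplicates but iterates in the order elements were first added.
    return new ArrayList<>(new LinkedHashSet<>(messages));
  }
}
```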


Diffs (updated)
-

  checkstyle/import-control.xml 3374f0c 
  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb 
  
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
 249b8ae 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
 PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
20e5d26 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
cbacd18 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
c5a5ea5 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
 44e95fc 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
 3c0acad 
  
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
 097f410 
  samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
 PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
 PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
8d54c46 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
64a5844 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
9fb1aa9 
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
7caad28 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
 a14169b 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
 74daf72 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
 deb3895 
  
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 
d9ae187 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 
35086f5 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 de00320 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 1629035 
  
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
 2a84328 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
b063366 
  
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 1e936b4 

Diff: https://reviews.apache.org/r/34974/diff/


Testing
---


Thanks,

Yan Fang



Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-06-16 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/
---

(Updated June 16, 2015, 9:16 p.m.)


Review request for samza.


Changes
---

rebase to the master (it was wrong in the previous upload)


Bugs: SAMZA-676
https://issues.apache.org/jira/browse/SAMZA-676


Repository: samza


Description
---

1. added offsetComparator method in SystemAdmin Interface

2. added "task.global.inputs" config

3. rewrote Grouper classes in Java; allows assigning global streams during 
grouping

4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to 
preserve message order

5. added taskNames to the offsets in OffsetManager

6. allowed one SSP to be assigned to multiple taskInstances

7. skipped already-processed messages in RunLoop

8. unit tests for all changes


Diffs (updated)
-

  checkstyle/import-control.xml 3374f0c 
  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb 
  
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
 249b8ae 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
 PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
20e5d26 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
cbacd18 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
c5a5ea5 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
 44e95fc 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
 3c0acad 
  
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
 097f410 
  samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
 PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
 PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
8d54c46 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
64a5844 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
9fb1aa9 
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
7caad28 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
 a14169b 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
 74daf72 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
 deb3895 
  
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 
d9ae187 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 
35086f5 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 de00320 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 1629035 
  
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
 2a84328 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
b063366 
  
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 1e936b4 

Diff: https://reviews.apache.org/r/34974/diff/


Testing
---


Thanks,

Yan Fang



Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-06-16 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/
---

(Updated June 16, 2015, 9:06 p.m.)


Review request for samza.


Changes
---

rebased to latest master


Bugs: SAMZA-676
https://issues.apache.org/jira/browse/SAMZA-676


Repository: samza


Description
---

1. added offsetComparator method in SystemAdmin Interface

2. added "task.global.inputs" config

3. rewrote Grouper classes in Java; allows assigning global streams during 
grouping

4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to 
preserve message order

5. added taskNames to the offsets in OffsetManager

6. allowed one SSP to be assigned to multiple taskInstances

7. skipped already-processed messages in RunLoop

8. unit tests for all changes


Diffs (updated)
-

  checkstyle/import-control.xml 3374f0c 
  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb 
  
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
 249b8ae 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
 PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
20e5d26 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
cbacd18 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
c5a5ea5 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
 44e95fc 
  
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
 3c0acad 
  
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
 097f410 
  samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java 
PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
 PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
 PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
8d54c46 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
64a5844 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
9fb1aa9 
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
7caad28 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
 a14169b 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
 74daf72 
  
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
 deb3895 
  
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 
d9ae187 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 
35086f5 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 de00320 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 1629035 
  
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
 2a84328 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
b063366 
  
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 1e936b4 

Diff: https://reviews.apache.org/r/34974/diff/


Testing
---


Thanks,

Yan Fang