Re: Review Request 35606: SAMZA-716 One Link in Spark Streaming and Samza comparison page is broken

2015-06-18 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35606/#review88404
---

Ship it!


- Yan Fang


On June 18, 2015, 2:21 p.m., Aleksandar Bircakovic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/35606/
 ---
 
 (Updated June 18, 2015, 2:21 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 The post is now pointing to the correct link.
 
 
 Diffs
 -
 
   docs/learn/documentation/versioned/comparisons/spark-streaming.md e1ccc3e 
 
 Diff: https://reviews.apache.org/r/35606/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Aleksandar Bircakovic
 




Samza hung after bootstrapping

2015-06-18 Thread Roger Hoover
I need some help. I have a job which bootstraps one stream and then is
supposed to read from two. When I run it on our YARN cluster with a single
container, it works correctly. When I tried it with 5 containers, it
hung after consuming the bootstrap topic. I ran it with the grid script on
my laptop (Mac OS X) with yarn.container.count=2, but it only spawns one
container and still hangs after bootstrap.

Debug logs are here: http://pastebin.com/af3KPvju

I looked at JMX metrics and see:
- Task Metrics
  - no value for the Kafka offset of the non-bootstrapped stream
- SystemConsumerMetrics
  - choose null keeps incrementing
  - ssps-needed-by-chooser: 1
  - unprocessed-messages: 62k
- Bootstrapping Chooser
  - lagging partitions: 4
  - lagging-batch-streams: 4
  - batch-resets: 0

Has anyone seen this, or can anyone offer ideas on how to debug it further?

I'm using Samza 0.9.0 and YARN 2.4.0.

Thanks!

Roger
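For readers less familiar with bootstrap streams: a stream is marked as one with systems.<system>.streams.<stream>.samza.bootstrap=true, and it is fully consumed at container start before messages from other streams are processed. The Java sketch below is a simplified, hypothetical model of that catch-up bookkeeping, not Samza's BootstrappingChooser code. It assumes a partition counts as caught up once its last processed offset reaches the newest offset recorded at container start, that an empty partition needs no bootstrapping, and it counts the partitions still lagging (compare the "lagging partitions" reading above).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of bootstrap catch-up tracking
 * (not Samza's BootstrappingChooser). Offsets are modeled as longs, as with Kafka.
 */
class BootstrapLagModel {
    // partition id -> newest offset recorded when the container started
    private final Map<Integer, Long> newestAtStart = new HashMap<>();
    // partition id -> offset of the last message processed from that partition
    private final Map<Integer, Long> lastProcessed = new HashMap<>();

    /** A null newest offset models an empty partition: nothing to bootstrap. */
    void registerPartition(int partition, Long newestOffsetAtStart) {
        if (newestOffsetAtStart != null) {
            newestAtStart.put(partition, newestOffsetAtStart);
        }
    }

    void onMessageProcessed(int partition, long offset) {
        lastProcessed.put(partition, offset);
    }

    /** Partitions whose last processed offset has not reached the startup head. */
    int laggingPartitions() {
        int lagging = 0;
        for (Map.Entry<Integer, Long> e : newestAtStart.entrySet()) {
            Long processed = lastProcessed.get(e.getKey());
            if (processed == null || processed < e.getValue()) {
                lagging++;
            }
        }
        return lagging;
    }

    /** In this model, other inputs are released only when nothing is lagging. */
    boolean bootstrapComplete() {
        return laggingPartitions() == 0;
    }
}
```

In this model, a registered partition that never delivers messages to the container keeps bootstrapComplete() false indefinitely, so comparing the lagging count with the bootstrap partitions each container actually owns may be a reasonable first check.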


Re: Review Request 35397: Fix SAMZA-697

2015-06-18 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35397/
---

(Updated June 18, 2015, 6:42 p.m.)


Review request for samza.


Bugs: SAMZA-697
https://issues.apache.org/jira/browse/SAMZA-697


Repository: samza


Description (updated)
---

Address Yan's comments


Diffs (updated)
-

  checkstyle/import-control.xml 3374f0c432e61ac4cda275377604cfd481f0cddf 
  docs/learn/documentation/versioned/jobs/configuration-table.html 405e2cea4fd1d037cc26b3537f6bb406eded202b 
  samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 0b3a235b5ab1d6bd60669bfe6023f6b0b4e943d3 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cbacd183420e9d1d72b05693b55a8f0a62d59fc5 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5dea9a950fc741625238f5bf8b1f362180 
  samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 1c178a661e449c6bdfc4ce431aef9bb2d261a6c2 
  samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 4fac154709d72ab594485dad93c912b55fb1617e 
  samza-core/src/test/java/org/apache/samza/task/TestTaskClassLoader.java PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 9fb1aa98fcd14397e8a4cb00c67537482e95fa53 
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28c9298485753ab861da76793cf925953ed 

Diff: https://reviews.apache.org/r/35397/diff/


Testing
---

unit tests


Thanks,

Guozhang Wang



Re: Measuring Samza Job Throughput

2015-06-18 Thread Tao Feng
Hi, Milinda, Yi,

Sure. I will be happy to help on this.

Thanks,
-Tao

On Wed, Jun 17, 2015 at 11:35 AM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Milinda,

 Tao @LinkedIn has done some Samza benchmark tests using a standard
 word-count task. You may want to reach out to him for some detailed ideas
 on how to set up the perf tests.

 Best!

 -Yi

 On Wed, Jun 17, 2015 at 11:25 AM, Milinda Pathirage mpath...@umail.iu.edu wrote:

  Thank you all for the ideas. I'll have a look at KafkaSystem metrics and
  SamzaContainerMetrics.
 
  Milinda
 
  On Wed, Jun 17, 2015 at 2:38 AM, Tao Feng fengta...@gmail.com wrote:

   Hi,

   One metric I could think of related to Samza job throughput is the
   process-envelopes metric in SamzaContainerMetrics. This counter is
   incremented whenever the container processes a meaningful message; see
   https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
   and
   https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala

   But this metric counts messages, so it is more of a QPS-style metric
   than a bytes-per-second measure.

   Thanks,
   -Tao

   On Tue, Jun 16, 2015 at 9:11 PM, Milinda Pathirage mpath...@umail.iu.edu wrote:

    Hi Devs,

    I was looking for a way to measure Samza job throughput and found that
    it's possible to do it via Samza's metrics reporter. But there are several
    types of metrics reported via this method. For example, TaskInstanceMetrics
    reports the number of messages sent. But if I wanted a measurement like
    bytes produced per second, is there a way to do that? It looks like
    KafkaSystemProducerMetrics and TaskInstanceMetrics only provide the number
    of messages sent.

    If any of you have experience measuring Samza job throughput, could you
    please share? I'd really appreciate any ideas on measuring job throughput.

    Thanks
    Milinda
    --
    Milinda Pathirage

    PhD Student | Research Assistant
    School of Informatics and Computing | Data to Insight Center
    Indiana University

    twitter: milindalakmal
    skype: milinda.pathirage
    blog: http://milinda.pathirage.org
   
  
 
 
 
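On the bytes-per-second question: since the metrics discussed in this thread count messages, one option is to keep a byte counter in the task itself and derive rates from two snapshots taken a known interval apart. The Java sketch below is hypothetical and not a Samza API. It assumes the caller passes the serialized size of each outgoing message, and it computes messages/s and bytes/s from the deltas between snapshots, which is the same arithmetic you would apply to two readings of a cumulative counter such as the SamzaContainerMetrics counter Tao mentions.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical per-task throughput meter; not part of Samza's metrics API. */
class ThroughputMeter {
    private final LongAdder messages = new LongAdder();
    private final LongAdder bytes = new LongAdder();
    // Baseline from the previous snapshot; snapshot() is meant to be called from one thread.
    private long lastMessages = 0;
    private long lastBytes = 0;
    private long lastNanos;

    ThroughputMeter(long startNanos) {
        this.lastNanos = startNanos;
    }

    /** Call once per message sent, with its serialized size in bytes. */
    void record(int serializedBytes) {
        messages.increment();
        bytes.add(serializedBytes);
    }

    /**
     * Returns {messagesPerSecond, bytesPerSecond} since the previous snapshot and
     * moves the baseline forward. The clock is passed in (e.g. System.nanoTime())
     * so tests can use fixed values.
     */
    double[] snapshot(long nowNanos) {
        long m = messages.sum();
        long b = bytes.sum();
        double seconds = (nowNanos - lastNanos) / 1e9;
        double[] rates = seconds > 0
            ? new double[] {(m - lastMessages) / seconds, (b - lastBytes) / seconds}
            : new double[] {0.0, 0.0};
        lastMessages = m;
        lastBytes = b;
        lastNanos = nowNanos;
        return rates;
    }
}
```

Working from deltas rather than totals means a long-running job reports its current rate instead of a lifetime average.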
 



Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-06-18 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review88308
---


Mostly looks good. Have some questions:
* Have you tried moving the message filtering logic to the container level instead 
of the task level? Not sure which is simpler in terms of code change. Since 
the container has access to all the task instances and the systemAdmins, it 
seems convenient to have the caughtUp map within containerContext. I could be 
wrong :)
* I want to test the patch locally before confirming a ship it. Looks awesome 
for a first draft!


samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java (line 55)
https://reviews.apache.org/r/34974/#comment140753

This still does not handle the case of partition range. Please add the 
range handling or correct the exception message.
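To make the range case concrete, the sketch below parses a partition spec of the form system.stream#N or system.stream#[N-M] into a list of partition ids. That syntax is an assumption, borrowed from the task.broadcast.inputs format documented in later Samza releases, and is not necessarily what this patch's task.global.inputs accepts; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for "system.stream#N" and "system.stream#[N-M]" specs. */
class PartitionSpecParser {
    // group 1: system.stream, group 2: single partition, groups 3 and 4: range bounds
    private static final Pattern SPEC =
        Pattern.compile("^([^#]+)#(?:(\\d+)|\\[(\\d+)-(\\d+)\\])$");

    /** Returns the partition ids named by the spec, expanding ranges inclusively. */
    static List<Integer> partitions(String spec) {
        Matcher m = SPEC.matcher(spec.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException(
                "Expected system.stream#N or system.stream#[N-M], got: " + spec);
        }
        List<Integer> result = new ArrayList<>();
        if (m.group(2) != null) {
            result.add(Integer.parseInt(m.group(2)));
            return result;
        }
        int start = Integer.parseInt(m.group(3));
        int end = Integer.parseInt(m.group(4));
        if (start > end) {
            throw new IllegalArgumentException("Empty partition range in: " + spec);
        }
        for (int p = start; p <= end; p++) {
            result.add(p);
        }
        return result;
    }
}
```

An error message that names both accepted forms, as above, also addresses the alternative of correcting the exception text.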



samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java (line 45)
https://reviews.apache.org/r/34974/#comment140785

nit: spacing



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 405)
https://reviews.apache.org/r/34974/#comment140947

The exception message is inaccurate. It can also happen when the taskName 
is not in the startingOffsets map (although I am not sure whether such a case 
can happen).



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 131)
https://reviews.apache.org/r/34974/#comment140958

instead of getOrElse(null), try .orNull



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 143)
https://reviews.apache.org/r/34974/#comment140959

Should we have a different metric for the number of messages received by 
process() than for the number of messages actually processed?
We need to clarify the semantics of all our metrics, perhaps in a separate 
RB.



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 216)
https://reviews.apache.org/r/34974/#comment140960

Looks good. 
nit: Can you rename the method to checkCaughtUp instead of checkCatchedUp?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala (line 397)
https://reviews.apache.org/r/34974/#comment140967

Can you add some doc here saying this comparator is used in the context of 
broadcast streams (to detect impedance mismatch between tasks when consuming 
from a broadcast stream)?
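As an illustration of what such a comparator has to get right for Kafka: Kafka offsets are 64-bit integers, so when they are carried as decimal strings they must be compared numerically rather than lexically (as strings, "9" sorts after "10"). The sketch below is a standalone, hypothetical Java helper, not the patch's KafkaSystemAdmin code; returning null for missing or unparseable offsets is an assumed convention meaning "not comparable".

```java
/** Hypothetical standalone comparator for Kafka-style offsets carried as strings. */
final class KafkaOffsets {
    private KafkaOffsets() {}

    /**
     * Returns a negative number, zero, or a positive number as offset1 is before,
     * equal to, or after offset2. Returns null if either offset is missing or is
     * not a decimal long, i.e. the two offsets are not comparable.
     */
    static Integer compare(String offset1, String offset2) {
        if (offset1 == null || offset2 == null) {
            return null;
        }
        try {
            // Numeric comparison: "9" < "10" here, unlike String.compareTo.
            return Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```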



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala (line 109)
https://reviews.apache.org/r/34974/#comment140972

We are registering with the offset in the method invocation on line 105. 
Why do we need to update the topicPartitionsAndOffsets map with the replaced 
offset?

I understand that the tasks within the same container may be at different 
offsets for broadcast-stream SSPs. But it looks like consumer.register is being 
invoked in multiple places (TaskStorageManager and 
CoordinatorStreamSystemConsumer). Will the change impact these other components?



samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala (line 98)
https://reviews.apache.org/r/34974/#comment140973

nit: typo 'resiter'


- Navina Ramesh


On June 16, 2015, 9:23 p.m., Yan Fang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34974/
 ---
 
 (Updated June 16, 2015, 9:23 p.m.)
 
 
 Review request for samza.
 
 
 Bugs: SAMZA-676
 https://issues.apache.org/jira/browse/SAMZA-676
 
 
 Repository: samza
 
 
 Description
 ---
 
 1. added offsetComparator method in SystemAdmin Interface
 
 2. added task.global.inputs config
 
 3. rewrote Grouper classes using Java; allows global streams to be assigned 
 during grouping
 
 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer 
 to preserve message order
 
 5. added taskNames to the offsets in OffsetManager
 
 6. allowed one SSP to be assigned to multiple taskInstances
 
 7. skipped already-processed messages in RunLoop
 
 8. unit tests for all changes
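Item 7 in the list above can be pictured as a per-task filter: because a broadcast SSP is assigned to several task instances, each task may already have processed a different offset of it, so a message delivered to the container should reach a task only if its offset is past that task's own position. The Java sketch below is a hypothetical model of that idea with offsets simplified to longs; it is not the RunLoop change in this patch. It also counts delivered and skipped messages separately, which is the received-versus-processed distinction raised in the review comments on TaskInstance.scala.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: per-task filtering of one broadcast SSP shared by several tasks. */
class BroadcastFilter {
    // task name -> last offset that task has already processed
    private final Map<String, Long> lastProcessed = new HashMap<>();
    private long delivered = 0;
    private long skipped = 0;

    /** Seed a task's position, e.g. from its checkpointed offset. */
    void setLastProcessed(String taskName, long offset) {
        lastProcessed.put(taskName, offset);
    }

    /**
     * Decides whether the message at `offset` should be handed to the task.
     * A task with no recorded position has processed nothing, so it gets every message.
     */
    boolean shouldProcess(String taskName, long offset) {
        Long last = lastProcessed.get(taskName);
        boolean deliver = last == null || offset > last;
        if (deliver) {
            lastProcessed.put(taskName, offset); // the task now has this offset
            delivered++;
        } else {
            skipped++; // already processed by this task before the restart
        }
        return deliver;
    }

    long delivered() { return delivered; }
    long skipped() { return skipped; }
}
```

In this model the container re-reads the shared SSP from the smallest task position, and tasks that are further ahead simply skip until their own position is passed.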
 
 
 Diffs
 -
 
   checkstyle/import-control.xml 3374f0c 
   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb 
   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae 
   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION 
   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION 
   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION 
   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION 
   
 

Re: Review Request 35397: Fix SAMZA-697

2015-06-18 Thread Boris Shkolnik

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35397/#review88455
---


This is a partial review (I didn't go through the tests).


samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 50)
https://reviews.apache.org/r/35397/#comment140992

Do we have a test for this case?



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 63)
https://reviews.apache.org/r/35397/#comment140989

Do we need a LOG.warn in case the file doesn't exist?



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 72)
https://reviews.apache.org/r/35397/#comment140993

nit: maybe if (blacklistClassnames == null) return false;



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 103)
https://reviews.apache.org/r/35397/#comment140995

Should we have a little more validation here (checking for empty strings, 
for example)?
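The null check and empty-string validation suggested in these two comments could look roughly like the following. It is a hypothetical helper, not the patch's TaskClassLoader: it assumes the blacklist arrives as a comma-separated string of fully qualified class names, trims each entry, drops empty entries, and treats a missing list as "nothing is blacklisted".

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical blacklist helper illustrating the suggested null/empty checks. */
final class ClassBlacklist {
    private final Set<String> classNames;

    /** @param commaSeparated e.g. "com.example.A, com.example.B"; may be null */
    ClassBlacklist(String commaSeparated) {
        Set<String> names = new LinkedHashSet<>();
        if (commaSeparated != null) {
            for (String raw : commaSeparated.split(",")) {
                String name = raw.trim();
                if (!name.isEmpty()) { // skip blanks such as ", ," or a trailing comma
                    names.add(name);
                }
            }
        }
        this.classNames = Collections.unmodifiableSet(names);
    }

    /** Returns false early when there is nothing to check. */
    boolean isBlacklisted(String className) {
        if (className == null || classNames.isEmpty()) {
            return false;
        }
        return classNames.contains(className);
    }

    Set<String> names() {
        return classNames;
    }
}
```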



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 109)
https://reviews.apache.org/r/35397/#comment140996

nit: this should be checked at the beginning of the method.



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 129)
https://reviews.apache.org/r/35397/#comment140997

nit: 'blacklisted'



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 140)
https://reviews.apache.org/r/35397/#comment140998

else log an error?



samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 146)
https://reviews.apache.org/r/35397/#comment141000

Do we need to override it if it is the same as the default implementation?



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 435)
https://reviews.apache.org/r/35397/#comment141001

Do we want to create this taskClassLoader if the taskClassLoaderPath is not 
configured? In that case we are creating the class loader with a null list of 
URLs. Is that safe?
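On the safety question: java.net.URLClassLoader's constructor throws a NullPointerException when given a null URL array, so one option is to skip the custom loader entirely when no path is configured. The sketch below assumes a hypothetical list of filesystem classpath entries (jar files or directories) standing in for taskClassLoaderPath; when the list is null or contains only blank entries, it returns the parent loader.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical factory: only build a URLClassLoader when a path is configured. */
final class TaskClassLoaders {
    private TaskClassLoaders() {}

    static ClassLoader create(List<String> classpathEntries, ClassLoader parent) {
        List<URL> urls = new ArrayList<>();
        if (classpathEntries != null) {
            for (String entry : classpathEntries) {
                if (entry == null || entry.trim().isEmpty()) {
                    continue; // ignore blank entries instead of failing
                }
                try {
                    urls.add(Paths.get(entry.trim()).toUri().toURL());
                } catch (MalformedURLException e) {
                    throw new IllegalArgumentException("Bad classpath entry: " + entry, e);
                }
            }
        }
        if (urls.isEmpty()) {
            // Nothing configured: reuse the parent rather than calling
            // new URLClassLoader(null, parent), which throws a NullPointerException.
            return parent;
        }
        return new URLClassLoader(urls.toArray(new URL[0]), parent);
    }
}
```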


- Boris Shkolnik






Review Request 35598: SAMZA-563 Upgrade Hello - Samza to YARN 2.6.0

2015-06-18 Thread Aleksandar Pejakovic

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35598/
---

Review request for samza.


Repository: samza-hello-samza


Description
---

Changed the bin/grid script and the pom.xml file in samza-hello-samza to use Hadoop 
2.6.0 instead of 2.4.0.


Diffs
-

  bin/grid a639ade 
  pom.xml 0e3bf5f 

Diff: https://reviews.apache.org/r/35598/diff/


Testing
---


Thanks,

Aleksandar Pejakovic