Re: Review Request 35606: SAMZA-716 One Link in Spark Streaming and Samza comparison page is broken
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35606/#review88404
---

Ship it!

- Yan Fang

On June 18, 2015, 2:21 p.m., Aleksandar Bircakovic wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35606/
---

(Updated June 18, 2015, 2:21 p.m.)

Review request for samza.

Repository: samza

Description
---
The post is now pointing to the correct link.

Diffs
---
docs/learn/documentation/versioned/comparisons/spark-streaming.md e1ccc3e

Diff: https://reviews.apache.org/r/35606/diff/

Testing
---

Thanks,
Aleksandar Bircakovic
Samza hung after bootstrapping
I need some help. I have a job that bootstraps one stream and is then supposed to read from two. When I run it on our YARN cluster with a single container, it works correctly. When I tried it with 5 containers, it hung after consuming the bootstrap topic. I ran it with the grid script on my laptop (Mac OS X) with yarn.container.count=2, but it only spawned one container and still hung after bootstrap.

Debug logs are here: http://pastebin.com/af3KPvju

I looked at JMX metrics and see:

- Task Metrics
  - no value for kafka offset of non-bootstrapped stream
- SystemConsumerMetrics
  - choose null keeps incrementing
  - ssps-needed-by-chooser 1
  - unprocessed-messages 62k
- Bootstrapping Chooser
  - lagging partitions 4
  - lagging-batch-streams 4
  - batch-resets 0

Has anyone seen this, or can anyone offer ideas on how to better debug it? I'm using Samza 0.9.0 and YARN 2.4.0.

Thanks!
Roger
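For anyone reading these metrics, a rough mental model of bootstrap gating may help. The sketch below is a simplified illustration and not Samza's BootstrappingChooser: the class BootstrapGate and all of its methods are hypothetical. It assumes a bootstrap partition counts as lagging until a message at or beyond a target offset (the newest offset seen at startup) has been delivered, and that messages from other streams are held back while any partition is still lagging.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of bootstrap gating; not Samza's BootstrappingChooser. */
final class BootstrapGate {
    // Target offset for each bootstrap partition that has not caught up yet.
    private final Map<String, Long> lagging = new HashMap<>();

    /** Registers a bootstrap partition together with the offset it must reach. */
    void registerBootstrapPartition(String partition, long targetOffset) {
        lagging.put(partition, targetOffset);
    }

    /** Records a delivered message; the partition stops lagging once it reaches its target. */
    void onDelivered(String partition, long offset) {
        Long target = lagging.get(partition);
        if (target != null && offset >= target) {
            lagging.remove(partition);
        }
    }

    /** Number of bootstrap partitions still behind their target. */
    int laggingPartitions() {
        return lagging.size();
    }

    /** Non-bootstrap streams may be chosen only once nothing is lagging. */
    boolean mayChooseNonBootstrap() {
        return lagging.isEmpty();
    }

    public static void main(String[] args) {
        BootstrapGate gate = new BootstrapGate();
        gate.registerBootstrapPartition("bootstrap-topic-0", 10L);
        gate.onDelivered("bootstrap-topic-0", 9L);
        System.out.println(gate.mayChooseNonBootstrap());
    }
}
```

Under this model, a lagging count that stays at 4 would mean four bootstrap partitions never report reaching their target, which would keep the second stream from being read. Whether that is what happens in the job above is something only the debug logs can confirm.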
Re: Review Request 35397: Fix SAMZA-697
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35397/
---

(Updated June 18, 2015, 6:42 p.m.)

Review request for samza.

Bugs: SAMZA-697
https://issues.apache.org/jira/browse/SAMZA-697

Repository: samza

Description (updated)
---
Address Yan's comments

Diffs (updated)
---
checkstyle/import-control.xml 3374f0c432e61ac4cda275377604cfd481f0cddf
docs/learn/documentation/versioned/jobs/configuration-table.html 405e2cea4fd1d037cc26b3537f6bb406eded202b
samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java PRE-CREATION
samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 0b3a235b5ab1d6bd60669bfe6023f6b0b4e943d3
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cbacd183420e9d1d72b05693b55a8f0a62d59fc5
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5dea9a950fc741625238f5bf8b1f362180
samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 1c178a661e449c6bdfc4ce431aef9bb2d261a6c2
samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 4fac154709d72ab594485dad93c912b55fb1617e
samza-core/src/test/java/org/apache/samza/task/TestTaskClassLoader.java PRE-CREATION
samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 9fb1aa98fcd14397e8a4cb00c67537482e95fa53
samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28c9298485753ab861da76793cf925953ed

Diff: https://reviews.apache.org/r/35397/diff/

Testing
---
unit tests

Thanks,
Guozhang Wang
Re: Measuring Samza Job Throughput
Hi, Milinda, Yi,

Sure. I will be happy to help on this.

Thanks,
-Tao

On Wed, Jun 17, 2015 at 11:35 AM, Yi Pan nickpa...@gmail.com wrote:

Hi, Milinda,

Tao @LinkedIn has done some Samza benchmark tests using a standard word-count task. You may want to reach out to him for some detailed ideas on how to set up the perf tests.

Best!
-Yi

On Wed, Jun 17, 2015 at 11:25 AM, Milinda Pathirage mpath...@umail.iu.edu wrote:

Thank you all for the ideas. I'll have a look at KafkaSystem metrics and SamzaContainerMetrics.

Milinda

On Wed, Jun 17, 2015 at 2:38 AM, Tao Feng fengta...@gmail.com wrote:

Hi,

One metric I can think of related to Samza job throughput is the process-envelopes metric listed in SamzaContainerMetrics. This counter is incremented whenever the container processes a meaningful message (see
https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
). But this metric is more of a QPS-type metric.

Thanks,
-Tao

On Tue, Jun 16, 2015 at 9:11 PM, Milinda Pathirage mpath...@umail.iu.edu wrote:

Hi Devs,

I was looking for a way to measure Samza job throughput and found that it's possible to do it via Samza's metrics reporter. But there are several types of metrics reported via this method. For example, TaskInstanceMetrics reports the number of messages sent. But if I wanted to get a measurement like bytes per second produced, is there a way to do that? It looks like KafkaSystemProducerMetrics and TaskInstanceMetrics only provide the number of messages sent.

If any of you have any experience in measuring Samza job throughput, can you please share? I'd really appreciate any ideas on measuring job throughput.
Thanks Milinda -- Milinda Pathirage PhD Student | Research Assistant School of Informatics and Computing | Data to Insight Center Indiana University twitter: milindalakmal skype: milinda.pathirage blog: http://milinda.pathirage.org
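Since the metrics discussed in this thread are counters that only ever increase, one way to turn them into a throughput figure is to sample a counter twice and divide the difference by the elapsed time. The sketch below shows only that arithmetic; ThroughputEstimator and ratePerSecond are hypothetical names and nothing here calls Samza's API. The same calculation would apply to a byte counter, assuming the job maintains one itself, since the thread notes that the built-in producer metrics only count messages.

```java
/** Hypothetical helper: derives a per-second rate from two snapshots of an increasing counter. */
final class ThroughputEstimator {
    private ThroughputEstimator() {}

    /**
     * Returns the average per-second rate between two counter snapshots.
     * Counts may be messages or bytes; timestamps are in milliseconds.
     */
    static double ratePerSecond(long earlierCount, long laterCount,
                                long earlierMillis, long laterMillis) {
        long elapsedMillis = laterMillis - earlierMillis;
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("snapshots must be in increasing time order");
        }
        if (laterCount < earlierCount) {
            // A counter that went backwards usually means the process restarted.
            throw new IllegalArgumentException("counter decreased between snapshots");
        }
        return (laterCount - earlierCount) * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // Made-up example: a message counter sampled 60 seconds apart.
        double messagesPerSecond = ratePerSecond(1_000L, 13_000L, 0L, 60_000L);
        System.out.println("messages/sec: " + messagesPerSecond);
    }
}
```

Averaging over a longer window smooths out bursts; sampling at the metrics reporter's own interval avoids reading the same snapshot twice.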
Re: Review Request 34974: SAMZA-676: implement broadcast stream
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review88308
---

Mostly looks good. Have some questions:

* Have you tried moving the message filtering logic to the container level instead of the task level? Not sure which is simpler in terms of code change. Since the container has access to all the task instances and the systemAdmins, it seems convenient to have the caughtUp map within containerContext. I could be wrong :)
* I want to test the patch locally before confirming a ship it.

Looks awesome for a first draft!

samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java (line 55)
https://reviews.apache.org/r/34974/#comment140753

This still does not handle the case of partition range. Please add the range handling or correct the exception message.

samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java (line 45)
https://reviews.apache.org/r/34974/#comment140785

nit: spacing

samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 405)
https://reviews.apache.org/r/34974/#comment140947

The exception message is inaccurate. It can also happen when the taskName is not in the startingOffsets map (although I am not sure if such a case will happen).

samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 131)
https://reviews.apache.org/r/34974/#comment140958

Instead of getOrElse(null), try .orNull

samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 143)
https://reviews.apache.org/r/34974/#comment140959

Should we have a different metric for the number of messages received by process() than for the number of messages actually processed? We need to clarify the semantics of all our metrics, perhaps in a separate RB.

samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 216)
https://reviews.apache.org/r/34974/#comment140960

Looks good. nit: Can you change the method name to checkCaughtUp instead of checkCatchedUp?

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala (line 397)
https://reviews.apache.org/r/34974/#comment140967

Can you add some doc here saying this comparator is used in the context of broadcast streams (to detect impedance mismatch between tasks when consuming from a broadcast stream)?

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala (line 109)
https://reviews.apache.org/r/34974/#comment140972

We are registering with the offset in the method invocation in line 105. Why do we need to update the topicPartitionsAndOffsets map with the replaced offset? I understand that all tasks within the same container may be at different offsets for broadcast stream SSPs. But it looks like consumer.register is being invoked in multiple places: TaskStorageManager and CoordinatorStreamSystemConsumer. Will the change impact these other components?

samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala (line 98)
https://reviews.apache.org/r/34974/#comment140973

nit: typo 'resiter'

- Navina Ramesh

On June 16, 2015, 9:23 p.m., Yan Fang wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/
---

(Updated June 16, 2015, 9:23 p.m.)

Review request for samza.

Bugs: SAMZA-676
https://issues.apache.org/jira/browse/SAMZA-676

Repository: samza

Description
---
1. added offsetComparator method in SystemAdmin interface
2. added task.global.inputs config
3. rewrote Grouper classes using Java; allows assigning global streams during grouping
4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to preserve message order
5. added taskNames to the offsets in OffsetManager
6. allowed assigning one SSP to multiple taskInstances
7. skipped already-processed messages in RunLoop
8. unit tests for all changes

Diffs
---
checkstyle/import-control.xml 3374f0c
samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
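To make the partition-range comment on TaskConfigJava concrete, here is a minimal sketch of what range handling could look like. The entry syntax is an assumption made for illustration, since the review does not spell it out: a single partition is written as system.stream#3 and a range as system.stream#[0-2]. PartitionSpecParser and its method are hypothetical and are not code from the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for one stream entry; the "#n" and "#[lo-hi]" syntax is an assumption. */
final class PartitionSpecParser {
    // Assumed form: <system>.<stream>#<partition>
    private static final Pattern SINGLE = Pattern.compile("^(.+)#(\\d+)$");
    // Assumed form: <system>.<stream>#[<lo>-<hi>], both ends inclusive
    private static final Pattern RANGE = Pattern.compile("^(.+)#\\[(\\d+)-(\\d+)\\]$");

    private PartitionSpecParser() {}

    /** Returns the partition ids named by a single entry, expanding a range if present. */
    static List<Integer> partitions(String entry) {
        String trimmed = entry.trim();
        List<Integer> result = new ArrayList<>();

        Matcher range = RANGE.matcher(trimmed);
        if (range.matches()) {
            int lo = Integer.parseInt(range.group(2));
            int hi = Integer.parseInt(range.group(3));
            if (lo > hi) {
                throw new IllegalArgumentException("Empty partition range in: " + entry);
            }
            for (int p = lo; p <= hi; p++) {
                result.add(p);
            }
            return result;
        }

        Matcher single = SINGLE.matcher(trimmed);
        if (single.matches()) {
            result.add(Integer.parseInt(single.group(2)));
            return result;
        }

        // The message names both accepted forms so it stays accurate once ranges are supported.
        throw new IllegalArgumentException(
            "Expected <system>.<stream>#<partition> or <system>.<stream>#[<lo>-<hi>], got: " + entry);
    }

    public static void main(String[] args) {
        System.out.println(partitions("kafka.foo#[0-2]"));
        System.out.println(partitions("kafka.bar#1"));
    }
}
```

Keeping the two patterns separate lets the exception message list exactly the forms the parser accepts, which is the mismatch the review comment points at.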
Re: Review Request 35397: Fix SAMZA-697
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35397/#review88455
---

This is a partial review (I didn't go through the tests).

samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 50)
https://reviews.apache.org/r/35397/#comment140992

Do we have a test for this case?

samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 63)
https://reviews.apache.org/r/35397/#comment140989

Do we need a LOG.warn in case the file doesn't exist?

samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 72)
https://reviews.apache.org/r/35397/#comment140993

nit: maybe if (blacklistClassnames == null) return false.

samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 103)
https://reviews.apache.org/r/35397/#comment140995

Should we have a little more validation here (checking for empty strings, for example)?

samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 109)
https://reviews.apache.org/r/35397/#comment140996

nit: should be checked at the beginning of the method.

samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 129)
https://reviews.apache.org/r/35397/#comment140997

nit: 'blacklisted'

samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 140)
https://reviews.apache.org/r/35397/#comment140998

Else log an error?

samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java (line 146)
https://reviews.apache.org/r/35397/#comment141000

Do we need to override it if it is the same as the default implementation?

samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 435)
https://reviews.apache.org/r/35397/#comment141001

Do we want to create this taskClassLoader if the taskClassLoaderPath is not configured? If this is the case, we are creating the classLoader with a 'null' list of URLs. Is it safe?
This

- Boris Shkolnik

On June 18, 2015, 6:42 p.m., Guozhang Wang wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35397/
---

(Updated June 18, 2015, 6:42 p.m.)

Review request for samza.

Bugs: SAMZA-697
https://issues.apache.org/jira/browse/SAMZA-697

Repository: samza

Description
---
Address Yan's comments

Diffs
---
checkstyle/import-control.xml 3374f0c432e61ac4cda275377604cfd481f0cddf
docs/learn/documentation/versioned/jobs/configuration-table.html 405e2cea4fd1d037cc26b3537f6bb406eded202b
samza-core/src/main/java/org/apache/samza/task/TaskClassLoader.java PRE-CREATION
samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 0b3a235b5ab1d6bd60669bfe6023f6b0b4e943d3
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cbacd183420e9d1d72b05693b55a8f0a62d59fc5
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5dea9a950fc741625238f5bf8b1f362180
samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 1c178a661e449c6bdfc4ce431aef9bb2d261a6c2
samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 4fac154709d72ab594485dad93c912b55fb1617e
samza-core/src/test/java/org/apache/samza/task/TestTaskClassLoader.java PRE-CREATION
samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 9fb1aa98fcd14397e8a4cb00c67537482e95fa53
samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28c9298485753ab861da76793cf925953ed

Diff: https://reviews.apache.org/r/35397/diff/

Testing
---
unit tests

Thanks,
Guozhang Wang
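On the reviewer's last question about a 'null' list of URLs: as far as I know, java.net.URLClassLoader rejects a null URL array with a NullPointerException, so some guard is needed when no class loader path is configured. The sketch below shows one possible guard that falls back to the parent loader. TaskClassLoaderFactory and create are hypothetical names and this is not the code under review.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical helper; names are illustrative and not taken from the patch. */
final class TaskClassLoaderFactory {
    private TaskClassLoaderFactory() {}

    /**
     * Returns the parent loader unchanged when no extra class path is configured,
     * otherwise a URLClassLoader over the given URLs that delegates to the parent.
     */
    static ClassLoader create(URL[] urls, ClassLoader parent) {
        if (urls == null || urls.length == 0) {
            // Nothing extra to load: skip constructing a URLClassLoader from a null array.
            return parent;
        }
        return new URLClassLoader(urls, parent);
    }

    public static void main(String[] args) {
        ClassLoader parent = ClassLoader.getSystemClassLoader();
        // With no path configured, the caller simply keeps using the parent loader.
        System.out.println(create(null, parent) == parent);
    }
}
```

Returning the parent keeps call sites uniform: they always receive a usable ClassLoader and do not need their own null checks.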
Review Request 35598: SAMZA-563 Upgrade Hello-Samza to YARN 2.6.0
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35598/
---

Review request for samza.

Repository: samza-hello-samza

Description
---
Changed bin/grid script and pom.xml file in samza-hello-samza to use hadoop 2.6.0 instead of 2.4.0.

Diffs
---
bin/grid a639ade
pom.xml 0e3bf5f

Diff: https://reviews.apache.org/r/35598/diff/

Testing
---

Thanks,
Aleksandar Pejakovic