Re: Review Request 36903: SAMZA-744: shutdown stores before shutdown producers
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36903/#review94193 ---

samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala (lines 47 - 49) https://reviews.apache.org/r/36903/#comment148730
We need to remove the author information. :) And maybe add some Javadoc instead. My 2 cents: 1. If this is a real test, to be consistent, we may want to use TestStreamTask (beginning with Test), or change all other TestSomething to SomethingTest (e.g. change TestStateful to StatefulTest). 2. If this is not a real test, I prefer something like StreamTaskUtil, which is less ambiguous.

samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala (line 96) https://reviews.apache.org/r/36903/#comment148740
Is this tag used?

samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala (line 148) https://reviews.apache.org/r/36903/#comment148741
Same, is this used?

samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala (line 169) https://reviews.apache.org/r/36903/#comment148742
There is no TestJob. (I know, it is a copy/paste issue :)

samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala (line 176) https://reviews.apache.org/r/36903/#comment148752
Why TestStateStoreTask here? I think you mean TestTask.awaitTaskRegistered.

samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala (line 64) https://reviews.apache.org/r/36903/#comment148745
From the description, it is not testing container shutdown; it is actually testing the store-restore feature.

samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala (line 66) https://reviews.apache.org/r/36903/#comment148734
Since we are already doing the abstraction, is it possible to put the common config into the StreamTaskTest object? Because I see a lot of the same configs in ShutdownContainerTest and TestStatefulTask.
samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala (lines 87 - 89) https://reviews.apache.org/r/36903/#comment148755
In 0.10.0 we do not have a checkpoint factory, I believe.

samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala (lines 142 - 146) https://reviews.apache.org/r/36903/#comment148754
Are those two methods used anywhere?

samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala (line 155) https://reviews.apache.org/r/36903/#comment148758
How about adding override?

samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala (line 165) https://reviews.apache.org/r/36903/#comment148759
How about adding override?

samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala (lines 88 - 91) https://reviews.apache.org/r/36903/#comment148756
Same.

samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala (line 95) https://reviews.apache.org/r/36903/#comment148757
Actually, I do not understand why we need a companion object here. We just use the default task number, 1. And the awaitTaskRegistered and register methods are not used anywhere.

samza-test/src/test/scala/org/apache/samza/test/integration/TestTask.scala (lines 32 - 34) https://reviews.apache.org/r/36903/#comment148731
Instead of the author information, I think putting some Javadoc explaining this class/object would be better.

samza-test/src/test/scala/org/apache/samza/test/integration/TestTask.scala (line 37) https://reviews.apache.org/r/36903/#comment148749
Remove the trailing ";".

- Yan Fang

On Aug. 4, 2015, 9:30 p.m., Yi Pan (Data Infrastructure) wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36903/ --- (Updated Aug. 4, 2015, 9:30 p.m.) Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Navina Ramesh.
Bugs: SAMZA-744 https://issues.apache.org/jira/browse/SAMZA-744

Repository: samza

Description --- SAMZA-744: shutdown stores before shutdown producers

Diffs -
build.gradle 0852adc4e8e0c2816afd1ebf433f1af6b44852f7
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 27b2517048ad5730762506426ee7578c66181db8
samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala PRE-CREATION
samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala PRE-CREATION
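The rationale for the ordering fix can be illustrated with a toy, self-contained sketch (the Producer/Store classes and their methods below are illustrative stand-ins, not Samza's actual classes): closing a store may still flush pending writes to its changelog producer, so producers must be stopped only after all stores are closed.

```java
// Toy sketch of the shutdown ordering discussed in SAMZA-744. A store's
// close() flushes pending writes through its changelog producer, so stopping
// the producer first would make the flush fail. Names here are hypothetical.
public class ShutdownOrderSketch {
    static class Producer {
        private boolean running = true;
        void send(String msg) {
            if (!running) throw new IllegalStateException("producer already stopped");
        }
        void stop() { running = false; }
    }

    static class Store {
        private final Producer changelog;
        Store(Producer changelog) { this.changelog = changelog; }
        // Closing the store flushes pending writes to its changelog producer.
        void close() { changelog.send("flush"); }
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        Store store = new Store(producer);
        // Correct order: close stores first, then stop producers.
        store.close();
        producer.stop();
        System.out.println("clean shutdown");
    }
}
```

Reversing the two calls in main() would throw IllegalStateException, which is the failure mode the patch guards against.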
Missing a change log offset for SystemStreamPartition
Hi, I am trying to use the key-value store to manage some state information. Basically this is the code I am using. As far as I have tested, the rest is working correctly.

private KeyValueStore<String, String> storestp;

public void init(Config config, TaskContext context) {
    this.storestp = (KeyValueStore<String, String>) context.getStore("stepdb");
}

public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    …
    String str = storestp.get(code);
    …
}

When I load it, it goes to Running, but when I send the messages through the Kafka stream it goes to Failed state. I have found this exception:

Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, stepdb-changelog, 2].
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:58)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
    at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
    at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

I have seen that the stepdb-changelog stream exists in Kafka. As an attempt to regenerate the missing offset and test it, I connected through the command line and sent a message to the stream. It was received correctly. Now I am seeing the following exception:

Exception in thread "main" java.lang.NullPointerException
    at scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
    at scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
    at scala.collection.SeqLike$class.size(SeqLike.scala:106)
    at scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
    at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:94)
    at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:79)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:79)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:112)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:106)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at
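The store access pattern from the message above can be sketched in a self-contained way (using a plain HashMap in place of Samza's KeyValueStore so it runs without a Samza runtime; the "stepdb" and "code" names mirror the original code). One point worth noting in any case: get() returns null for absent keys, so the result should be guarded before use.

```java
// Minimal stand-in for the task's store usage; Map replaces KeyValueStore.
import java.util.HashMap;
import java.util.Map;

public class StoreAccessSketch {
    private final Map<String, String> storestp = new HashMap<>();

    public void init() {
        // In the real task this would be:
        // this.storestp = (KeyValueStore<String, String>) context.getStore("stepdb");
        storestp.put("code", "step-1");
    }

    public String process(String key) {
        String str = storestp.get(key);
        // get() returns null for keys that were never stored; guard before use.
        return str != null ? str : "unknown";
    }

    public static void main(String[] args) {
        StoreAccessSketch task = new StoreAccessSketch();
        task.init();
        System.out.println(task.process("code"));    // step-1
        System.out.println(task.process("missing")); // unknown
    }
}
```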
Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code
On July 27, 2015, 7:36 a.m., Yi Pan (Data Infrastructure) wrote: The code LGTM. For testing, if we can verify this fix w/ a stateful StreamTask w/ a changelog enabled with a partition number different from the default auto-creation partition number (i.e. 8) in Kafka, it would be good. The integration test suite in samza-test should be a good place to add the test. Try following the steps in samza-test/src/main/config/join/README and run the integration test. The joiner task has a changelog configured with a partition number of 2. You can verify that the test passes w/ your fix.

Hi Yi, sorry for bothering you so much with this task : ) I'll just write down what I managed to do regarding integration tests:

1. I ran the integration tests via Zopkio and they all finished successfully.
2. I ran the integration tests per the guide in samza-test/src/main/config/join/README and I suspect they ran successfully, since none of them had an abnormal final status. I also ran the failure tests (albeit after some limited fiddling with the Python scripts involved).
3. I ran ./gradlew clean build (which runs TestStatefulTask). It finished with a STANDARD_ERROR, which I assume is a good thing, but here is the output, just in case: http://pastebin.com/aLT5jRdd

What I suspect are the next (possible) steps:

1. Create integration tests to be used with Zopkio. Here I am uncertain how I would kill/stop a Samza task to verify that the changelog stream is being consumed properly.
2. Create another set of tasks similar to Checker/Emitter/Joiner/Watcher. I believe this is unnecessary since they have their changelogs and their restartability is being tested. Of course, I might be wrong.
3. Add another test similar to TestStatefulTask. a. Or add a num.partitions param to TestStatefulTask.
4. None of the above : )

Again, I am very sorry for relying on you this much, but I'm really unclear on how to proceed regarding this.

- Robert

--- This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/36163/#review93093 ---

On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36163/ --- (Updated July 9, 2015, 2:39 p.m.)

Review request for samza.

Repository: samza

Description --- Removed trailing whitespaces

Diffs -
samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588ebc99b5f07d533e48e10061a3075a63665a
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae3a904716ea51a2b27c7701ac30d13b854
samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala aeba61a95371faaba23c97d896321b8d95467f87
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f41062f3432ae9dc9a9737b48ed7b2f709f20
samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c4639fc226b34e64915935c1d90e5917af2e
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187c7707673fe15c8cb7ea854e02c4a89a54
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f54f526d5d88ad3bc312b71fce40260e7c6
samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366f0f60e401765a000fa265c59dee4a461e
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b42a5b9a4bfb43766c17847b2947ebdb21d

Diff: https://reviews.apache.org/r/36163/diff/

Testing --- I wasn't really sure what kind of test (unit test / integration test) I should make here, so any pointers would be greatly appreciated! I tested the change with the unit/integration tests already available.

Thanks, Robert Zuljevic
Re: Review Request 37069: SAMZA-738 Samza Timer based metrics does not have enough precision
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/37069/ --- (Updated Aug. 5, 2015, 2:58 p.m.)

Review request for samza.

Changes --- Fixed test

Repository: samza

Description --- Changed SystemProducersMetrics and RunLoop so that metrics now show nanoseconds instead of milliseconds.

Diffs (updated) -
samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala aa7a9bc
samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala 1643070
samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 64a5844
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala 39c54aa
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala 8aa73ce

Diff: https://reviews.apache.org/r/37069/diff/

Testing --- Tested on hello-samza (wikipedia-parser); results:

```
org.apache.samza.container.SamzaContainerMetrics: {
  commit-calls: 10,
  window-ns: 3198.62544796632,
  process-null-envelopes: 56292,
  process-envelopes: 989,
  window-calls: 0,
  commit-ns: 5130.901534393375,
  send-calls: 0,
  process-calls: 57283,
  choose-ns: 10368839.818551894,
  process-ns: 10390588.194071393,
  event-loop-utilization: 0.99807554
}
```

Thanks, Aleksandar Pejakovic
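The precision point behind this change can be shown with a small, self-contained Java sketch (NanoTimerDemo and its Clock interface are hypothetical names for illustration, not Samza's TimerUtils API): timing a short operation with System.nanoTime() yields a usable measurement where millisecond-granularity timing would often round down to zero.

```java
// Demonstrates why nanosecond timing matters for short operations.
public class NanoTimerDemo {
    // Clock abstraction so the timer can be tested with an injected source.
    interface Clock { long nanoTime(); }

    // Measure the wall-clock duration of a piece of work in nanoseconds.
    static long timeNs(Runnable work, Clock clock) {
        long start = clock.nanoTime();
        work.run();
        return clock.nanoTime() - start;
    }

    public static void main(String[] args) {
        long elapsedNs = timeNs(() -> {
            long sum = 0;
            for (int i = 0; i < 1_000_000; i++) sum += i;
        }, System::nanoTime);
        // A sub-millisecond loop like this would typically report 0 with
        // currentTimeMillis-based timing; nanoTime keeps it measurable.
        System.out.println("elapsed ns: " + elapsedNs);
    }
}
```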
Re: How should Samza be run on AWS?
I don't know of any tutorials, but the order to tackle things would be:

1) Set up ZK - this could be a single-node install for a PoC, or a 3- or 5-node install for production. m3.medium is a reasonable node type.

2) Set up Kafka - could be a single instance without replication for a PoC. For production, as many as you need, and you'd probably want replication. I think if you want to use local instance storage, i2 instances are good, and if you want to use EBS, probably m3 instances.

3) Set up YARN - this could be a single instance (running pseudo-distributed with master and slave on the same machine) or two instances (one master, one slave) for a PoC. I think c3 or r3 instance types are good for the slaves, depending on how much memory you need. Workloads without large amounts of state should be OK on c3 instances. EMR might actually work for YARN if you use the long-running kind of cluster (see: http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-plan-longrunning-transient.html). I haven't tried that, but it might be worth a shot before going for stock Apache Hadoop.

On Tue, Aug 4, 2015 at 5:58 PM, Job-Selina Wu swucaree...@gmail.com wrote:

Dear All: I was looking for a tutorial on how to build and run Samza on AWS, and then I found the link below. I am wondering if there is a detailed tutorial about how to build Samza on AWS? Sincerely, Selina https://cwiki.apache.org/confluence/display/SAMZA/FAQ#FAQ-HowshouldSamzaberunonAWS

How should Samza be run on AWS? From Gian Merlino:

- We've been using Samza in production on AWS for a little over a month. We're just using the YARN runner on a mostly stock Hadoop 2.4.0 cluster (not EMR). Our experience is that c3s work well for the YARN instances and i2s work well for the Kafka instances. Things have been pretty solid with that setup. For scaling up and scaling down YARN, we just terminate instances or add instances, and this works pretty well.
It can take a few minutes for the cluster to realize a node has gone and respawn containers elsewhere. We have a separate Kafka cluster just for Samza's use, different from our main Kafka cluster. The main reason is that we wanted to isolate off the disk and network load of state compactions and restores (we don't use compacted topics in our main Kafka cluster, but we do use them with Samza, and the extra load on Kafka can be substantial).
Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code
On July 27, 2015, 7:36 a.m., Yi Pan (Data Infrastructure) wrote:
[quoted review comment snipped; identical to the one quoted earlier in this thread]

Robert Zuljevic wrote:
[quoted reply snipped; identical to the message earlier in this thread]
Hi, @Robert, sorry that I was not too specific in the comment before. If you have successfully run the integration tests via the steps in samza-test/src/main/config/join/README, it should be good to go. Thanks!

- Yi

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36163/#review93093 ---

On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
[quoted review request snipped; identical to the message earlier in this thread]
Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34500/#review94271 ---

I went through old discussions and also went through Calcite's RelBuilder (https://github.com/milinda/incubator-calcite/blob/master/core/src/main/java/org/apache/calcite/tools/RelBuilder.java) to look at our TopologyBuilder from a SQL query plan perspective. Below are my thoughts.

* I agree with Guozhang that we should first focus on simple use cases, and I think we should not try to integrate support for building complex DAGs containing multiple complex queries via this builder API.
* IMHO, TopologyBuilder is closer to query execution than to the query. And if we need people to compose SQL queries through a Java API, it is better to have an API similar to jOOQ (http://www.jooq.org) for streaming SQL.
* AFAIK, the **split** mentioned in one of Yi's comments doesn't occur in SQL query plans, because SQL operators always have one output (@Yi please correct me if I am wrong).
* IMHO, supporting something similar to views through the builder API may be useful. We can allow referring to the result from the builder (maybe not through the *build* method but via a method like *buildView*) as input to other queries to facilitate this.

So I'm proposing a builder similar to the following, based on Calcite's RelBuilder API:

```java
TopologyBuilder builder = TopologyBuilder.create(..);
OperatorRouter router = builder.scan(stream1)
    .window(10, 2)
    .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
    .scan(stream2)
    .window(10, 2)
    .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
    .join(JoinType.INNER, builder.condition(...))
    .scan(stream2)
    .project(..)
    .window(10, 2)
    .join(joinType, condition)
    .partition(partitionKey, number)
    .modify(Operation.INSERT, ..)
```

* In the above API, *beginStream* is renamed to *scan* to take the API closer to the physical plan.
* A *scan* in the middle marks the start of a new input or input sub-query.
* *join* takes the last two sub-trees (sub-queries) as inputs.
* *modify* is used to insert/update tuples in streams or tables.
* The builder will provide utility methods to create conditions, function calls, aggregates, and ```GROUP BY``` clauses.
* The above assumes that there are no multi-output operators.
* Reusable sub-queries are not present in the above example; I'll think about it and introduce a mechanism to re-use sub-queries (possibly introducing the view concept).

Please feel free to comment on this.

- Milinda Pathirage

On May 20, 2015, 11:13 p.m., Yi Pan (Data Infrastructure) wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34500/ --- (Updated May 20, 2015, 11:13 p.m.) Review request for samza, Yan Fang, Chris Riccomini, Guozhang Wang, Milinda Pathirage, Navina Ramesh, and Naveen Somasundaram.

Bugs: SAMZA-552 https://issues.apache.org/jira/browse/SAMZA-552

Repository: samza

Description --- SAMZA-552: added operator builder API - The current operator builder only supports single-output DAG topologies yet

Diffs -
samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION
samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION
samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION
samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
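To make the proposed chaining style concrete, here is a toy, runnable sketch of a fluent builder that just records the operator sequence (TopologyBuilderSketch and its methods are made-up names; this only mimics the call-chaining shape of the proposal, not Samza SQL's actual operators or semantics):

```java
// Each builder method records an operator and returns the builder itself,
// which is what makes the scan(...).window(...).join(...) chaining work.
import java.util.ArrayList;
import java.util.List;

public class TopologyBuilderSketch {
    private final List<String> ops = new ArrayList<>();

    public static TopologyBuilderSketch create() { return new TopologyBuilderSketch(); }

    public TopologyBuilderSketch scan(String stream) { ops.add("scan(" + stream + ")"); return this; }
    public TopologyBuilderSketch window(int size, int step) { ops.add("window(" + size + "," + step + ")"); return this; }
    public TopologyBuilderSketch join(String type) { ops.add("join(" + type + ")"); return this; }

    // In the real proposal this would produce an OperatorRouter; here it just
    // exposes the recorded plan so the chaining can be inspected.
    public List<String> build() { return ops; }

    public static void main(String[] args) {
        List<String> plan = TopologyBuilderSketch.create()
            .scan("stream1").window(10, 2)
            .scan("stream2").window(10, 2)
            .join("INNER")
            .build();
        System.out.println(plan);
    }
}
```

A second *scan* in the chain corresponds to the "start of a new input sub-query" rule in the proposal, with *join* consuming the two most recent sub-trees.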
Re: How should Samza be run on AWS?
I wrote several Ansible playbooks to deploy YARN (without HDFS), Zookeeper, and Kafka to EC2 for deploying Samza jobs. If you know Ansible, those scripts may be helpful. You can find them at https://github.iu.edu/mpathira/samza-ec2-ansible. I was planning to add a document describing these scripts but haven't been able to do it yet. I looked at EMR also, but as I remember, the EMR job deployment model doesn't work with the current scripts provided by Samza. I used r3 instances for Kafka and c3 instances for YARN. As I remember, I could get close to 1 million msgs/s with a 3-node Kafka cluster running on r3.xlarge instances and a 2 (or 4) node YARN cluster running 4 stream tasks per job.

Thanks
Milinda

On Wed, Aug 5, 2015 at 11:27 AM, Gian Merlino gianmerl...@gmail.com wrote:
[quoted message snipped; identical to the message earlier in this thread]

--
Milinda Pathirage
PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University
twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org
Re: Missing a change log offset for SystemStreamPartition
Hi Jordi, I suspect the reason for your first exception is that you changed the task number (the partition number of your input stream) but were still using the same changelog stream, so it is trying to use partition 2 of the changelog, which does not exist. Can you reproduce this exception in a new job (new store name, new job name)? The second exception is caused by a wrongly formatted offset, I believe. Let me know how the new job goes.

Thanks,
Fang, Yan
yanfang...@gmail.com

On Wed, Aug 5, 2015 at 12:51 AM, Jordi Blasi Uribarri jbl...@nextel.es wrote:
[quoted message snipped; identical to the original message earlier in this thread]
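For reference, a store/changelog configuration of the kind under discussion looks roughly like the following sketch. The job name, store name, and changelog topic name are illustrative (a fresh changelog topic is used so its partition count matches the new task count, per the suggestion above); the property keys follow Samza's documented store configuration, but double-check them against your Samza version.

```
# Illustrative config sketch; names are hypothetical.
job.name=my-new-job
stores.stepdb.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.stepdb.changelog=kafka.stepdb-v2-changelog
stores.stepdb.key.serde=string
stores.stepdb.msg.serde=string
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
```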
Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes
On Aug. 5, 2015, 5:34 p.m., Milinda Pathirage wrote:

I went through old discussions and also went through Calcite's RelBuilder (https://github.com/milinda/incubator-calcite/blob/master/core/src/main/java/org/apache/calcite/tools/RelBuilder.java) to look at our TopologyBuilder from a SQL query plan perspective. Below are my thoughts.

* I agree with Guozhang that we should first focus on simple use cases, and I think we should not try to integrate support for building complex DAGs containing multiple complex queries via this builder API.
* IMHO, TopologyBuilder is closer to query execution than to the query. And if we need people to compose SQL queries through a Java API, it's better to have an API similar to jOOQ (http://www.jooq.org) for streaming SQL.
* AFAIK, the **split** mentioned in one of Yi's comments doesn't occur in SQL query plans, because SQL operators always have one output (@Yi please correct me if I am wrong).
* IMHO, supporting something similar to views through the builder API may be useful. We can allow the result from the builder (maybe not through the *build* method, but via a method like *buildView*) to be referenced as input to other queries to facilitate this.

So I'm proposing a builder similar to the following, based on Calcite's RelBuilder API:

```java
TopologyBuilder builder = TopologyBuilder.create(..);
OperatorRouter router = builder.scan(stream1)
    .window(10, 2)
    .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
    .scan(stream2)
    .window(10, 2)
    .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
    .join(JoinType.INNER, builder.condition(...))
    .scan(stream2)
    .project(..)
    .window(10, 2)
    .join(joinType, condition)
    .partition(partitionKey, number)
    .modify(Operation.INSERT, ..);
```

* In the above API, *beginStream* is renamed to *scan* to take the API closer to the physical plan.
* A *scan* in the middle marks the start of a new input or input sub-query.
* *join* takes the last two sub-trees (sub-queries) as inputs.
* *modify* is used to insert/update tuples to streams or tables.
* The builder will provide utility methods to create conditions, function calls, aggregates, and ```GROUP BY``` clauses.
* The above assumes that there are no multi-output operators.
* Reusable sub-queries are not present in the above example; I'll think about it and introduce a mechanism to re-use sub-queries (possibly introducing the view concept).

Please feel free to comment on this. Instead of group keys, aggregate calls, or conditions, we can directly take OperatorSpec instances, given that OperatorSpecs already encapsulate all the things necessary for an operator.

- Milinda

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34500/#review94271 ---

On May 20, 2015, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34500/ ---

(Updated May 20, 2015, 11:13 p.m.)

Review request for samza, Yan Fang, Chris Riccomini, Guozhang Wang, Milinda Pathirage, Navina Ramesh, and Naveen Somasundaram.
Bugs: SAMZA-552
    https://issues.apache.org/jira/browse/SAMZA-552

Repository: samza

Description
---
SAMZA-552: added operator builder API
- The current operator builder only supports single-output DAG topology yet

Diffs
-
  samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
  samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
  samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION