Re: Review Request 36903: SAMZA-744: shutdown stores before shutdown producers

2015-08-05 Thread Yan Fang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36903/#review94193
---



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (lines 47 - 49)
https://reviews.apache.org/r/36903/#comment148730

we need to remove the author information. :) And maybe add some Javadoc 
instead.

My 2 cents:
1. If this is a real test, to be consistent, we may want to use 
TestStreamTask (begin with Test), or change all other TestSomething to 
SomethingTest (e.g. change TestStateful to StatefulTest)

2. If this is not a real test, I prefer something like StreamTaskUtil to be 
less ambiguous.



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (line 96)
https://reviews.apache.org/r/36903/#comment148740

is this tag used?



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (line 148)
https://reviews.apache.org/r/36903/#comment148741

same, is this used?



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (line 169)
https://reviews.apache.org/r/36903/#comment148742

There is no TestJob. (I know, it is copy/paste issue :)



samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala
 (line 176)
https://reviews.apache.org/r/36903/#comment148752

why TestStateStoreTask here? I think you mean TestTask.awaitTaskRegistered



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (line 64)
https://reviews.apache.org/r/36903/#comment148745

From the description, it is not testing the Container Shutdown, actually it 
is testing the store restoring feature.



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (line 66)
https://reviews.apache.org/r/36903/#comment148734

Since we are already doing the abstraction, is it possible to put the 
common config into the StreamTaskTest object? I see a lot of the same 
configs in ShutdownContainerTest and TestStatefulTask.
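The suggestion amounts to defining the repeated settings once and layering each test's own entries on top. A minimal sketch of that pattern, written in Java although the tests themselves are Scala, so it only shows the shape; SharedTestConfig, base and with are hypothetical names, and the values are placeholders:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of samza-test: the settings several
// integration tests repeat, defined once, with per-test overrides on top.
final class SharedTestConfig {
  // Common settings. The keys are standard Samza config names; the values
  // are illustrative placeholders.
  static Map<String, String> base() {
    Map<String, String> m = new HashMap<>();
    m.put("job.factory.class", "org.apache.samza.job.local.ThreadJobFactory");
    m.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
    m.put("stores.mystore.changelog", "kafka.mystoreChangelog");
    return m;
  }

  // Returns a fresh copy of the common settings with the test's own
  // entries applied last, so they win on any key collision.
  static Map<String, String> with(Map<String, String> overrides) {
    Map<String, String> m = base();
    m.putAll(overrides);
    return m;
  }

  public static void main(String[] args) {
    Map<String, String> cfg = with(Map.of("job.name", "shutdown-container-test"));
    System.out.println(cfg.get("job.name") + " -> " + cfg.get("job.factory.class"));
  }
}
```

Because base() builds a new map on every call, one test's overrides cannot leak into another test's config.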



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (lines 87 - 89)
https://reviews.apache.org/r/36903/#comment148755

in 0.10.0, we do not have a checkpoint factory, I believe



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (lines 142 - 146)
https://reviews.apache.org/r/36903/#comment148754

are those two methods used anywhere?



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (line 155)
https://reviews.apache.org/r/36903/#comment148758

how about adding override?



samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala
 (line 165)
https://reviews.apache.org/r/36903/#comment148759

how about adding override?



samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
 (lines 88 - 91)
https://reviews.apache.org/r/36903/#comment148756

same



samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
 (line 95)
https://reviews.apache.org/r/36903/#comment148757

Actually, I do not understand why we need a companion object here. We just 
use the default task number, 1.

And awaitTaskRegistered and register methods are not used anywhere.



samza-test/src/test/scala/org/apache/samza/test/integration/TestTask.scala 
(lines 32 - 34)
https://reviews.apache.org/r/36903/#comment148731

Instead of the author information, I think putting some Javadoc explaining 
this class/object would be better.



samza-test/src/test/scala/org/apache/samza/test/integration/TestTask.scala 
(line 37)
https://reviews.apache.org/r/36903/#comment148749

rm ;


- Yan Fang


On Aug. 4, 2015, 9:30 p.m., Yi Pan (Data Infrastructure) wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36903/
 ---
 
 (Updated Aug. 4, 2015, 9:30 p.m.)
 
 
 Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and 
 Navina Ramesh.
 
 
 Bugs: SAMZA-744
 https://issues.apache.org/jira/browse/SAMZA-744
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-744: shutdown stores before shutdown producers
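The description gives only the new ordering, not the reason. One plausible motivation, offered here as an assumption rather than read from the patch: a store with a changelog sends its final flushed writes through the container's producers, so if the producers are stopped first, that flush has nowhere to go. The toy model below (every class in it is hypothetical, not Samza's API) shows the ordering effect:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model: a store buffers writes and, on shutdown,
// flushes them to its changelog through a producer.
final class ShutdownOrderSketch {
  static final class Producer {
    final List<String> sent = new ArrayList<>();
    private boolean stopped = false;

    void send(String msg) {
      if (stopped) throw new IllegalStateException("producer already stopped");
      sent.add(msg);
    }

    void stop() { stopped = true; }
  }

  static final class Store {
    private final List<String> pending = new ArrayList<>();
    private final Producer changelog;

    Store(Producer changelog) { this.changelog = changelog; }

    void put(String k, String v) { pending.add(k + "=" + v); }

    // Shutting the store down flushes buffered writes to the changelog.
    void stop() {
      for (String w : pending) changelog.send(w);
      pending.clear();
    }
  }

  // Stores first, then producers: the final flush is still delivered.
  static List<String> shutdownStoresFirst() {
    Producer p = new Producer();
    Store s = new Store(p);
    s.put("a", "1");
    s.stop();
    p.stop();
    return p.sent;
  }

  // Producers first: the store's final flush hits a stopped producer.
  static boolean producersFirstFails() {
    Producer p = new Producer();
    Store s = new Store(p);
    s.put("a", "1");
    p.stop();
    try {
      s.stop();
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(shutdownStoresFirst()); // [a=1]
    System.out.println(producersFirstFails()); // true
  }
}
```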
 
 
 Diffs
 -
 
   build.gradle 0852adc4e8e0c2816afd1ebf433f1af6b44852f7
   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 27b2517048ad5730762506426ee7578c66181db8
   samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTest.scala PRE-CREATION
   samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownContainer.scala PRE-CREATION
 

Missing a change log offset for SystemStreamPartition

2015-08-05 Thread Jordi Blasi Uribarri
Hi,

I am trying to use the key-value store to manage some state information. 
Basically, this is the code I am using. As far as I have tested, the rest is 
working correctly.

private KeyValueStore<String, String> storestp;

// "stepdb" must match a store defined in the job config (stores.stepdb.*).
public void init(Config config, TaskContext context) {
  this.storestp = (KeyValueStore<String, String>) context.getStore("stepdb");
}

public void process(IncomingMessageEnvelope envelope,
                    MessageCollector collector,
                    TaskCoordinator coordinator) {
  …
  String str = storestp.get(code);
  …
}

When I load it, it goes to Running, but when I send messages through the Kafka 
stream it goes to the Failed state. I found this exception:
Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, stepdb-changelog, 2].
at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:58)
at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

I have seen that the stepdb-changelog stream exists in Kafka. In an attempt to 
regenerate the missing offset and test it, I connected through the command 
line and sent a message to the stream. It was received correctly. Now I am 
seeing the following exception:

Exception in thread "main" java.lang.NullPointerException
at scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
at scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
at scala.collection.SeqLike$class.size(SeqLike.scala:106)
at scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:94)
at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:79)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:79)
at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:112)
at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:106)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)

Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-08-05 Thread Robert Zuljevic


 On July 27, 2015, 7:36 a.m., Yi Pan (Data Infrastructure) wrote:
  The code LGTM. For testing, if we can verify this fix w/ a stateful 
  StreamTask w/ changelog enabled with some partition numbers that are 
  different from the default auto-creation partition number (i.e. 8) in 
  Kafka, it would be good. The integration test suite in samza-test should be 
  a good place to add the test there. Try following the steps in 
  samza-test/src/main/config/join/README and run the integration test. The 
  joiner task has a changelog configured with partition number of 2. You can 
  verify the test passed w/ your fix.

Hi Yi, sorry for bothering you so much with this task : ) I'll just write down 
what I managed to do regarding integration tests:

1. I ran integration tests via Zopkio and they all finished successfully.
2. I ran the integration per guide in samza-test/src/main/config/join/README 
and I suspect they ran successfully, since none of them had an abnormal final 
status. I also ran the failure tests (albeit after some limited fiddling with 
the python scripts involved).
3. I ran ./gradlew clean build (which runs TestStatefulTask). It finished 
with a STANDARD_ERROR, which I assume is a good thing, but here is the output, 
just in case: http://pastebin.com/aLT5jRdd

What I suspect are the next (possible) steps:

1. Create integration tests to be used with Zopkio. Here I am uncertain how I 
would kill/stop Samza task to verify that changelog stream is being consumed 
properly.
2. Create another set of tasks similar to Checker/Emitter/Joiner/Watcher. I 
believe this is unnecessary since they have their changelogs and their 
restartability is being tested. Of course, I might be wrong.
3. Add another test similar to TestStatefulTask.
   a. Or add num.partitions param to TestStatefulTask.
4. None of the above : )
   
Again, I am very sorry for relying on you this much, but I'm really unclear on 
how to proceed regarding this.


- Robert


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93093
---


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36163/
 ---
 
 (Updated July 9, 2015, 2:39 p.m.)
 
 
 Review request for samza.
 
 
 Repository: samza
 
 
 Description
 ---
 
 Removed trailing whitespaces
 
 
 Diffs
 -
 
   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588ebc99b5f07d533e48e10061a3075a63665a
   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae3a904716ea51a2b27c7701ac30d13b854
   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1
   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala aeba61a95371faaba23c97d896321b8d95467f87
   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f41062f3432ae9dc9a9737b48ed7b2f709f20
   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c4639fc226b34e64915935c1d90e5917af2e
   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187c7707673fe15c8cb7ea854e02c4a89a54
   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f54f526d5d88ad3bc312b71fce40260e7c6
   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366f0f60e401765a000fa265c59dee4a461e
   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b42a5b9a4bfb43766c17847b2947ebdb21d
 
 Diff: https://reviews.apache.org/r/36163/diff/
 
 
 Testing
 ---
 
 I wasn't really sure what kind of test (unit test / integration test) I 
 should make here, so any pointers would be greatly appreciated! I tested the 
 change with the unit/integration tests already available.
 
 
 Thanks,
 
 Robert Zuljevic
 




Re: Review Request 37069: SAMZA-738 Samza Timer based metrics does not have enough precision

2015-08-05 Thread Aleksandar Pejakovic

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37069/
---

(Updated Aug. 5, 2015, 2:58 p.m.)


Review request for samza.


Changes
---

Fixed test


Repository: samza


Description
---

Changed SystemProducersMetrics and RunLoop so that metrics now show nanoseconds 
instead of milliseconds.
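Nanosecond timing matters because many per-call durations in the run loop are well under a millisecond (see the window-ns and commit-ns values in the testing section of this request), and a millisecond-resolution timer records them as 0. A minimal sketch with an injectable clock so the arithmetic is deterministic; NanoTimer is a hypothetical class, not the TimerUtils code in this diff:

```java
import java.util.function.LongSupplier;

// Hypothetical helper (not Samza's TimerUtils): time a block of work with
// a nanosecond clock. The clock is injectable so the example is deterministic.
final class NanoTimer {
  private final LongSupplier nanoClock;

  NanoTimer(LongSupplier nanoClock) { this.nanoClock = nanoClock; }

  NanoTimer() { this(System::nanoTime); }

  // Runs the work and returns how long it took, in nanoseconds.
  long timeNs(Runnable work) {
    long start = nanoClock.getAsLong();
    work.run();
    return nanoClock.getAsLong() - start;
  }

  // What a millisecond-resolution metric would record for the same duration.
  static long truncatedToMs(long ns) {
    return ns / 1_000_000L;
  }

  public static void main(String[] args) {
    // Fake clock that advances 3,200 ns between reads.
    long[] t = {0};
    NanoTimer timer = new NanoTimer(() -> { long now = t[0]; t[0] += 3_200; return now; });
    long ns = timer.timeNs(() -> { });
    System.out.println(ns + " ns vs " + truncatedToMs(ns) + " ms"); // 3200 ns vs 0 ms
  }
}
```

In production code the no-argument constructor uses System.nanoTime, which is meant for measuring elapsed time rather than wall-clock time.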


Diffs (updated)
-

  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala aa7a9bc
  samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala 1643070
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 64a5844
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala 39c54aa
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala 8aa73ce

Diff: https://reviews.apache.org/r/37069/diff/


Testing
---

Tested on hello-samza - wikipedia-parser, results:
```
org.apache.samza.container.SamzaContainerMetrics:{
  commit-calls:10,
  window-ns:3198.62544796632,
  process-null-envelopes:56292,
  process-envelopes:989,
  window-calls:0,
  commit-ns:5130.901534393375,
  send-calls:0,
  process-calls:57283,
  choose-ns:10368839.818551894,
  process-ns:10390588.194071393,
  event-loop-utilization:0.99807554
}
```


Thanks,

Aleksandar Pejakovic



Re: How should Samza be run on AWS?

2015-08-05 Thread Gian Merlino
I don't know of any tutorials, but the order to tackle things would be:

1) Set up ZK: this could be a single-node install for a PoC or a 3- or 5-node
install for production. m3.medium is a reasonable node type.

2) Set up Kafka: this could be a single instance without replication for a PoC.
For production, use as many as you need, and you'd probably want replication. I
think if you want to use local instance storage, i2 instances are good, and
if you want to use EBS, probably m3 instances.

3) Set up YARN: this could be a single instance (running pseudo-distributed,
with master and slave on the same machine) or two instances (one master, one
slave) for a PoC. I think c3 or r3 instance types are good for the slaves,
depending on how much memory you need. Workloads without large amounts of
state should be OK on c3 instances.

EMR might actually work for YARN if you use the long-running kind of
cluster (see:
http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-plan-longrunning-transient.html).
I haven't tried that, but it might be worth a shot before going with stock
Apache Hadoop.

On Tue, Aug 4, 2015 at 5:58 PM, Job-Selina Wu swucaree...@gmail.com wrote:

 Dear All: I was looking for a tutorial on how to build and run Samza on
 AWS, and I found the link below. I am wondering if there is a detailed
 tutorial about how to build Samza on AWS?

 Sincerely,
 Selina


 https://cwiki.apache.org/confluence/display/SAMZA/FAQ#FAQ-HowshouldSamzaberunonAWS?
 How should Samza be run on AWS?

 From Gian Merlino:

 - We've been using Samza in production on AWS for a little over a month.
 We're just using the YARN runner on a mostly stock hadoop 2.4.0 cluster
 (not EMR). Our experience is that c3s work well for the YARN instances and
 i2s work well for the Kafka instances. Things have been pretty solid with
 that setup. For scaling up and scaling down YARN, we just terminate
 instances or add instances, and this works pretty well. It can take a few
 minutes for the cluster to realize a node has gone and respawn containers
 elsewhere. We have a separate Kafka cluster just for Samza's use,
 different from our main Kafka cluster. The main reason is that we wanted
 to isolate off the disk and network load of state compactions and
 restores (we don't use compacted topics in our main Kafka cluster, but
 we do use them with Samza, and the extra load on Kafka can be
 substantial).



Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-08-05 Thread Yi Pan (Data Infrastructure)


 On July 27, 2015, 7:36 a.m., Yi Pan (Data Infrastructure) wrote:
  The code LGTM. For testing, if we can verify this fix w/ a stateful 
  StreamTask w/ changelog enabled with some partition numbers that are 
  different from the default auto-creation partition number (i.e. 8) in 
  Kafka, it would be good. The integration test suite in samza-test should be 
  a good place to add the test there. Try following the steps in 
  samza-test/src/main/config/join/README and run the integration test. The 
  joiner task has a changelog configured with partition number of 2. You can 
  verify the test passed w/ your fix.
 
 Robert Zuljevic wrote:
 Hi Yi, sorry for bothering you so much with this task : ) I'll just write 
 down what I managed to do regarding integration tests:
 
 1. I ran integration tests via Zopkio and they all finished successfully.
 2. I ran the integration per guide in 
 samza-test/src/main/config/join/README and I suspect they ran successfully, 
 since none of them had an abnormal final status. I also ran the failure tests 
 (albeit after some limited fiddling with the python scripts involved).
 3. I ran ./gradlew clean build (which runs TestStatefulTask). It 
 finished with a STANDARD_ERROR, which I assume is a good thing, but here is 
 the output, just in case: http://pastebin.com/aLT5jRdd
 
 What I suspect are the next (possible) steps:
 
 1. Create integration tests to be used with Zopkio. Here I am uncertain 
 how I would kill/stop Samza task to verify that changelog stream is being 
 consumed properly.
 2. Create another set of tasks similar to Checker/Emitter/Joiner/Watcher. 
 I believe this is unnecessary since they have their changelogs and their 
 restartability is being tested. Of course, I might be wrong.
 3. Add another test similar to TestStatefulTask.
a. Or add num.partitions param to TestStatefulTask.
 4. None of the above : )

 Again, I am very sorry for relying on you this much, but I'm really 
 unclear on how to proceed regarding this.

Hi, @Robert, sorry that I was not too specific in my earlier comment. If you 
have successfully run the integration tests via the steps in 
samza-test/src/main/config/join/README, it should be good to go. Thanks!


- Yi






Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

2015-08-05 Thread Milinda Pathirage

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review94271
---


I went through old discussions and also through Calcite's RelBuilder 
(https://github.com/milinda/incubator-calcite/blob/master/core/src/main/java/org/apache/calcite/tools/RelBuilder.java)
to look at our TopologyBuilder from a SQL query plan perspective. Below are my 
thoughts.

* I agree with Guozhang that we should first focus on simple use cases, and I 
think we should not try to integrate support for building complex DAGs that 
contain multiple complex queries via this builder API.
* IMHO, TopologyBuilder is closer to query execution than to the query. If 
we need people to compose SQL queries through a Java API, it's better to have an 
API similar to jOOQ (http://www.jooq.org) for streaming SQL.
* AFAIK, the **split** mentioned in one of Yi's comments doesn't occur in SQL query 
plans, because SQL operators always have one output (@Yi please correct me if I 
am wrong).
* IMHO, supporting something similar to views through the builder API may be 
useful. To facilitate this, we could allow the result of the builder (maybe not 
through the *build* method but via a method like *buildView*) to be referenced 
as input to other queries.

So I'm proposing a builder similar to the following, based on Calcite's RelBuilder API:

```java
TopologyBuilder builder = TopologyBuilder.create(..);

OperatorRouter router = builder.scan(stream1)
    .window(10, 2)
    .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
    .scan(stream2)
    .window(10, 2)
    .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
    .join(JoinType.INNER, builder.condition(...))
    .scan(stream2)
    .project(..)
    .window(10, 2)
    .join(joinType, condition)
    .partition(partitionKey, number)
    .modify(Operation.INSERT, ..);
```

* In the above API, *beginStream* is renamed to *scan* to bring the API 
closer to the physical plan.
* *scan* in the middle marks the start of a new input or input sub-query.
* *join* takes the last two sub-trees (sub-queries) as inputs.
* *modify* is used to insert/update tuples in streams or tables.
* The builder will provide utility methods to create conditions, function calls, 
aggregates and `GROUP BY` clauses.
* The above assumes that there are no multi-output operators.
* Reusable sub-queries are not present in the above example; I'll think about 
it and introduce a mechanism to reuse sub-queries (possibly by introducing the 
view concept).
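The bullets above describe a stack discipline: *scan* starts a new sub-tree, unary operators wrap the most recent one, and *join* consumes the last two. A toy model of that reading, using strings in place of operators (ToyTopologyBuilder and its methods are hypothetical, not the SAMZA-552 API), makes the evaluation order concrete:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the proposed builder semantics (all names hypothetical):
// scan() pushes a new sub-tree, unary operators wrap the top sub-tree,
// and join() pops the last two sub-trees and pushes their join.
final class ToyTopologyBuilder {
  private final Deque<String> stack = new ArrayDeque<>();

  ToyTopologyBuilder scan(String stream) {
    stack.push("scan(" + stream + ")");
    return this;
  }

  ToyTopologyBuilder window(int size, int step) {
    stack.push("window(" + stack.pop() + ", " + size + ", " + step + ")");
    return this;
  }

  ToyTopologyBuilder join(String type) {
    String right = stack.pop(); // most recent sub-tree
    String left = stack.pop();  // the one before it
    stack.push("join[" + type + "](" + left + ", " + right + ")");
    return this;
  }

  // Returns the single remaining plan; fails if sub-trees were left unjoined.
  String build() {
    if (stack.size() != 1) {
      throw new IllegalStateException(stack.size() + " sub-trees on the stack");
    }
    return stack.peek();
  }

  public static void main(String[] args) {
    String plan = new ToyTopologyBuilder()
        .scan("stream1").window(10, 2)
        .scan("stream2").window(10, 2)
        .join("INNER")
        .build();
    // join[INNER](window(scan(stream1), 10, 2), window(scan(stream2), 10, 2))
    System.out.println(plan);
  }
}
```

Rejecting build() while more than one sub-tree remains is one way to catch a missing join; whether the real builder should enforce that is an open design choice.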

Please feel free to comment on this.

- Milinda Pathirage


On May 20, 2015, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34500/
 ---
 
 (Updated May 20, 2015, 11:13 p.m.)
 
 
 Review request for samza, Yan Fang, Chris Riccomini, Guozhang Wang, Milinda 
 Pathirage, Navina Ramesh, and Naveen Somasundaram.
 
 
 Bugs: SAMZA-552
 https://issues.apache.org/jira/browse/SAMZA-552
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-552: added operator builder API
 - The current operator builder only supports single-output DAG topologies for now
 
 
 Diffs
 -
 
   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION
   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION
   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION
   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
 

Re: How should Samza be run on AWS?

2015-08-05 Thread Milinda Pathirage
I wrote several Ansible playbooks to deploy YARN (without HDFS), ZooKeeper
and Kafka to EC2 for deploying Samza jobs. If you know Ansible, those
scripts may be helpful. You can find them at
https://github.iu.edu/mpathira/samza-ec2-ansible. I was planning to add a
document describing these scripts but haven't been able to do it yet. I looked
at EMR too, but as I remember, the EMR job deployment model doesn't work with
the current scripts provided by Samza.

I used R3 instances for Kafka and C3 instances for YARN. As I remember, I
could get close to 1 million msg/s with a 3-node Kafka cluster running on
r3.xlarge instances and a 2 (or 4) node YARN cluster running 4 stream tasks
per job.

Thanks
Milinda

On Wed, Aug 5, 2015 at 11:27 AM, Gian Merlino gianmerl...@gmail.com wrote:

 I don't know of any tutorials, but the order to tackle things would be:

 1) Set up ZK: this could be a single-node install for a PoC or a 3- or 5-node
 install for production. m3.medium is a reasonable node type.

 2) Set up Kafka: this could be a single instance without replication for a PoC.
 For production, use as many as you need, and you'd probably want replication. I
 think if you want to use local instance storage, i2 instances are good, and
 if you want to use EBS, probably m3 instances.

 3) Set up YARN: this could be a single instance (running pseudo-distributed,
 with master and slave on the same machine) or two instances (one master, one
 slave) for a PoC. I think c3 or r3 instance types are good for the slaves,
 depending on how much memory you need. Workloads without large amounts of
 state should be OK on c3 instances.

 EMR might actually work for YARN if you use the long-running kind of
 cluster (see:
 http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-plan-longrunning-transient.html).
 I haven't tried that, but it might be worth a shot before going with stock
 Apache Hadoop.

 On Tue, Aug 4, 2015 at 5:58 PM, Job-Selina Wu swucaree...@gmail.com
 wrote:

  Dear All: I was looking for a tutorial on how to build and run Samza on
  AWS, and I found the link below. I am wondering if there is a detailed
  tutorial about how to build Samza on AWS?
 
  Sincerely,
  Selina
 
 
 
  https://cwiki.apache.org/confluence/display/SAMZA/FAQ#FAQ-HowshouldSamzaberunonAWS?
  How should Samza be run on AWS?

  From Gian Merlino:

  - We've been using Samza in production on AWS for a little over a month.
  We're just using the YARN runner on a mostly stock hadoop 2.4.0 cluster
  (not EMR). Our experience is that c3s work well for the YARN instances and
  i2s work well for the Kafka instances. Things have been pretty solid with
  that setup. For scaling up and scaling down YARN, we just terminate
  instances or add instances, and this works pretty well. It can take a few
  minutes for the cluster to realize a node has gone and respawn containers
  elsewhere. We have a separate Kafka cluster just for Samza's use,
  different from our main Kafka cluster. The main reason is that we wanted
  to isolate off the disk and network load of state compactions and
  restores (we don't use compacted topics in our main Kafka cluster, but
  we do use them with Samza, and the extra load on Kafka can be
  substantial).
 




-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org


Re: Missing a change log offset for SystemStreamPartition

2015-08-05 Thread Yan Fang
Hi Jordi,

I wonder whether the cause of your first exception is that you changed the
task number (the partition count of your input stream) but were still using
the same changelog stream. It is trying to send to partition 2, which does
not exist?
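The hypothesis above can be stated as a simple count check. A minimal sketch, assuming the common one-to-one mapping in which the store of task N is backed by changelog partition N (missingPartitions is a hypothetical helper, not a Samza API): with 3 tasks and a changelog created earlier with 2 partitions, partition 2 is missing, which matches the SystemStreamPartition [kafka, stepdb-changelog, 2] in the first exception.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical check, assuming the store of task N is backed by
// partition N of the changelog topic.
final class ChangelogPartitionCheck {
  // Returns the changelog partitions that tasks expect but the topic lacks.
  static List<Integer> missingPartitions(int taskCount, int changelogPartitions) {
    List<Integer> missing = new ArrayList<>();
    for (int p = changelogPartitions; p < taskCount; p++) {
      missing.add(p);
    }
    return missing;
  }

  public static void main(String[] args) {
    // The input stream now has 3 partitions (3 tasks), but the changelog
    // was created earlier with only 2 partitions.
    System.out.println(missingPartitions(3, 2)); // [2]
  }
}
```

Starting a new job with a new store name, as suggested below, sidesteps the mismatch because a fresh changelog is created for the current task count.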

Can you reproduce this exception in a new job? (new store name, new job
name)

The second exception is caused by the wrong offset format, I believe.

Let me know how the new job goes.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Aug 5, 2015 at 12:51 AM, Jordi Blasi Uribarri jbl...@nextel.es
wrote:

 Hi,

 I am trying to use the key-value store to manage some state information.
 Basically, this is the code I am using. As far as I have tested, the rest
 is working correctly.

 private KeyValueStore<String, String> storestp;

 // "stepdb" must match a store defined in the job config (stores.stepdb.*).
 public void init(Config config, TaskContext context) {
   this.storestp = (KeyValueStore<String, String>) context.getStore("stepdb");
 }

 public void process(IncomingMessageEnvelope envelope,
                     MessageCollector collector,
                     TaskCoordinator coordinator) {
   …
   String str = storestp.get(code);
   …
 }

 When I load it, it goes to Running, but when I send messages through the
 Kafka stream it goes to the Failed state. I found this exception:
 Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, stepdb-changelog, 2].
 at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
 at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
 at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
 at scala.collection.AbstractMap.getOrElse(Map.scala:58)
 at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
 at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
 at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
 at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
 at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
 at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
 at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
 at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
 at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
 at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
 at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
 at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
 at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
 at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
 at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
 at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

 I have seen that the stepdb-changelog stream exists in Kafka. To try to
 regenerate the missing offset and test it, I connected through the
 command line and sent a message to the stream. It was received correctly.
 Now I am seeing the following exception:

 Exception in thread "main" java.lang.NullPointerException
     at scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
     at scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
     at scala.collection.SeqLike$class.size(SeqLike.scala:106)
     at scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
     at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:94)
     at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:79)
     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
     at

Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

2015-08-05 Thread Milinda Pathirage


 On Aug. 5, 2015, 5:34 p.m., Milinda Pathirage wrote:
  I went through old discussions and also went through Calcite's RelBuilder 
  (https://github.com/milinda/incubator-calcite/blob/master/core/src/main/java/org/apache/calcite/tools/RelBuilder.java)
  to look at our TopologyBuilder from a SQL query plan perspective. Below are 
  my thoughts.
  
  * I agree with Guozhang that we should first focus on simple use cases, and 
  I think we should not try to support building complex DAGs that contain 
  multiple complex queries via this builder API.
  * IMHO, TopologyBuilder is closer to query execution than to the query itself. 
  If we want people to compose SQL queries through a Java API, it's better to 
  have an API similar to jOOQ (http://www.jooq.org) for streaming SQL.
  * AFAIK, **split**, mentioned in one of Yi's comments, doesn't occur in SQL 
  query plans because SQL operators always have one output (@Yi please correct 
  me if I am wrong).
  * IMHO, supporting something similar to views through the builder API may 
  be useful. To facilitate this, we can allow the result of the builder (maybe 
  not through the *build* method but via a method like *buildView*) to be 
  referenced as an input to other queries.
  
  So I'm proposing a builder similar to the following, based on Calcite's 
  RelBuilder API:
  
  ```java
  TopologyBuilder builder = TopologyBuilder.create(..);
  
  OperatorRouter router = builder.scan(stream1)
      .window(10, 2)
      .aggregate(builder.groupKey(...),
          builder.aggregateCall(...), ...)
      .scan(stream2)
      .window(10, 2)
      .aggregate(builder.groupKey(...),
          builder.aggregateCall(...), ...)
      .join(JoinType.INNER, builder.condition(...))
      .scan(stream2)
      .project(..)
      .window(10, 2)
      .join(joinType, condition)
      .partition(partitionKey, number)
      .modify(Operation.INSERT, ..);
  ```
  
  * In the API above, *beginStream* is renamed to *scan* to bring the API 
  closer to the physical plan.
  * *scan* in the middle marks the start of a new input or input sub-query.
  * *join* takes the last two sub-trees (sub-queries) as inputs.
  * *modify* is used to insert/update tuples in streams or tables.
  * The builder will provide utility methods to create conditions, function 
  calls, aggregates, and `GROUP BY` clauses.
  * The above assumes that there are no multi-output operators.
  * Reusable sub-queries are not present in the above example; I'll think 
  about it and introduce a mechanism to reuse sub-queries (possibly by 
  introducing the view concept).
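The stack semantics in the bullets above (scan pushes a new input, join consumes the last two sub-trees) can be sketched as a toy Java builder. This is a sketch assuming operators are represented as plain strings; StackTopologyBuilder and its methods are hypothetical and only loosely follow Calcite's RelBuilder, which likewise keeps a stack of relational expressions and joins the two most recent ones.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy stack-based topology builder; all names are hypothetical. */
public class StackTopologyBuilder {

    private final Deque<String> stack = new ArrayDeque<>();

    /** Starts a new input: pushes a leaf onto the stack. */
    StackTopologyBuilder scan(String stream) {
        stack.push("scan(" + stream + ")");
        return this;
    }

    /** Wraps the top sub-tree in a window operator. */
    StackTopologyBuilder window(int size, int slide) {
        stack.push("window[" + size + "," + slide + "](" + stack.pop() + ")");
        return this;
    }

    /** Pops the last two sub-trees and pushes their join. */
    StackTopologyBuilder join(String type) {
        String right = stack.pop();
        String left = stack.pop();
        stack.push("join[" + type + "](" + left + ", " + right + ")");
        return this;
    }

    /** Returns the single remaining tree; fails if some input was never joined. */
    String build() {
        if (stack.size() != 1) {
            throw new IllegalStateException("expected 1 sub-tree, found " + stack.size());
        }
        return stack.pop();
    }

    public static void main(String[] args) {
        String plan = new StackTopologyBuilder()
                .scan("stream1").window(10, 2)
                .scan("stream2").window(10, 2)
                .join("INNER")
                .build();
        // prints join[INNER](window[10,2](scan(stream1)), window[10,2](scan(stream2)))
        System.out.println(plan);
    }
}
```

The single-tree check in build() also reflects the single-output assumption: a well-formed chain ends with exactly one sub-tree on the stack.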
  
  Please feel free to comment on this.

Instead of group keys, aggregate calls, or conditions, we can directly take 
OperatorSpec instances, given that OperatorSpecs already encapsulate 
everything an operator needs.
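A minimal sketch of that alternative, in which one builder entry point accepts a spec object carrying all of an operator's parameters. OperatorSpec here is a hypothetical stand-in interface, not the samza-sql-core type, and AggregateSpec and SpecBasedBuilder are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: builder methods take a spec object instead of separate arguments. */
public class SpecBasedBuilder {

    /** Hypothetical stand-in for OperatorSpec: bundles everything an operator needs. */
    interface OperatorSpec {
        String describe();
    }

    /** Hypothetical aggregate spec holding the group key and the aggregate call. */
    static final class AggregateSpec implements OperatorSpec {
        private final String groupKey;
        private final String aggregateCall;

        AggregateSpec(String groupKey, String aggregateCall) {
            this.groupKey = groupKey;
            this.aggregateCall = aggregateCall;
        }

        @Override
        public String describe() {
            return "aggregate(" + aggregateCall + " GROUP BY " + groupKey + ")";
        }
    }

    private final List<String> operators = new ArrayList<>();

    /** One generic entry point; the spec already carries the operator's parameters. */
    SpecBasedBuilder add(OperatorSpec spec) {
        operators.add(spec.describe());
        return this;
    }

    List<String> operators() {
        return operators;
    }

    public static void main(String[] args) {
        SpecBasedBuilder builder = new SpecBasedBuilder()
                .add(new AggregateSpec("userId", "COUNT(*)"));
        System.out.println(builder.operators()); // prints [aggregate(COUNT(*) GROUP BY userId)]
    }
}
```

The trade-off is that the builder no longer needs helper methods such as groupKey or aggregateCall; construction and validation of parameters move into the spec classes.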


- Milinda


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review94271
---


On May 20, 2015, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34500/
 ---
 
 (Updated May 20, 2015, 11:13 p.m.)
 
 
 Review request for samza, Yan Fang, Chris Riccomini, Guozhang Wang, Milinda 
 Pathirage, Navina Ramesh, and Naveen Somasundaram.
 
 
 Bugs: SAMZA-552
 https://issues.apache.org/jira/browse/SAMZA-552
 
 
 Repository: samza
 
 
 Description
 ---
 
 SAMZA-552: added operator builder API
 - The current operator builder only supports single-output DAG topologies so far
 
 
 Diffs
 -
 
   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION 
   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION 
   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION 
   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION 
   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION 
   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION 
   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION 
   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION 
   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION