Samza vs Samza-Confluent on AWS

2015-08-11 Thread Job-Selina Wu
Dear All:

 I am trying to install Samza on AWS. I did some research online. It
looks like I could either deploy Kafka, ZooKeeper, and YARN and then the Samza
job on AWS, or deploy the Confluent Platform and then ZooKeeper, Kafka, and
Samza.

Can anyone tell me which way you use, which way is better, and why?


Some references are listed below:

https://cwiki.apache.org/confluence/display/SAMZA/Ecosystem

http://docs.confluent.io/1.0/quickstart.html

Sincerely,
Selina


Re: Missing a change log offset for SystemStreamPartition

2015-08-11 Thread Yi Pan
Hi, Jordi,

The local KV-store is meant to be accessed locally by the Samza container
on the machine. If you are referring to a use case where the local
KV-store is accessed by a container from a different Samza job, that is not
supported. What exactly do you mean when you say "load the DB to be
able to use it from the consuming job"? We may be of more help if we know
your use case in more detail.

Thanks!

-Yi



On Tue, Aug 11, 2015 at 3:00 AM, Jordi Blasi Uribarri jbl...@nextel.es
wrote:

 InitableTask was missing. That explains another problem I was
 experiencing (and had left for later). Anyway, the exception was still there
 until I commented out the changelog definition line in the properties file:

 #stores.test12db.changelog=kafka.test12db-changelog

 As I understand it, if the job goes down, the information will be lost.
 That is not a real issue, as I am storing temporary information there.
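Jordi's understanding matches how a changelog works in principle: every write to the local store is also recorded in a changelog topic, and a restarted container rebuilds the store by replaying that topic; without a changelog there is nothing to replay. The toy model below only illustrates that idea. The class and method names are hypothetical (this is not Samza's storage API), and the in-memory list stands in for a Kafka topic such as kafka.test12db-changelog.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a changelog-backed key-value store (hypothetical names, not a
// Samza API). The list stands in for a Kafka changelog topic.
class ChangelogBackedStore {
    private final Map<String, String> local = new HashMap<>();
    private final List<String[]> changelog; // null when no changelog is configured

    ChangelogBackedStore(List<String[]> changelog) {
        this.changelog = changelog;
    }

    void put(String key, String value) {
        local.put(key, value);
        if (changelog != null) {
            changelog.add(new String[] {key, value}); // every write is also logged
        }
    }

    String get(String key) {
        return local.get(key);
    }

    // Called when a (restarted) container starts: replay the changelog in order,
    // so later writes to the same key overwrite earlier ones.
    void restore() {
        if (changelog == null) {
            return; // nothing to replay: the previous contents are gone
        }
        for (String[] entry : changelog) {
            local.put(entry[0], entry[1]);
        }
    }
}
```

With a changelog, a fresh instance sharing the same log recovers the latest value after restore(); without one, it starts empty.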

 What I am seeing does not work as I expected. Maybe I do not correctly
 understand how the system works. I need a job to have access to the
 information previously stored in the store. I was planning a loader job
 that, on receiving messages with data, stores them in RocksDB; that
 information would then be consumed by a different consumer job and used for
 calculation.

 I see that in the loader job I can put and get information correctly. When
 I try to access the same store from a different job, I just get null
 results.

 How am I supposed to load the DB so that it can be used from the consuming
 job? Is RocksDB the right tool, or should I use some other technique?

 Thanks,

 Jordi



 -Original Message-
 From: Yi Pan [mailto:nickpa...@gmail.com]
 Sent: Tuesday, August 11, 2015 3:27
 To: dev@samza.apache.org
 Subject: Re: Missing a change log offset for SystemStreamPartition

 Hi, Jordi,

 I agree with Yan. More specifically, your class definition should look
 something like:
 {code}
 public class testStore implements StreamTask, InitableTask { ...
 }
 {code}
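Yan's point can be illustrated with a minimal, self-contained sketch. The interfaces and the container below are hypothetical simplifications (the real Samza StreamTask and InitableTask take Config, TaskContext, IncomingMessageEnvelope and similar arguments); they only model the behaviour described in this thread, where the container calls init() only on tasks that implement the initable interface.

```java
import java.util.Map;

// Hypothetical, simplified stand-ins for Samza's task interfaces.
interface SimpleStreamTask {
    void process(String message);
}

interface SimpleInitableTask {
    void init(Map<String, Map<String, String>> stores);
}

class SimpleContainer {
    // Only tasks that also implement SimpleInitableTask get init() called.
    static void start(SimpleStreamTask task, Map<String, Map<String, String>> stores) {
        if (task instanceof SimpleInitableTask) {
            ((SimpleInitableTask) task).init(stores);
        }
    }
}

// Like the original testStore: it has an init() method but does not declare
// the initable interface, so the container never calls it.
class TaskWithoutInitable implements SimpleStreamTask {
    Map<String, String> store;

    public void init(Map<String, Map<String, String>> stores) {
        store = stores.get("test11db");
    }

    public void process(String message) {
        store.put("test1", message); // throws NullPointerException: store is null
    }
}

// The fix suggested in the thread: also implement the initable interface.
class TaskWithInitable implements SimpleStreamTask, SimpleInitableTask {
    Map<String, String> store;

    @Override
    public void init(Map<String, Map<String, String>> stores) {
        store = stores.get("test11db");
    }

    @Override
    public void process(String message) {
        store.put("test1", message);
    }
}
```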

 On Mon, Aug 10, 2015 at 6:08 PM, Yan Fang yanfang...@gmail.com wrote:

  Hi Jordi,
 
  I think you need to implement the *InitableTask* interface.
  Otherwise, the content of the init method will not be executed.
 
  Thanks,
 
  Fang, Yan
  yanfang...@gmail.com
 
  On Mon, Aug 10, 2015 at 3:24 AM, Jordi Blasi Uribarri
  jbl...@nextel.es
  wrote:
 
   To make the problem easier to reproduce, I reduced the code of the job to
   the minimum:

   package test;

   import org.apache.samza.config.Config;
   import org.apache.samza.storage.kv.KeyValueStore;
   import org.apache.samza.system.IncomingMessageEnvelope;
   import org.apache.samza.task.MessageCollector;
   import org.apache.samza.task.StreamTask;
   import org.apache.samza.task.TaskContext;
   import org.apache.samza.task.TaskCoordinator;

   public class testStore implements StreamTask {
     private KeyValueStore<String, String> storestp;

     public void init(Config config, TaskContext context) {
       this.storestp = (KeyValueStore<String, String>) context.getStore("test11db");
     }

     public void process(IncomingMessageEnvelope envelope,
                         MessageCollector collector,
                         TaskCoordinator coordinator) {
       String msgin = (String) envelope.getMessage();
       storestp.put("test1", msgin);
     }
   }
  
   The properties file contains this:

   task.class=test.testStore
   job.name=test.testStore
   job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
   yarn.package.path=http://192.168.15.92/jobs/nxtBroker-0.0.1-bin.tar.gz

   systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
   systems.kafka.consumer.zookeeper.connect=kfk-kafka01:2181,kfk-kafka02:2181
   systems.kafka.producer.bootstrap.servers=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:9093
   systems.kafka.producer.metadata.broker.list=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:909

   # Declare that we want our job's checkpoints to be written to Kafka
   task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
   task.checkpoint.system=kafka

   # The job consumes a topic called configtpc from the kafka system
   task.inputs=kafka.configtpc

   # Define a serializer/deserializer called json which parses JSON messages
   serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
   serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

   # Serializer for the system
   systems.kafka.samza.msg.serde=string
   systems.kafka.streams.tracetpc.samza.msg.serde=json

   # Use the key-value store implementation for a store called my-store
  
  
  

Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-08-11 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review93553
---


LGTM overall! I was able to test it as well.
Some feedback: would it be possible to mark the SSP of the IncomingMessageEnvelope
with a boolean flag indicating whether or not it comes from a broadcast stream?
When I tried writing sample code, I realized that the StreamTask has to be
aware of the exact topic name in order to figure out whether the message came
from a broadcast system stream. An API such as
envelope.getSystemStreamPartition().isBroadcast() would, I think, be more
convenient. What do you think?
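For comparison, the workaround Navina describes (the task has to know the topic names) amounts to something like the sketch below. The class name, the "system.stream" string format, and the idea of handing the set to the task from its own config are illustrative assumptions, not part of the patch. In a real task the two arguments would come from envelope.getSystemStreamPartition().getSystem() and .getStream(), the accessors that SystemStreamPartition inherits from SystemStream.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: the task is told which streams are broadcast and
// checks each incoming message's system and stream names against that set.
class BroadcastStreamChecker {
    private final Set<String> broadcastStreams; // entries like "kafka.configtpc"

    BroadcastStreamChecker(Set<String> broadcastStreams) {
        this.broadcastStreams = new HashSet<>(broadcastStreams); // defensive copy
    }

    boolean isBroadcast(String system, String stream) {
        return broadcastStreams.contains(system + "." + stream);
    }
}
```

The proposed isBroadcast() flag on the SSP would remove the need for every task to carry this duplicated knowledge.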


docs/learn/documentation/versioned/jobs/configuration-table.html (line 453)
https://reviews.apache.org/r/34974/#comment147930

Rephrase - This property specifies the partitions that all tasks should 
consume.



samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java (line 73)
https://reviews.apache.org/r/34974/#comment147929

Malformed HTML error in the javadocs for '<' and '>'.
Also on line 79.


I think the delay in my review allowed a few more commits to go through, so you
will have to rebase and update your patch again.
I found this error:
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala:27:
 error: class HdfsSystemAdmin needs to be abstract, since method 
offsetComparator in trait SystemAdmin of type (x$1: String, x$2: String)Integer 
is not defined
class HdfsSystemAdmin extends SystemAdmin with Logging {
  ^
one error found

I guess you can just throw an UnsupportedOperationException for now.
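A stub along these lines would satisfy the compiler until HDFS offsets can be ordered. The real HdfsSystemAdmin is Scala; this Java sketch uses a hypothetical one-method interface in place of the full SystemAdmin trait, keeping only the (String, String) => Integer shape reported in the error above. java.lang.UnsupportedOperationException is the JDK's standard exception for an operation that is not implemented.

```java
// Hypothetical, reduced stand-in for SystemAdmin: only the method added by
// this patch.
interface OffsetComparingAdmin {
    Integer offsetComparator(String offset1, String offset2);
}

// A system that cannot compare its offsets yet simply refuses the call.
class HdfsLikeSystemAdmin implements OffsetComparingAdmin {
    @Override
    public Integer offsetComparator(String offset1, String offset2) {
        throw new UnsupportedOperationException(
            "offsetComparator is not supported by this system yet");
    }
}
```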

- Navina Ramesh


On July 29, 2015, 10:49 p.m., Yan Fang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34974/
 ---
 
 (Updated July 29, 2015, 10:49 p.m.)
 
 
 Review request for samza.
 
 
 Bugs: SAMZA-676
 https://issues.apache.org/jira/browse/SAMZA-676
 
 
 Repository: samza
 
 
 Description
 ---
 
 1. added offsetComparator method in SystemAdmin Interface
 
 2. added task.global.inputs config
 
 3. rewrote the Grouper classes in Java; they can now assign global streams
 during grouping

 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer
 to preserve message order

 5. added taskNames to the offsets in OffsetManager

 6. allowed one SSP to be assigned to multiple taskInstances
 
 7. skipped already-processed messages in RunLoop
 
 8. unit tests for all changes
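Items 5 to 7 fit together: once one SSP can be assigned to several task instances, offsets have to be tracked per task, and an offset comparison decides whether a message was already processed. A minimal sketch of that bookkeeping follows. The class, the "task|ssp" key format, and the numeric comparator are hypothetical and do not reproduce the patch's RunLoop or OffsetManager code; offsets are assumed to be numeric strings (as Kafka offsets are) and compared with compareTo-style semantics.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: remembers the last processed offset per (task, SSP) and
// skips messages at or before it.
class ProcessedOffsetTracker {
    private final Map<String, String> lastProcessed = new HashMap<>();
    // Numeric comparison, so that "10" sorts after "5" (a plain String
    // comparison would get this wrong).
    private final Comparator<String> offsetComparator =
        Comparator.comparingLong(Long::parseLong);

    private static String key(String taskName, String ssp) {
        return taskName + "|" + ssp;
    }

    // Returns true if this task should process the message, and records it.
    boolean shouldProcess(String taskName, String ssp, String offset) {
        String k = key(taskName, ssp);
        String last = lastProcessed.get(k);
        if (last != null && offsetComparator.compare(offset, last) <= 0) {
            return false; // already processed by this task
        }
        lastProcessed.put(k, offset);
        return true;
    }
}
```

Keying by task name is what lets two tasks consume the same SSP independently.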
 
 
 Diffs
 -
 
   checkstyle/import-control.xml 6654319
   docs/learn/documentation/versioned/container/samza-container.md 9f46414
   docs/learn/documentation/versioned/jobs/configuration-table.html ea73b40
   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 27b2517
   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
   samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c46
   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
 

Re: Review Request 36006: SAMZA-724: Writing a tool to read from the coordinator stream and react to config changes accordingly.

2015-08-11 Thread Shadi A. Noghabi

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36006/
---

(Updated Aug. 11, 2015, 6:38 p.m.)


Review request for samza, Yi Pan (Data Infrastructure), Navina Ramesh, and 
Naveen Somasundaram.


Repository: samza


Description
---

This RB is for SAMZA-724.

After a job is submitted, it might need some configuration changes; specifically,
it might need more containers. In SAMZA-704, a tool is being added to write to
the coordinator stream (CoordinatorStreamWriter). This tool can be used to
write new configurations to the coordinator stream. However, another tool
(ConfigManager) is needed to read the config changes and react to them, which
is the goal of this task. This tool should be brought up after the job is
submitted; it reads any config changes added to the coordinator stream and
reacts to each one accordingly.

This tool, called the Config Manager, focuses on handling container changes
by reacting to set-config messages with the key yarn.container.count.
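The filtering described above can be sketched as follows. Only the message type "set-config" and the key yarn.container.count come from the description; ConfigMessage and ContainerCountHandler are hypothetical classes (the real coordinator stream message classes differ), and the callback stands in for whatever the Config Manager does to change the container count in YARN.

```java
import java.util.function.IntConsumer;

// Hypothetical model of a coordinator-stream message: a type, a key, a value.
final class ConfigMessage {
    final String type;
    final String key;
    final String value;

    ConfigMessage(String type, String key, String value) {
        this.type = type;
        this.key = key;
        this.value = value;
    }
}

// Reacts only to set-config messages whose key is yarn.container.count and
// passes the new count to a caller-supplied action; other messages are ignored.
class ContainerCountHandler {
    static final String SET_CONFIG = "set-config";
    static final String CONTAINER_COUNT_KEY = "yarn.container.count";

    private final IntConsumer onContainerCountChange;

    ContainerCountHandler(IntConsumer onContainerCountChange) {
        this.onContainerCountChange = onContainerCountChange;
    }

    boolean handle(ConfigMessage msg) {
        if (!SET_CONFIG.equals(msg.type) || !CONTAINER_COUNT_KEY.equals(msg.key)) {
            return false;
        }
        onContainerCountChange.accept(Integer.parseInt(msg.value));
        return true;
    }
}
```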

The Config Manager is a separate standalone module that should be brought up
separately after a job is submitted. Therefore, you have to add two
configurations to the input config file:
1. yarn.rm.address= the IP address of the YARN resource manager, e.g. localhost
2. yarn.rm.port= the port of the resource manager HTTP server, e.g. 8088

The Config Manager periodically polls the coordinator stream to see if there
are any new messages. The polling period is 100 ms by default, but it can
be configured by adding configManager.polling.interval=<polling interval> to
the input config file. Overall, the command to run the Config Manager along
with the job is:


<path to samza deployment>/bin/run-config-manager.sh --config-factory=<config factory> --config-path=<path to config file of a job>
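The polling behaviour can be sketched roughly as below. Only the key configManager.polling.interval and the 100 ms default come from the description; the class, the Supplier/Consumer plumbing (standing in for the coordinator stream consumer and the reaction logic) and the bounded round count are hypothetical and do not reproduce ConfigManager.java.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Sketch of a polling loop: read the interval from the job config (default
// 100 ms), then repeatedly drain new messages and hand each to a handler.
class PollingLoop {
    static final String INTERVAL_KEY = "configManager.polling.interval";
    static final long DEFAULT_INTERVAL_MS = 100;

    static long pollingIntervalMs(Map<String, String> config) {
        String v = config.get(INTERVAL_KEY);
        return v == null ? DEFAULT_INTERVAL_MS : Long.parseLong(v.trim());
    }

    // maxRounds only keeps the sketch finite; a real manager would loop until shut down.
    static <M> int run(Map<String, String> config, Supplier<List<M>> poll,
                       Consumer<M> handler, int maxRounds) {
        long interval = pollingIntervalMs(config);
        int handled = 0;
        for (int round = 0; round < maxRounds; round++) {
            for (M msg : poll.get()) {
                handler.accept(msg);
                handled++;
            }
            try {
                Thread.sleep(interval);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop polling if interrupted
                break;
            }
        }
        return handled;
    }
}
```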


Diffs (updated)
-

  build.gradle a935088eccb3aee4fbde21275fa8e701c215a69e
  checkstyle/import-control.xml 24ed680785175f3cdf955602b7a813684edd813e
  gradle/dependency-versions.gradle fb06e8ed393d1a38abfa1a48fe5244fc7f6c7339
  samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java PRE-CREATION
  samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 2277a732b9ab763edf19a0fbec288ff72b27583b
  samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala d7c928c7401e539a370d4e82276e7dabbce1b638
  samza-shell/src/main/bash/run-config-manager.sh PRE-CREATION
  samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala c6e994ff707af802ded57c3bc1762971892014da
  samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala ce88698c12c4bf6f4cf128f92d60b0b9496997d7
  settings.gradle a8d2c885254ca3994327fda18e09c49bc9c5e830

Diff: https://reviews.apache.org/r/36006/diff/


Testing
---

Tested with hello-samza; it works properly.


Thanks,

Shadi A. Noghabi



Re: Review Request 34974: SAMZA-676: implement broadcast stream

2015-08-11 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review94980
---

Ship it!


Ship It!

- Navina Ramesh


On July 29, 2015, 10:49 p.m., Yan Fang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34974/
 ---
 
 (Updated July 29, 2015, 10:49 p.m.)
 
 
 Review request for samza.
 
 
 Bugs: SAMZA-676
 https://issues.apache.org/jira/browse/SAMZA-676
 
 
 Repository: samza
 
 
 Description
 ---
 
 1. added offsetComparator method in SystemAdmin Interface
 
 2. added task.global.inputs config
 
 3. rewrote the Grouper classes in Java; they can now assign global streams
 during grouping

 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer
 to preserve message order

 5. added taskNames to the offsets in OffsetManager

 6. allowed one SSP to be assigned to multiple taskInstances
 
 7. skipped already-processed messages in RunLoop
 
 8. unit tests for all changes
 
 
 Diffs
 -
 
   checkstyle/import-control.xml 6654319
   docs/learn/documentation/versioned/container/samza-container.md 9f46414
   docs/learn/documentation/versioned/jobs/configuration-table.html ea73b40
   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 27b2517
   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
   samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c46
   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 64a5844
   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 84fdeaa
   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala a14169b
   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 74daf72
   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deb3895
   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 4097ac7
   samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1fd5dd3
   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f5
   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala de00320
   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 1629035
   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 2a84328
   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366
   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b4
 
 Diff: https://reviews.apache.org/r/34974/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Yan Fang
 




Re: Missing a change log offset for SystemStreamPartition

2015-08-11 Thread Jordi Blasi Uribarri
Hi,

What I am trying to develop is (I think) an evolution of the stream-table join.
For every message processed, the job reads the value of a variable contained in
it, looks it up in the store, and decides which topic the message must be sent
to. It is a kind of (very simple) workflow manager. I can read the data that
guides the decision from a store: key = previous step, value = output topic.

My problem is how to make this information already available to the job when
the process() method is called. I was trying to load this info with a (let's
call it) configuration job that receives all the data pairs and loads them into
the store. As I see it, this is not supported, so I need another way to get this
info into the store.

I see the same problem with the zip code example in the documentation: how do
the zip codes get into the store so they can be matched against the incoming
messages?

I am thinking of a solution where, during initialization, the job reads the data
from an external source, maybe a MySQL server, and loads it into the store. Then,
while processing messages, it could access the data. Is this a correct way of
doing it?
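Jordi's init-time loading idea can be sketched as below, under stated assumptions: a HashMap stands in for the job's KeyValueStore<String, String>, and the externalRoutes map stands in for rows read from an external source such as MySQL. WorkflowRouter and its methods are hypothetical names; in a real Samza job, init() would be the init method of a task that implements InitableTask and route() would be called from process().

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: fill the local store once at initialization, then consult it for
// every message. Key = previous step, value = output topic.
class WorkflowRouter {
    private final Map<String, String> store = new HashMap<>();

    // Would run inside init() of a task that implements InitableTask.
    void init(Map<String, String> externalRoutes) {
        store.putAll(externalRoutes);
    }

    // Would run inside process(): returns the output topic for the given step,
    // or null when no route is known.
    String route(String previousStep) {
        return store.get(previousStep);
    }
}
```

In this sketch each WorkflowRouter instance holds its own copy of the routes, which mirrors Yi's point that a local store belongs to the container that uses it.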

I am not sure I am explaining clearly what I am trying to do.

My other question is: once the data is loaded into the store, is it
replicated to all the copies of the job?

Thanks,

Jordi


From: Yi Pan [nickpa...@gmail.com]
Sent: Tuesday, August 11, 2015 19:03
To: dev@samza.apache.org
Subject: Re: Missing a change log offset for SystemStreamPartition


Re: Mailing list join request

2015-08-11 Thread Yan Fang
Shouldn't you send this to dev-subscr...@samza.apache.org? :)

Fang, Yan
yanfang...@gmail.com

On Wed, Aug 12, 2015 at 12:33 PM, Eli Reisman apache.mail...@gmail.com
wrote:

 subscribe



Mailing list join request

2015-08-11 Thread Eli Reisman
subscribe