Samza vs Samza-Confluent on AWS
Dear All: I am trying to install Samza on AWS and have done some research online. It looks like I could either deploy Kafka, ZooKeeper, and YARN and then the Samza job on AWS, or I could deploy the Confluent Platform and then ZooKeeper, Kafka, and Samza. Can anyone tell me which way you use, which way is better, and the reasons? Some references are listed below:
https://cwiki.apache.org/confluence/display/SAMZA/Ecosystem
http://docs.confluent.io/1.0/quickstart.html
Sincerely, Selina
Re: Missing a change log offset for SystemStreamPartition
Hi, Jordi, The local KV-store is meant to be accessed by the Samza container locally on the machine. If you were referring to the use case where the local KV-store is accessed by a container from a different Samza job, that is not supported. And what exactly does it mean when you say "load the DB to be able to use it from the consuming job"? We may be of more help if we know your use case in more detail. Thanks! -Yi

On Tue, Aug 11, 2015 at 3:00 AM, Jordi Blasi Uribarri jbl...@nextel.es wrote: InitableTask task was missing. That responds to another problem that I was experiencing (and left for later). Anyway, the exception was still there until I commented out the changelog definition line in the properties file: #stores.test12db.changelog=kafka.test12db-changelog As I understand it, in case of the job going down, information will be lost. That is not a real issue, as I am storing temporal information there. What I am seeing is something that does not work as I expected. Maybe I am not understanding correctly how the system works. I need a job to have access to the information previously stored in the storage. I was planning a loader job that, on receiving some messages with data, stores them in RocksDB, and that information should be consumed by a different consumer job to use it for calculation. I see that in the loader job I can put and get information correctly. When I try to access the same storage from a different job, I just get null results. How am I supposed to load the DB to be able to use it from the consuming job? Is RocksDB the tool to use, or should I use another technique? Thanks, Jordi

-Original message- From: Yi Pan [mailto:nickpa...@gmail.com] Sent: Tuesday, August 11, 2015 3:27 To: dev@samza.apache.org Subject: Re: Missing a change log offset for SystemStreamPartition

Hi, Jordi, Agree with Yan. More specifically, your class definition should be something like: {code} public class testStore implements StreamTask, InitableTask { ...
} {code}

On Mon, Aug 10, 2015 at 6:08 PM, Yan Fang yanfang...@gmail.com wrote: Hi Jordi, I think you need to implement the *InitableTask* interface. Otherwise, the content of the init method will not be processed. Thanks, Fang, Yan yanfang...@gmail.com

On Mon, Aug 10, 2015 at 3:24 AM, Jordi Blasi Uribarri jbl...@nextel.es wrote: Just to make it easier to reproduce the problem, I reduced the code of the job to the minimum:

{code}
package test;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class testStore implements StreamTask {
    private KeyValueStore<String, String> storestp;

    public void init(Config config, TaskContext context) {
        this.storestp = (KeyValueStore<String, String>) context.getStore("test11db");
    }

    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        String msgin = (String) envelope.getMessage();
        storestp.put("test1", msgin);
    }
}
{code}

The properties file contains this:

task.class=test.testStore
job.name=test.testStore
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
yarn.package.path=http://192.168.15.92/jobs/nxtBroker-0.0.1-bin.tar.gz
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=kfk-kafka01:2181,kfk-kafka02:2181
systems.kafka.producer.bootstrap.servers=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:9093
systems.kafka.producer.metadata.broker.list=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:909
# Declare that we want our job's checkpoints to be written to Kafka
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# The job consumes a topic called configtpc from the kafka system
task.inputs=kafka.configtpc
# Define a serializer/deserializer called json which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
# Serializer for the system
systems.kafka.samza.msg.serde=string
systems.kafka.streams.tracetpc.samza.msg.serde=json
# Use the key-value store implementation for a store called my-store
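The properties excerpt above breaks off right where the store section begins. As a hedged sketch (not Jordi's actual config), a RocksDB-backed store matching the test11db store used in the task code might be declared along these lines; the serde choice is an assumption:

```properties
# Hypothetical continuation of the store section above.
# The store name (test11db) and string serde match the task code in this thread.
stores.test11db.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.test11db.key.serde=string
stores.test11db.msg.serde=string
# Optional changelog for durability (the kind of line Jordi commented out):
# stores.test11db.changelog=kafka.test11db-changelog
```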
Re: Review Request 34974: SAMZA-676: implement broadcast stream
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34974/#review93553 ---

LGTM overall! I was able to test it as well. Some feedback: Would it be possible to mark the SSP of the IncomingMessageEnvelope with a boolean flag indicating whether it is from a broadcast stream or not? When I tried writing sample code, I realized that the StreamTask has to be aware of the exact topic name in order to figure out whether the message came from a broadcast system stream. If we had an API such as envelope.getSystemStreamPartition().isBroadcast(), I think it would be more convenient. What do you think?

docs/learn/documentation/versioned/jobs/configuration-table.html (line 453) https://reviews.apache.org/r/34974/#comment147930 Rephrase - This property specifies the partitions that all tasks should consume.

samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java (line 73) https://reviews.apache.org/r/34974/#comment147929 Malformed HTML error in the javadocs for '' and ''. Also, in line 79.

I think the delay in my review caused a few more commits to go through. So, you have to rebase and update your patch again. I found this error:

samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala:27: error: class HdfsSystemAdmin needs to be abstract, since method offsetComparator in trait SystemAdmin of type (x$1: String, x$2: String)Integer is not defined
class HdfsSystemAdmin extends SystemAdmin with Logging {
^
one error found

I guess you can just throw an UnsupportedOperationException for now.

- Navina Ramesh

On July 29, 2015, 10:49 p.m., Yan Fang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34974/ --- (Updated July 29, 2015, 10:49 p.m.) Review request for samza. Bugs: SAMZA-676 https://issues.apache.org/jira/browse/SAMZA-676 Repository: samza

Description ---

1. added offsetComparator method in the SystemAdmin interface
2. added task.global.inputs config
3. rewrote Grouper classes in Java; allows assigning global streams during grouping
4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to preserve message order
5. added taskNames to the offsets in OffsetManager
6. allowed assigning one SSP to multiple taskInstances
7. skipped already-processed messages in RunLoop
8. unit tests for all changes

Diffs -
checkstyle/import-control.xml 6654319
docs/learn/documentation/versioned/container/samza-container.md 9f46414
docs/learn/documentation/versioned/jobs/configuration-table.html ea73b40
samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 27b2517
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c46
samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
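The envelope.getSystemStreamPartition().isBroadcast() accessor suggested in the review above is hypothetical; it was only proposed, not part of Samza's API. The following self-contained sketch, with minimal stand-in types defined locally, illustrates how such a flag would let a task branch without hard-coding topic names:

```java
// Sketch of the proposed convenience API. SystemStreamPartition here is a
// local stand-in, NOT Samza's class, and isBroadcast() is the HYPOTHETICAL
// accessor from the review discussion.
class SystemStreamPartition {
    private final String system;
    private final String stream;
    private final int partition;
    private final boolean broadcast;

    SystemStreamPartition(String system, String stream, int partition, boolean broadcast) {
        this.system = system;
        this.stream = stream;
        this.partition = partition;
        this.broadcast = broadcast;
    }

    // Proposed convenience: lets a task avoid comparing topic names directly.
    boolean isBroadcast() { return broadcast; }
}

public class BroadcastCheckSketch {
    // With the flag, routing is topic-agnostic; without it, the task would
    // have to compare ssp.getStream() against a known broadcast topic name.
    static String route(SystemStreamPartition ssp) {
        return ssp.isBroadcast() ? "handle-control-message" : "handle-data-message";
    }

    public static void main(String[] args) {
        SystemStreamPartition data = new SystemStreamPartition("kafka", "orders", 0, false);
        SystemStreamPartition ctrl = new SystemStreamPartition("kafka", "config", 0, true);
        System.out.println(route(data)); // handle-data-message
        System.out.println(route(ctrl)); // handle-control-message
    }
}
```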
Re: Review Request 36006: SAMZA-724: Writing a tool to read from the coordinator stream and react to config changes accordingly.
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36006/ --- (Updated Aug. 11, 2015, 6:38 p.m.) Review request for samza, Yi Pan (Data Infrastructure), Navina Ramesh, and Naveen Somasundaram. Repository: samza

Description ---

This RB is for SAMZA-724. After a job is submitted, it might need some configuration change; specifically, it might need more containers. In SAMZA-704 a tool is being added to write to the coordinator stream (CoordinatorStreamWriter). This tool can be used to write new configurations to the coordinator stream. However, another tool (ConfigManager) is needed to read the config changes and react to them, which is the goal of this task. This tool should be brought up after the job is submitted; it reads any config changes added to the coordinator stream and reacts to each accordingly. This tool, called the Config Manager, focuses on handling container changes by reacting to set-config messages with the key yarn.container.count. The config manager is a separate standalone module that should be brought up separately after the submission of a job. Therefore, you have to add two configurations to the input config file:

1. yarn.rm.address = the IP of the resource manager in YARN (e.g., localhost)
2. yarn.rm.port = the port of the resource manager's HTTP server (e.g., 8088)

The config manager will periodically poll the coordinator stream to see if there are any new messages. This period is set to 100 ms by default. However, it can be configured by adding configManager.polling.interval=<polling interval> to the input config file.
Thus, overall, the command to run the config manager along with the job would be:

<path-to-samza-deployment>/bin/run-config-manager.sh --config-factory=<config-factory> --config-path=<path-to-job-config-file>

Diffs (updated) -
build.gradle a935088eccb3aee4fbde21275fa8e701c215a69e
checkstyle/import-control.xml 24ed680785175f3cdf955602b7a813684edd813e
gradle/dependency-versions.gradle fb06e8ed393d1a38abfa1a48fe5244fc7f6c7339
samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java PRE-CREATION
samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java PRE-CREATION
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 2277a732b9ab763edf19a0fbec288ff72b27583b
samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala d7c928c7401e539a370d4e82276e7dabbce1b638
samza-shell/src/main/bash/run-config-manager.sh PRE-CREATION
samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala c6e994ff707af802ded57c3bc1762971892014da
samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala ce88698c12c4bf6f4cf128f92d60b0b9496997d7
settings.gradle a8d2c885254ca3994327fda18e09c49bc9c5e830

Diff: https://reviews.apache.org/r/36006/diff/

Testing ---

Tested with hello-samza and works properly.

Thanks, Shadi A. Noghabi
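Pulling the configuration requirements above together, the additions to a job's config file might look like the following sketch (values are the examples given in the description; the polling-interval key is spelled as described there):

```properties
# Resource manager location, required by the ConfigManager:
yarn.rm.address=localhost
yarn.rm.port=8088
# Optional: coordinator-stream polling interval in milliseconds (default 100):
configManager.polling.interval=100
```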
Re: Review Request 34974: SAMZA-676: implement broadcast stream
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34974/#review94980 ---

Ship it!

- Navina Ramesh

On July 29, 2015, 10:49 p.m., Yan Fang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34974/ --- (Updated July 29, 2015, 10:49 p.m.) Review request for samza. Bugs: SAMZA-676 https://issues.apache.org/jira/browse/SAMZA-676 Repository: samza

Description ---

1. added offsetComparator method in the SystemAdmin interface
2. added task.global.inputs config
3. rewrote Grouper classes in Java; allows assigning global streams during grouping
4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to preserve message order
5. added taskNames to the offsets in OffsetManager
6. allowed assigning one SSP to multiple taskInstances
7. skipped already-processed messages in RunLoop
8. unit tests for all changes

Diffs -
checkstyle/import-control.xml 6654319
docs/learn/documentation/versioned/container/samza-container.md 9f46414
docs/learn/documentation/versioned/jobs/configuration-table.html ea73b40
samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 27b2517
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c46
samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 64a5844
samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 84fdeaa
samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala a14169b
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 74daf72
samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deb3895
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 4097ac7
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1fd5dd3
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f5
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala de00320
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 1629035
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 2a84328
samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b4

Diff: https://reviews.apache.org/r/34974/diff/

Testing ---

Thanks, Yan Fang
Re: Missing a change log offset for SystemStreamPartition
Hi, What I am trying to develop is (I think) an evolution of the stream-table join. For every message processed, depending on the value of a variable contained in it, the job goes to the store and decides to which topic the message must be sent. It is some kind of (very simple) workflow manager. I can read from a store the data that guides the decision: key = previous step, value = output topic. My problem is how to make this information already available to the job when the process() method is called. I was trying to load this info with a (let's call it) configuration job that receives all the data pairs and loads them into the store. As I see this is not supported, I need another way to get this info into the store. I see the same problem with the zip code example in the documentation: how do the zip codes get into the store so they can be crossed against the incoming messages? I am thinking of a solution where the initialization process reads the data from an external source, maybe a MySQL server, and loads it into the store. Then, while processing messages, the job could access the data. Is this a correct way of doing it? I am not sure if I am explaining correctly what I am trying to do. The other question I have is: once the data is loaded into the store, is it replicated to all the copies of the job? Thanks, Jordi

From: Yi Pan [nickpa...@gmail.com] Sent: Tuesday, August 11, 2015 19:03 To: dev@samza.apache.org Subject: Re: Missing a change log offset for SystemStreamPartition

Hi, Jordi, The local KV-store is meant to be accessed by the Samza container locally on the machine. If you were referring to the use case where the local KV-store is accessed by a container from a different Samza job, that is not supported. And what exactly does it mean when you say "load the DB to be able to use it from the consuming job"? We may be of more help if we know your use case in more detail. Thanks!
-Yi

On Tue, Aug 11, 2015 at 3:00 AM, Jordi Blasi Uribarri jbl...@nextel.es wrote: InitableTask task was missing. That responds to another problem that I was experiencing (and left for later). Anyway, the exception was still there until I commented out the changelog definition line in the properties file: #stores.test12db.changelog=kafka.test12db-changelog As I understand it, in case of the job going down, information will be lost. That is not a real issue, as I am storing temporal information there. What I am seeing is something that does not work as I expected. Maybe I am not understanding correctly how the system works. I need a job to have access to the information previously stored in the storage. I was planning a loader job that, on receiving some messages with data, stores them in RocksDB, and that information should be consumed by a different consumer job to use it for calculation. I see that in the loader job I can put and get information correctly. When I try to access the same storage from a different job, I just get null results. How am I supposed to load the DB to be able to use it from the consuming job? Is RocksDB the tool to use, or should I use another technique? Thanks, Jordi

-Original message- From: Yi Pan [mailto:nickpa...@gmail.com] Sent: Tuesday, August 11, 2015 3:27 To: dev@samza.apache.org Subject: Re: Missing a change log offset for SystemStreamPartition

Hi, Jordi, Agree with Yan. More specifically, your class definition should be something like: {code} public class testStore implements StreamTask, InitableTask { ... } {code}

On Mon, Aug 10, 2015 at 6:08 PM, Yan Fang yanfang...@gmail.com wrote: Hi Jordi, I think you need to implement the *InitableTask* interface. Otherwise, the content of the init method will not be processed.
Thanks, Fang, Yan yanfang...@gmail.com

On Mon, Aug 10, 2015 at 3:24 AM, Jordi Blasi Uribarri jbl...@nextel.es wrote: Just to make it easier to reproduce the problem, I reduced the code of the job to the minimum:

package test;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class testStore implements StreamTask {
    private KeyValueStore<String, String> storestp;

    public void init(Config config, TaskContext context) {
        this.storestp = (KeyValueStore<String, String>) context.getStore("test11db");
    }

    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        String msgin = (String) envelope.getMessage();
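The routing idea Jordi describes (key = previous step, value = output topic) can be sketched in plain Java, with a HashMap standing in for the Samza KeyValueStore; the step and topic names below are invented for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the workflow-routing logic from the thread above. A HashMap
// stands in for the Samza KeyValueStore; step and topic names are made up.
public class RoutingSketch {
    private final Map<String, String> stepToTopic = new HashMap<>();

    // In a real job this load would happen in InitableTask.init(), e.g. by
    // reading the step->topic pairs from an external source such as MySQL.
    void load() {
        stepToTopic.put("validated", "enrichment-topic");
        stepToTopic.put("enriched", "output-topic");
    }

    // In process(), pick the destination topic for a message based on its
    // previous step, falling back to a default for unknown steps.
    String route(String previousStep) {
        return stepToTopic.getOrDefault(previousStep, "dead-letter-topic");
    }

    public static void main(String[] args) {
        RoutingSketch r = new RoutingSketch();
        r.load();
        System.out.println(r.route("validated")); // enrichment-topic
        System.out.println(r.route("unknown"));   // dead-letter-topic
    }
}
```

Note this only illustrates the lookup itself; as Yi points out above, the store must be local to the job doing the lookup, so the load and the routing belong in the same job.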
Re: Mailing list join request
Shouldn't you send that to dev-subscr...@samza.apache.org? :) Fang, Yan yanfang...@gmail.com On Wed, Aug 12, 2015 at 12:33 PM, Eli Reisman apache.mail...@gmail.com wrote: subscribe
Mailing list join request
subscribe