Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36163/ ---

(Updated Aug. 10, 2015, 7:47 a.m.)

Review request for samza.

Repository: samza

Description (updated)
---
Rebased branch on latest master branch.

Diffs (updated)
- samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588ebc99b5f07d533e48e10061a3075a63665a
- samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae3a904716ea51a2b27c7701ac30d13b854
- samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ea2eaa5f663e717414b8a4b42123842105df150d
- samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala c39cdc7ed86b591f379eda24005df25928bd8f16
- samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f41062f3432ae9dc9a9737b48ed7b2f709f20
- samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c4639fc226b34e64915935c1d90e5917af2e
- samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 4097ac7fa1f2b70c13bf8b618808db1f688a99e6
- samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1fd5dd34f70cb701b4c134b27464863d445b35f3
- samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala f6d53e084771cfbfd1d1ca68f1ac54db0bd91067
- samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f54f526d5d88ad3bc312b71fce40260e7c6
- samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366f0f60e401765a000fa265c59dee4a461e
- samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b42a5b9a4bfb43766c17847b2947ebdb21d

Diff: https://reviews.apache.org/r/36163/diff/

Testing
---
I wasn't really sure what kind of test (unit test / integration test) I should make here, so any pointers would be greatly appreciated!

I tested the change with the unit/integration tests already available.

Thanks,

Robert Zuljevic
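Since the patch touches SystemAdmin and JobCoordinator alongside TaskStorageManager, the shape of the change appears to be: changelog-topic creation moves behind a SystemAdmin hook that the job coordinator invokes once at job start, rather than each container creating the topic itself. A rough, self-contained sketch of that shape (the method name `createChangelogStream`, and all types below, are illustrative stand-ins, not the actual Samza API):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in interface -- not the real org.apache.samza.system.SystemAdmin.
interface SystemAdmin {
    // Hypothetical hook: create the changelog stream with the given partition count.
    void createChangelogStream(String streamName, int numPartitions);
}

// Fake admin that records what was created, standing in for e.g. KafkaSystemAdmin.
class RecordingSystemAdmin implements SystemAdmin {
    final Map<String, Integer> created = new HashMap<>();
    public void createChangelogStream(String streamName, int numPartitions) {
        created.put(streamName, numPartitions);
    }
}

// The coordinator creates every store's changelog once, before containers start,
// so containers no longer race to create topics on startup.
class JobCoordinatorSketch {
    static void createChangelogStreams(SystemAdmin admin, Map<String, Integer> changelogs) {
        for (Map.Entry<String, Integer> e : changelogs.entrySet()) {
            admin.createChangelogStream(e.getKey(), e.getValue());
        }
    }
}

public class ChangelogSketch {
    public static void main(String[] args) {
        RecordingSystemAdmin admin = new RecordingSystemAdmin();
        Map<String, Integer> changelogs = new HashMap<>();
        changelogs.put("test11db-changelog", 4);
        JobCoordinatorSketch.createChangelogStreams(admin, changelogs);
        System.out.println(admin.created.get("test11db-changelog")); // prints 4
    }
}
```

Centralizing creation in the coordinator also makes a mock-based unit test straightforward, which may answer the testing question above: assert that the coordinator asked the admin to create each configured changelog.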
Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36163/ ---

(Updated Aug. 10, 2015, 7:50 a.m.)

Review request for samza.

Repository: samza

Description (updated)
---
Removed unnecessary whitespaces.

Diffs (updated)
- samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588ebc99b5f07d533e48e10061a3075a63665a
- samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae3a904716ea51a2b27c7701ac30d13b854
- samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ea2eaa5f663e717414b8a4b42123842105df150d
- samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala c39cdc7ed86b591f379eda24005df25928bd8f16
- samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f41062f3432ae9dc9a9737b48ed7b2f709f20
- samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c4639fc226b34e64915935c1d90e5917af2e
- samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 4097ac7fa1f2b70c13bf8b618808db1f688a99e6
- samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1fd5dd34f70cb701b4c134b27464863d445b35f3
- samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala f6d53e084771cfbfd1d1ca68f1ac54db0bd91067
- samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f54f526d5d88ad3bc312b71fce40260e7c6
- samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366f0f60e401765a000fa265c59dee4a461e
- samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b42a5b9a4bfb43766c17847b2947ebdb21d

Diff: https://reviews.apache.org/r/36163/diff/

Testing
---
I wasn't really sure what kind of test (unit test / integration test) I should make here, so any pointers would be greatly appreciated!

I tested the change with the unit/integration tests already available.

Thanks,

Robert Zuljevic
RE: Missing a change log offset for SystemStreamPartition
Hi,

I have migrated Samza to the latest version and recreated the job with a new store name so the streams were created clean. I am getting the same error:

java version "1.7.0_79"
OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
log4j:WARN No appenders could be found for logger (org.apache.samza.metrics.JmxServer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, commdb-changelog, 2].
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:58)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
        at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
        at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
        at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
        at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

Is there any other info I can attach to help find the problem?

Thanks,

Jordi

-----Original Message-----
From: Yan Fang [mailto:yanfang...@gmail.com]
Sent: Friday, August 7, 2015 11:21 PM
To: dev@samza.apache.org
Subject: Re: Missing a change log offset for SystemStreamPartition

Hi Jordi,

Sorry for getting back to you late. I was quite busy yesterday.

I think the reason for your error is a mismatch between your Samza and Kafka versions. It seems you are using Samza 0.8.1 with Kafka 0.8.2, which is not supported. So my suggestion is to upgrade to *Samza 0.9.1*, then use *Kafka 0.8.2*. This combination is proven to work.

Hope this helps.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Aug 6, 2015 at 1:16 AM, Jordi Blasi Uribarri wrote:
> I changed the job name and the store name. I was defining two
> different stores and, in case that was the problem, I also eliminated the
> second one. I am getting the same exception.
>
> Exception in thread "main" org.apache.samza.SamzaException: Missing a
> change log offset for SystemStreamPartition [kafka, testdb-changelog, 2].
>         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
>         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
>         at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>         at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
>         at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
>         at scala.collection.TraversableLike$WithFilte
RE: Missing a change log offset for SystemStreamPartition
Just for making it easier to reproduce the problem I just reduced the code of the job to the minimum:

package test;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class testStore implements StreamTask {
    private KeyValueStore storestp;

    public void init(Config config, TaskContext context) {
        this.storestp = (KeyValueStore) context.getStore("test11db");
    }

    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        String msgin = (String) envelope.getMessage();
        storestp.put("test1", msgin);
    }
}

The properties file contains this:

task.class=test.testStore
job.name=test.testStore
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
yarn.package.path=http://192.168.15.92/jobs/nxtBroker-0.0.1-bin.tar.gz

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=kfk-kafka01:2181,kfk-kafka02:2181
systems.kafka.producer.bootstrap.servers=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:9093
systems.kafka.producer.metadata.broker.list=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:909

# Declare that we want our job's checkpoints to be written to Kafka
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka

# The job consumes a topic called "configtpc" from the "kafka" system
task.inputs=kafka.configtpc

# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Serializer for the system
systems.kafka.samza.msg.serde=string
systems.kafka.streams.tracetpc.samza.msg.serde=json

# Use the key-value store implementation for a store called "my-store"
stores.test11db.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory

# Use the Kafka topic "routingdb-changelog" as the changelog stream for this store.
# This enables automatic recovery of the store after a failure. If you don't
# configure this, no changelog stream will be generated.
stores.test11db.changelog=kafka.test11db-changelog

# Encode keys and values in the store as UTF-8 strings.
stores.test11db.key.serde=string
stores.test11db.msg.serde=string

# Commit checkpoints every 1 seconds
task.commit.ms=1000

With this, I am getting just the same error:

java version "1.7.0_79"
OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
log4j:WARN No appenders could be found for logger (org.apache.samza.metrics.JmxServer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, test11db-changelog, 2].
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:58)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
        at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
        at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
        at org.apache.samza.co
Re: Review Request 36006: SAMZA-724: Writing a tool to read from the coordinator stream and react to config changes accordingly.
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36006/ ---

(Updated Aug. 10, 2015, 6:35 p.m.)

Review request for samza, Yi Pan (Data Infrastructure), Navina Ramesh, and Naveen Somasundaram.

Repository: samza

Description
---
This RB is for SAMZA-724. After a job is submitted, it might need some configuration change; specifically, it might need more containers. In SAMZA-704 a tool is being added to write to the coordinator stream (CoordinatorStreamWriter). This tool can be used to write new configurations to the coordinator stream. However, another tool (ConfigManager) is needed to read the config changes and react to them, which is the goal of this task. This tool should be brought up after the job is submitted; it reads any config changes added to the coordinator stream and reacts to each accordingly.

This tool, called the Config Manager, focuses on handling container changes by reacting to set-config messages with the key "yarn.container.count". The config manager is a separate standalone module that should be brought up after the submission of a job. Therefore, you have to add two configurations to the input config file:
1. yarn.rm.address=
2. yarn.rm.port=

The config manager will periodically poll the coordinator stream to see if there are any new messages. This period is set to 100 ms by default. However, it can be configured by adding configManager.polling.interval= to the input config file.

Thus, overall, the command to run the config manager along with the job would be:
/bin/run-config-manager.sh --config-factory= --config-path=

Diffs (updated)
- build.gradle 0852adc4e8e0c2816afd1ebf433f1af6b44852f7
- checkstyle/import-control.xml 24ed680785175f3cdf955602b7a813684edd813e
- gradle/dependency-versions.gradle fb06e8ed393d1a38abfa1a48fe5244fc7f6c7339
- samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java PRE-CREATION
- samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java PRE-CREATION
- samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 2277a732b9ab763edf19a0fbec288ff72b27583b
- samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala d7c928c7401e539a370d4e82276e7dabbce1b638
- samza-shell/src/main/bash/run-config-manager.sh PRE-CREATION
- samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ea702a919348305ff95ce0b4ca1996a13aff04ec
- samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala ce88698c12c4bf6f4cf128f92d60b0b9496997d7
- settings.gradle 19bff971ad221084dac10d3f7f3facfa42b829a7

Diff: https://reviews.apache.org/r/36006/diff/

Testing
---
Tested with hello-samza and works properly.

Thanks,

Shadi A. Noghabi
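For reference, the extra entries described above would sit in the job's properties file roughly like this. Only the property names come from the description; the host and port values are placeholders invented for illustration:

```
# Required so the config manager can reach the YARN ResourceManager
# (values below are placeholders -- substitute your own RM host/port)
yarn.rm.address=rm-host.example.com
yarn.rm.port=8088

# Optional: coordinator-stream poll period in ms (defaults to 100)
configManager.polling.interval=100
```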
Re: Missing a change log offset for SystemStreamPartition
Hi Jordi,

I think you need to implement the *InitableTask* interface. Otherwise, the content in the init method will not be processed.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Mon, Aug 10, 2015 at 3:24 AM, Jordi Blasi Uribarri wrote:
> Just for making it easier to reproduce the problem I just reduced the code
> of the job to the minimum:
Re: Missing a change log offset for SystemStreamPartition
Hi, Jordi,

Agree with Yan. More specifically, your class definition should be something like:

{code}
public class testStore implements StreamTask, InitableTask {
    ...
}
{code}

On Mon, Aug 10, 2015 at 6:08 PM, Yan Fang wrote:
> Hi Jordi,
>
> I think you need to implement the *InitableTask* interface. Otherwise, the
> content in the init method will not be processed.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
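The reason the declared interface matters: the container only calls init() on tasks that actually declare InitableTask, typically via an instanceof check, so a matching method signature alone is never invoked. A tiny self-contained sketch of that dispatch pattern (the interfaces below are stand-ins mimicking the Samza shape, not the real org.apache.samza.task API):

```java
// Stand-in interfaces mimicking the Samza task API shape.
interface InitableTask { void init(); }
interface StreamTask { void process(String message); }

// Defines an init() method but does NOT declare InitableTask: init() is never called,
// so a store fetched in init() would still be null when process() runs.
class TaskWithoutInterface implements StreamTask {
    boolean initialized = false;
    public void init() { initialized = true; }
    public void process(String message) { }
}

// Declares both interfaces: the framework will call init() before any process() call.
class TaskWithInterface implements StreamTask, InitableTask {
    boolean initialized = false;
    public void init() { initialized = true; }
    public void process(String message) { }
}

public class InitDispatchSketch {
    // Mimics the container: init() is only invoked when the task declares InitableTask.
    static void start(StreamTask task) {
        if (task instanceof InitableTask) {
            ((InitableTask) task).init();
        }
    }

    public static void main(String[] args) {
        TaskWithoutInterface a = new TaskWithoutInterface();
        TaskWithInterface b = new TaskWithInterface();
        start(a);
        start(b);
        System.out.println(a.initialized + " " + b.initialized); // prints "false true"
    }
}
```

So for the job above, changing the declaration to `implements StreamTask, InitableTask` (and importing org.apache.samza.task.InitableTask) should let getStore() run before the first message arrives.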