Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-08-10 Thread Robert Zuljevic

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/
---

(Updated Aug. 10, 2015, 7:47 a.m.)


Review request for samza.


Repository: samza


Description (updated)
---

Rebased branch on latest master branch.


Diffs (updated)
-

  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
7a588ebc99b5f07d533e48e10061a3075a63665a 
  
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
 249b8ae3a904716ea51a2b27c7701ac30d13b854 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
ea2eaa5f663e717414b8a4b42123842105df150d 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
c39cdc7ed86b591f379eda24005df25928bd8f16 
  
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
 097f41062f3432ae9dc9a9737b48ed7b2f709f20 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
8d54c4639fc226b34e64915935c1d90e5917af2e 
  
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 
4097ac7fa1f2b70c13bf8b618808db1f688a99e6 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
 1fd5dd34f70cb701b4c134b27464863d445b35f3 
  samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 
f6d53e084771cfbfd1d1ca68f1ac54db0bd91067 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 
35086f54f526d5d88ad3bc312b71fce40260e7c6 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
b063366f0f60e401765a000fa265c59dee4a461e 
  
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 1e936b42a5b9a4bfb43766c17847b2947ebdb21d 

Diff: https://reviews.apache.org/r/36163/diff/


Testing
---

I wasn't really sure what kind of test (unit test / integration test) I should 
write here, so any pointers would be greatly appreciated! I tested the change 
with the unit/integration tests already available.


Thanks,

Robert Zuljevic



Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

2015-08-10 Thread Robert Zuljevic

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/
---

(Updated Aug. 10, 2015, 7:50 a.m.)


Review request for samza.


Repository: samza


Description (updated)
---

Removed unnecessary whitespace.


Diffs (updated)
-

  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 
7a588ebc99b5f07d533e48e10061a3075a63665a 
  
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
 249b8ae3a904716ea51a2b27c7701ac30d13b854 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
ea2eaa5f663e717414b8a4b42123842105df150d 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
c39cdc7ed86b591f379eda24005df25928bd8f16 
  
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
 097f41062f3432ae9dc9a9737b48ed7b2f709f20 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
8d54c4639fc226b34e64915935c1d90e5917af2e 
  
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 
4097ac7fa1f2b70c13bf8b618808db1f688a99e6 
  
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
 1fd5dd34f70cb701b4c134b27464863d445b35f3 
  samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 
f6d53e084771cfbfd1d1ca68f1ac54db0bd91067 
  
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 
35086f54f526d5d88ad3bc312b71fce40260e7c6 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
b063366f0f60e401765a000fa265c59dee4a461e 
  
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
 1e936b42a5b9a4bfb43766c17847b2947ebdb21d 

Diff: https://reviews.apache.org/r/36163/diff/


Testing
---

I wasn't really sure what kind of test (unit test / integration test) I should 
write here, so any pointers would be greatly appreciated! I tested the change 
with the unit/integration tests already available.


Thanks,

Robert Zuljevic



RE: Missing a change log offset for SystemStreamPartition

2015-08-10 Thread Jordi Blasi Uribarri
Hi,

I have migrated Samza to the latest version and recreated the job with a new 
store name so that the streams would be created clean. I am still getting the 
same error:

java version "1.7.0_79"
OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
log4j:WARN No appenders could be found for logger 
(org.apache.samza.metrics.JmxServer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
Exception in thread "main" org.apache.samza.SamzaException: Missing a change 
log offset for SystemStreamPartition [kafka, commdb-changelog, 2].
at 
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
at 
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:58)
at 
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
at 
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at 
org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
at 
org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
at 
org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
at 
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
at 
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at 
org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
at 
org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
at 
org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
at 
org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

Is there any other info I can attach to help find the problem?

Thanks,

  Jordi

-Original message-
From: Yan Fang [mailto:yanfang...@gmail.com] 
Sent: Friday, August 7, 2015 23:21
To: dev@samza.apache.org
Subject: Re: Missing a change log offset for SystemStreamPartition

Hi Jordi,

Sorry for getting back to you late. I was quite busy yesterday.

I think the cause of your error is a mismatch between your Samza version and 
your Kafka version.

It seems that you are using Samza 0.8.1 with Kafka 0.8.2, which is not supported.

So my suggestion is to upgrade to *Samza 0.9.1* and use it with *Kafka 0.8.2*.
This combination has been proven to work.

Hope this helps you.

Thanks,


Fang, Yan
yanfang...@gmail.com

On Thu, Aug 6, 2015 at 1:16 AM, Jordi Blasi Uribarri 
wrote:

> I changed the job name and the store name. I was defining two 
> different stores and in case that was the problem, I also eliminated the 
> second one.
> I am getting the same exception.
>
> Exception in thread "main" org.apache.samza.SamzaException: Missing a 
> change log offset for SystemStreamPartition [kafka, testdb-changelog, 2].
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
> at
> scala.collection.TraversableLike$WithFilte

RE: Missing a change log offset for SystemStreamPartition

2015-08-10 Thread Jordi Blasi Uribarri
To make the problem easier to reproduce, I reduced the code of the job to the 
minimum:

package test;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class testStore implements StreamTask {
    private KeyValueStore storestp;

    public void init(Config config, TaskContext context) {
        this.storestp = (KeyValueStore) context.getStore("test11db");
    }

    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        String msgin = (String) envelope.getMessage();
        storestp.put("test1", msgin);
    }
}

The properties file contains this:

task.class=test.testStore
job.name=test.testStore
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
yarn.package.path=http://192.168.15.92/jobs/nxtBroker-0.0.1-bin.tar.gz

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=kfk-kafka01:2181,kfk-kafka02:2181
systems.kafka.producer.bootstrap.servers=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:9093
systems.kafka.producer.metadata.broker.list=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:909

# Declare that we want our job's checkpoints to be written to Kafka
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka

# The job consumes a topic called "configtpc" from the "kafka" system
task.inputs=kafka.configtpc

# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Serializer for the system
systems.kafka.samza.msg.serde=string
systems.kafka.streams.tracetpc.samza.msg.serde=json

# Use the key-value store implementation for a store called "test11db"
stores.test11db.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory

# Use the Kafka topic "test11db-changelog" as the changelog stream for this store.
# This enables automatic recovery of the store after a failure. If you don't
# configure this, no changelog stream will be generated.
stores.test11db.changelog=kafka.test11db-changelog

# Encode keys and values in the store as UTF-8 strings.
stores.test11db.key.serde=string
stores.test11db.msg.serde=string

# Commit checkpoints every 1 second
task.commit.ms=1000

With this, I am getting just the same error:

java version "1.7.0_79"
OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
log4j:WARN No appenders could be found for logger 
(org.apache.samza.metrics.JmxServer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
Exception in thread "main" org.apache.samza.SamzaException: Missing a change 
log offset for SystemStreamPartition [kafka, test11db-changelog, 2].
at 
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
at 
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:58)
at 
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
at 
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at 
org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
at 
org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
at 
org.apache.samza.co

Re: Review Request 36006: SAMZA-724: Writing a tool to read from the coordinator stream and react to config changes accordingly.

2015-08-10 Thread Shadi A. Noghabi

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36006/
---

(Updated Aug. 10, 2015, 6:35 p.m.)


Review request for samza, Yi Pan (Data Infrastructure), Navina Ramesh, and 
Naveen Somasundaram.


Repository: samza


Description
---

This RB is for SAMZA-724.

After a job is submitted, it might need some configuration change, specifically 
it might need more containers. In SAMZA-704 a tool is being added to write to 
the coordinator stream (CoordinatorStreamWriter).  This tool can be used to 
write new configurations to the coordinator stream. However, another tool 
(ConfigManager) is needed to read the config changes and react to them, which 
is the goal of this task. This tool should be brought up after the job is 
submitted and read any config changes added to the coordinator stream, and 
react to each accordingly. 

This tool, called the Config Manager, focuses on handling container changes 
by reacting to set-config messages with the key "yarn.container.count". 

The config manager is a separate standalone module that should be brought up 
separately after the job is submitted. Therefore, you have to add two 
configurations to the input config file:
1. yarn.rm.address= 
2. yarn.rm.port= 

The config manager will periodically poll the coordinator stream to see if there 
are any new messages. The polling period is 100 ms by default. However, it can 
be configured by adding configManager.polling.interval= to 
the input config file. Thus, overall, the command to run the config manager along 
with the job would be:


/bin/run-config-manager.sh --config-factory= --config-path=
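
The polling behaviour described above can be pictured with a minimal sketch. It is not the ConfigManager from this review request: the class `ConfigManagerSketch`, the `SetConfig` message type, and the in-memory queue standing in for the coordinator stream are all hypothetical names chosen for illustration. Only the key "yarn.container.count" and the 100 ms default come from the description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch of a polling loop; none of these types are Samza classes. */
public class ConfigManagerSketch {
    /** Stand-in for a set-config message read from the coordinator stream. */
    public static final class SetConfig {
        final String key;
        final String value;

        public SetConfig(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    static final String CONTAINER_COUNT_KEY = "yarn.container.count";
    static final long DEFAULT_POLL_INTERVAL_MS = 100; // default stated in the description

    private final Queue<SetConfig> stream;            // assumed stand-in for the coordinator stream
    private final long pollIntervalMs;
    private final List<Integer> requestedCounts = new ArrayList<>();

    public ConfigManagerSketch(Queue<SetConfig> stream, long pollIntervalMs) {
        this.stream = stream;
        this.pollIntervalMs = pollIntervalMs;
    }

    /** Drains pending messages and reacts only to container-count changes. */
    public void pollOnce() {
        SetConfig msg;
        while ((msg = stream.poll()) != null) {
            if (CONTAINER_COUNT_KEY.equals(msg.key)) {
                // A real tool would ask YARN for this many containers here.
                requestedCounts.add(Integer.parseInt(msg.value));
            }
        }
    }

    /** Polls a fixed number of times, sleeping for the interval between polls. */
    public void run(int iterations) throws InterruptedException {
        for (int i = 0; i < iterations; i++) {
            pollOnce();
            Thread.sleep(pollIntervalMs);
        }
    }

    public List<Integer> requestedCounts() {
        return requestedCounts;
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<SetConfig> stream = new ConcurrentLinkedQueue<>();
        stream.add(new SetConfig("yarn.container.count", "4"));
        stream.add(new SetConfig("task.commit.ms", "1000")); // ignored: different key
        ConfigManagerSketch manager = new ConfigManagerSketch(stream, DEFAULT_POLL_INTERVAL_MS);
        manager.run(2);
        System.out.println(manager.requestedCounts()); // prints [4]
    }
}
```

The sketch keeps the reaction (recording the requested count) separate from the loop, so the interval can be changed without touching the message handling.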


Diffs (updated)
-

  build.gradle 0852adc4e8e0c2816afd1ebf433f1af6b44852f7 
  checkstyle/import-control.xml 24ed680785175f3cdf955602b7a813684edd813e 
  gradle/dependency-versions.gradle fb06e8ed393d1a38abfa1a48fe5244fc7f6c7339 
  
samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
 PRE-CREATION 
  
samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
 2277a732b9ab763edf19a0fbec288ff72b27583b 
  samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
d7c928c7401e539a370d4e82276e7dabbce1b638 
  samza-shell/src/main/bash/run-config-manager.sh PRE-CREATION 
  
samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
 ea702a919348305ff95ce0b4ca1996a13aff04ec 
  
samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 
ce88698c12c4bf6f4cf128f92d60b0b9496997d7 
  settings.gradle 19bff971ad221084dac10d3f7f3facfa42b829a7 

Diff: https://reviews.apache.org/r/36006/diff/


Testing
---

Tested with hello-samza; it works properly.


Thanks,

Shadi A. Noghabi



Re: Missing a change log offset for SystemStreamPartition

2015-08-10 Thread Yan Fang
Hi Jordi,

I think you need to implement the *InitableTask* interface. Otherwise, the
code in the init method will not be executed.

Thanks,

Fang, Yan
yanfang...@gmail.com


Re: Missing a change log offset for SystemStreamPartition

2015-08-10 Thread Yi Pan
Hi, Jordi,

I agree with Yan. More specifically, your class definition should be
something like:
{code}
public class testStore implements StreamTask, InitableTask {
...
}
{code}
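
To illustrate why declaring the interface matters, here is a small self-contained sketch. It assumes, as Yan's remark implies, that the container only calls init on tasks that declare the interface. The types `StreamTaskLike` and `InitableTaskLike` and the `start` method are hypothetical stand-ins, not Samza's API, and a plain map replaces the key-value store. The sketch covers only the init dispatch; it makes no claim about the changelog offset exception itself.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins only; these are not the Samza interfaces. */
public class InitableTaskSketch {
    interface StreamTaskLike {
        void process(String message);
    }

    interface InitableTaskLike {
        void init(Map<String, Map<String, String>> stores);
    }

    /** Like the task in the thread: has an init method but does not declare the interface. */
    static class TaskWithoutInterface implements StreamTaskLike {
        Map<String, String> store;

        public void init(Map<String, Map<String, String>> stores) {
            store = stores.get("test11db");
        }

        public void process(String message) {
            store.put("test1", message);
        }
    }

    /** Same task, but declaring the init interface as suggested above. */
    static class TaskWithInterface implements StreamTaskLike, InitableTaskLike {
        Map<String, String> store;

        @Override
        public void init(Map<String, Map<String, String>> stores) {
            store = stores.get("test11db");
        }

        @Override
        public void process(String message) {
            store.put("test1", message);
        }
    }

    /** Assumed start-up logic: init is invoked only through the interface. */
    static void start(StreamTaskLike task, Map<String, Map<String, String>> stores) {
        if (task instanceof InitableTaskLike) {
            ((InitableTaskLike) task).init(stores);
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> stores = new HashMap<>();
        stores.put("test11db", new HashMap<String, String>());

        TaskWithInterface ok = new TaskWithInterface();
        start(ok, stores);
        ok.process("hello"); // works: init set the store

        TaskWithoutInterface broken = new TaskWithoutInterface();
        start(broken, stores);
        // init was never called, so the store field is still null
        System.out.println(broken.store == null); // prints true
    }
}
```

In the second case a later call to process would fail with a NullPointerException, because nothing ever assigned the store.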

On Mon, Aug 10, 2015 at 6:08 PM, Yan Fang  wrote:

> Hi Jordi,
>
> I think, you need to implement the *InitableTask* interface. Otherwise, the
> content in the init method will not be processed.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com