To make the problem easier to reproduce, I have reduced the code of the job
to the minimum:
package test;
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
public class testStore implements StreamTask {
private KeyValueStore<String, String> storestp;
public void init(Config config, TaskContext context) {
this.storestp = (KeyValueStore<String, String>)
context.getStore("test11db");
}
public void process (IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator)
{
String msgin = (String) envelope.getMessage();
storestp.put("test1",msgin);
}
}
The properties file contains this:
task.class=test.testStore
job.name=test.testStore
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
yarn.package.path=http://192.168.15.92/jobs/nxtBroker-0.0.1-bin.tar.gz
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=kfk-kafka01:2181,kfk-kafka02:2181
systems.kafka.producer.bootstrap.servers=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:9093
systems.kafka.producer.metadata.broker.list=kfk-kafka01:9092,kfk-kafka01:9093,kfk-kafka02:9092,kfk-kafka02:9093
# Declare that we want our job's checkpoints to be written to Kafka
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# The job consumes a topic called "configtpc" from the "kafka" system
task.inputs=kafka.configtpc
# Define serializers/deserializers called "json" (parses JSON messages) and
# "string" (plain strings)
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
# Serializer for the system
systems.kafka.samza.msg.serde=string
systems.kafka.streams.tracetpc.samza.msg.serde=json
# Use the RocksDB key-value store implementation for a store called "test11db"
stores.test11db.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
# Use the Kafka topic "test11db-changelog" as the changelog stream for this store.
# This enables automatic recovery of the store after a failure. If you don't
# configure this, no changelog stream will be generated.
stores.test11db.changelog=kafka.test11db-changelog
# Encode keys and values in the store as UTF-8 strings.
stores.test11db.key.serde=string
stores.test11db.msg.serde=string
# Commit checkpoints every second (1000 ms)
task.commit.ms=1000
With this, I am getting just the same error:
java version "1.7.0_79"
OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
log4j:WARN No appenders could be found for logger
(org.apache.samza.metrics.JmxServer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more
info.
Exception in thread "main" org.apache.samza.SamzaException: Missing a change
log offset for SystemStreamPartition [kafka, test11db-changelog, 2].
at
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
at
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:58)
at
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
at
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at
org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
at
org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
at
org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
at
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
at
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at
org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
at
org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
at
org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
at
org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
The job fails even when there is no message sent to the input topic.
Samza is version 0.9.1 and kafka 0.8.2.
Thanks,
Jordi
-----Mensaje original-----
De: Jordi Blasi Uribarri [mailto:[email protected]]
Enviado el: lunes, 10 de agosto de 2015 10:26
Para: [email protected]
Asunto: RE: Missing a change log offset for SystemStreamPartition
Hi,
I have migrated Samza to the latest version and recreated the job with a new
store name so the streams were created clean. I am getting the same error:
java version "1.7.0_79"
OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-1~deb7u1)
OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
log4j:WARN No appenders could be found for logger
(org.apache.samza.metrics.JmxServer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more
info.
Exception in thread "main" org.apache.samza.SamzaException: Missing a change
log offset for SystemStreamPartition [kafka, commdb-changelog, 2].
at
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
at
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:58)
at
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
at
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at
org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
at
org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
at
org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
at
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
at
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at
org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
at
org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
at
org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
at
org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Is there any other info I can attach to help find the problem?
Thanks,
Jordi
-----Mensaje original-----
De: Yan Fang [mailto:[email protected]] Enviado el: viernes, 07 de agosto de
2015 23:21
Para: [email protected]
Asunto: Re: Missing a change log offset for SystemStreamPartition
Hi Jordi,
Sorry for getting back to you late; I was quite busy yesterday.
I think the cause of your error is a mismatch between your Samza and Kafka
versions.
It seems you are using Samza 0.8.1 with Kafka 0.8.2, which is not supported.
So my suggestion is to upgrade to *Samza 0.9.1* and then use *Kafka 0.8.2*.
This combination is proven to work.
Hope this helps.
Thanks,
Fang, Yan
[email protected]
On Thu, Aug 6, 2015 at 1:16 AM, Jordi Blasi Uribarri <[email protected]>
wrote:
> I changed the job name and the store name. I was defining two
> different stores and in case that was the problem, I also eliminated the
> second one.
> I am getting the same exception.
>
> Exception in thread "main" org.apache.samza.SamzaException: Missing a
> change log offset for SystemStreamPartition [kafka, testdb-changelog, 2].
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
> at
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at
> org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
> at
> org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
> at
> org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
> at
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> at
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
> at
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> at
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> at
> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> at
> org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>
> As I have the autocreate configured in Kafka I am not creating
> anything for the store. Is that ok?
>
> By the way, is there any problem on having two different stores?
>
> Thanks,
>
> Jordi
>
> -----Mensaje original-----
> De: Yan Fang [mailto:[email protected]] Enviado el: miércoles, 05
> de agosto de 2015 20:23
> Para: [email protected]
> Asunto: Re: Missing a change log offset for SystemStreamPartition
>
> Hi Jordi,
>
> I wonder whether the cause of your first exception is that you changed the
> task number (the partition number of your input stream) but were still
> using the same changelog stream, so it is trying to send to partition 2,
> which does not exist?
>
> Can you reproduce this exception in a new job? (new store name, new
> job
> name)
>
> The second exception is caused by the wrong offset format, I believe.
>
> Let me know how the new job goes.
>
> Thanks,
>
> Fang, Yan
> [email protected]
>
> On Wed, Aug 5, 2015 at 12:51 AM, Jordi Blasi Uribarri
> <[email protected]>
> wrote:
>
> > Hi,
> >
> > I am trying to use the KeyValueStore to manage some state information.
> > Basically this is the code I am using. As far as I have tested, the
> > rest is working correctly.
> >
> > private KeyValueStore<String, String> storestp;
> >
> > public void init(Config config, TaskContext context) {
> > this.storestp = (KeyValueStore<String, String>)
> > context.getStore("stepdb");
> > }
> >
> > public void process(IncomingMessageEnvelope envelope,
> > MessageCollector collector,
> > TaskCoordinator coordinator)
> > {
> > …
> > String str = storestp.get(code)
> > …
> > }
> >
> > When I load it, it goes to running but, when I send messages
> > through the Kafka stream, it goes to the Failed state. I have found this
> > Exception:
> > Exception in thread "main" org.apache.samza.SamzaException: Missing
> > a change log offset for SystemStreamPartition [kafka, stepdb-changelog, 2].
> > at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$
> $anonfun$1.apply(TaskStorageManager.scala:87)
> > at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$
> $anonfun$1.apply(TaskStorageManager.scala:87)
> > at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> > at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> > at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.
> apply(TaskStorageManager.scala:87)
> > at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.
> apply(TaskStorageManager.scala:84)
> > at
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T
> raversableLike.scala:772)
> > at
> >
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike
> .scala:245)
> > at
> >
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike
> .scala:245)
> > at
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T
> raversableLike.scala:772)
> > at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
> > at
> >
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc
> ala:771)
> > at
> scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> > at
> >
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc
> ala:771)
> > at
> >
> org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorage
> Manager.scala:84)
> > at
> >
> org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.sc
> ala:63)
> > at
> >
> org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala
> :88)
> > at
> >
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply
> (SamzaContainer.scala:607)
> > at
> >
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply
> (SamzaContainer.scala:607)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at
> > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > at
> >
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.s
> cala:607)
> > at
> > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> > at
> >
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.sca
> la:108)
> > at
> > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> > at
> > org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >
> > I have seen that the stepdb-changelog stream exists in Kafka. To try
> > to regenerate the missing offset and test it, I connected
> > through the command line and sent a message to the stream. It was
> > received correctly.
> > Now I am seeing the following Exception:
> >
> > Exception in thread "main" java.lang.NullPointerException
> > at
> >
> scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.sc
> ala:126)
> > at
> > scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
> > at scala.collection.SeqLike$class.size(SeqLike.scala:106)
> > at
> > scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
> > at
> >
> org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.a
> pply(KeyValueStorageEngine.scala:94)
> > at
> >
> org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.a
> pply(KeyValueStorageEngine.scala:79)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at
> >
> org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStor
> ageEngine.scala:79)
> > at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.a
> pply(TaskStorageManager.scala:112)
> > at
> >
> org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.a
> pply(TaskStorageManager.scala:106)
> > at
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(T
> raversableLike.scala:772)
> > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > at
> >
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.sc
> ala:771)
> > at
> >
> org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageM
> anager.scala:106)
> > at
> >
> org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.sc
> ala:64)
> > at
> >
> org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala
> :88)
> > at
> >
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply
> (SamzaContainer.scala:607)
> > at
> >
> org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply
> (SamzaContainer.scala:607)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at
> > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > at
> >
> org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.s
> cala:607)
> > at
> > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
> > at
> >
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.sca
> la:108)
> > at
> > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> > at
> > org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >
> > Is there something wrong?
> >
> > Thanks,
> >
> > Jordi
> > ________________________________
> > Jordi Blasi Uribarri
> > Área I+D+i
> >
> > [email protected]
> > Oficina Bilbao
> >
> > [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]
> >
>