Hi,
I am trying to use a KeyValueStore to manage some state information. This is
basically the code I am using. As far as I have tested, the rest of the job
works correctly.
    private KeyValueStore<String, String> storestp;

    public void init(Config config, TaskContext context) {
        // Look up the store by the name it is declared under in the job config.
        this.storestp = (KeyValueStore<String, String>) context.getStore("stepdb");
    }

    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        …
        String str = storestp.get(code);
        …
    }
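For reference, a store such as stepdb is normally declared in the job configuration together with its changelog stream. The following is only a minimal sketch of such entries, not the actual configuration of this job. The changelog value is taken from the system and stream names that appear in the exception below; the RocksDB factory and the string serdes are assumptions, and the class name StepDbStoreConfigSketch and the helper storeConfig are hypothetical names used for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StepDbStoreConfigSketch {

    // Builds the job-config entries that declare a changelog-backed key-value store.
    static Map<String, String> storeConfig(String storeName, String system) {
        Map<String, String> cfg = new LinkedHashMap<>();
        String prefix = "stores." + storeName + ".";
        // Assumption: a RocksDB-backed engine; older Samza releases may use a different factory.
        cfg.put(prefix + "factory",
                "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
        // Assumption: keys and values are plain strings, matching KeyValueStore<String, String>.
        cfg.put(prefix + "key.serde", "string");
        cfg.put(prefix + "msg.serde", "string");
        // Changelog is written as <system>.<stream>, e.g. kafka.stepdb-changelog.
        cfg.put(prefix + "changelog", system + "." + storeName + "-changelog");
        // Registers the serde name "string" used above.
        cfg.put("serializers.registry.string.class",
                "org.apache.samza.serializers.StringSerdeFactory");
        return cfg;
    }

    public static void main(String[] args) {
        // Print the entries in properties-file form.
        for (Map.Entry<String, String> e : storeConfig("stepdb", "kafka").entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }
    }
}
```

Running main prints the entries as key=value lines that can be compared against the job's own properties file.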
When I load the job, it goes to the Running state, but when I send messages
through the Kafka stream it goes to the Failed state. I have found this
exception:
Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, stepdb-changelog, 2].
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:58)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
    at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
    at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
I have checked that the stepdb-changelog stream exists in Kafka. To try to
regenerate the missing offset and test it, I connected from the command line
and sent a message to the stream. It was received correctly. Now I am seeing
the following exception:
Exception in thread "main" java.lang.NullPointerException
    at scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
    at scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
    at scala.collection.SeqLike$class.size(SeqLike.scala:106)
    at scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
    at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:94)
    at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:79)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:79)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:112)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:106)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:106)
    at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:64)
    at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Is there something wrong?
Thanks,
Jordi
________________________________
Jordi Blasi Uribarri
Área I+D+i
[email protected]
Oficina Bilbao