Hi, I am trying to use the Keystore to manage some state information. Basically this is the code I am using. As long as I have tested, the rest is working correctly.
private KeyValueStore<String, String> storestp; public void init(Config config, TaskContext context) { this.storestp = (KeyValueStore<String, String>) context.getStore("stepdb"); } public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { … String str = storestp.get(code) … } When I load it, it goes to running but, whe I send the messages through Kafka stream It goes to Failed state. I have found this Exception: Exception in thread "main" org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [kafka, stepdb-changelog, 2]. at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87) at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87) at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) at scala.collection.AbstractMap.getOrElse(Map.scala:58) at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87) at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245) at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.immutable.Map$Map2.foreach(Map.scala:130) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84) at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63) at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88) at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607) at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607) at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550) at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108) at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87) at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala) I have seen that the stepdb-changelog stream exists in Kafka. As a try to regenerate the missing offset and tes it I have connected through the command line and send a message to the stream. It was received correctly. Now I am seeing the following Exception: Exception in thread "main" java.lang.NullPointerException at scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126) at scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126) at scala.collection.SeqLike$class.size(SeqLike.scala:106) at scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120) at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:94) at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:79) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:79) at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:112) at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:106) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:106) at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:64) at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88) at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607) at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607) at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550) at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108) at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87) at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala) Is there something wrong? Thanks, Jordi ________________________________ Jordi Blasi Uribarri Área I+D+i jbl...@nextel.es Oficina Bilbao [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]