Hi,

I am trying to use the Keystore to manage some state information. Basically 
this is the code I am using. As long as I have tested, the rest is working 
correctly.

private KeyValueStore<String, String> storestp;

public void init(Config config, TaskContext context) {
                 this.storestp = (KeyValueStore<String, String>) 
context.getStore("stepdb");
               }

       public void process(IncomingMessageEnvelope envelope,
                    MessageCollector collector,
                    TaskCoordinator coordinator)
                    {
                           …
String str = storestp.get(code)
…
}

When I load it, it goes to running but, whe I send the messages through Kafka 
stream It goes to Failed state. I have found this Exception:
Exception in thread "main" org.apache.samza.SamzaException: Missing a change 
log offset for SystemStreamPartition [kafka, stepdb-changelog, 2].
        at 
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
        at 
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:87)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:58)
        at 
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:87)
        at 
org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:84)
        at 
org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:63)
        at 
org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
        at 
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
        at 
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at 
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at 
org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
        at 
org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
        at 
org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at 
org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

I have seen that the stepdb-changelog stream exists in Kafka. As a try to 
regenerate the missing offset and tes it I have connected through the command 
line and send a message to the stream. It was received correctly. Now I am 
seeing the following Exception:

Exception in thread "main" java.lang.NullPointerException
        at 
scala.collection.mutable.ArrayOps$ofByte$.length$extension(ArrayOps.scala:126)
        at scala.collection.mutable.ArrayOps$ofByte.length(ArrayOps.scala:126)
        at scala.collection.SeqLike$class.size(SeqLike.scala:106)
        at scala.collection.mutable.ArrayOps$ofByte.size(ArrayOps.scala:120)
        at 
org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:94)
        at 
org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:79)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at 
org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:79)
        at 
org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:112)
        at 
org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:106)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:106)
        at 
org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:64)
        at 
org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:88)
        at 
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
        at 
org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:607)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at 
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at 
org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:607)
        at 
org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:550)
        at 
org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at 
org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

Is there something wrong?

Thanks,

    Jordi
________________________________
Jordi Blasi Uribarri
Área I+D+i

jbl...@nextel.es
Oficina Bilbao

[http://www.nextel.es/wp-content/uploads/Firma_Nextel_2015.png]

Reply via email to