Hi Jeremiah,

There's currently no way to do that. I think the best approach would be to modify the existing ElasticsearchSystemProducer and add a config option for a callback that lets you customize this behavior. Basically, a pluggable listener (see https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java#L101 ).
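
To make the idea concrete, here is a rough, dependency-free sketch of what such a pluggable callback could look like. Everything in it is hypothetical: `BulkFailureHandler`, `SketchProducer`, and their methods are names I made up for illustration, not existing Samza or Elasticsearch APIs, and the real producer would call the handler from its bulk listener rather than from a plain method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback; nothing with this name exists in Samza today.
interface BulkFailureHandler {
    // Return true to squash the failure, false to let it fail the job.
    boolean shouldIgnore(String failureMessage);
}

// Simplified stand-in for a producer that records bulk failures
// and only surfaces the non-ignorable ones on flush().
class SketchProducer {
    private final BulkFailureHandler handler;
    private final List<String> fatalFailures = new ArrayList<>();
    private int ignoredCount = 0;

    SketchProducer(BulkFailureHandler handler) {
        this.handler = handler;
    }

    // Assumed to be invoked once per failed item in a bulk response.
    synchronized void onBulkItemFailure(String failureMessage) {
        if (handler.shouldIgnore(failureMessage)) {
            ignoredCount++;                     // expected failure: count it and move on
        } else {
            fatalFailures.add(failureMessage);  // unexpected failure: remember it for flush()
        }
    }

    // Throws only if at least one failure was not ignored by the handler.
    synchronized void flush() {
        if (!fatalFailures.isEmpty()) {
            String first = fatalFailures.get(0);
            fatalFailures.clear();
            throw new IllegalStateException("Unable to send message: " + first);
        }
    }

    synchronized int getIgnoredCount() {
        return ignoredCount;
    }
}
```

With something along these lines, the handler class could be named in the job config and loaded reflectively, and your implementation would return true only for the failures you expect from bad input data, so anything else still stops the job.
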
On Mon, Feb 15, 2016 at 2:30 PM, jeremiah adams <[email protected]> wrote:

> We have a samza job configured to run in a yarn cluster. This job consumes multiple kafka topics and routes the messages to elasticsearch for indexing. When enough batch-updates to elasticsearch fail using the ElasticsearchSystemProducer, the entire samza job dies. Due to checkpointing + yarn, the job starts back up, starts reading where it left off and dies again. Enter loop.
>
> Updates to ES are failing due to invalid data on the part of our consumers but I can't always control them so need to be defensive about the code. I don't see how to handle this in any of the source examples. I would like to just trap this error and if it is what I expect it to be - squash it. Can someone point me in the right direction?
>
> Below is the log where the failure occurs.
>
> 2016-02-15 18:55:26 ElasticsearchSystemProducer [ERROR] Unable to send message from TaskName-Partition 5 to system elastic.
> 2016-02-15 18:55:26 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
> org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 5 to system elastic.
>     at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.flush(ElasticsearchSystemProducer.java:186)
>     at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.stop(ElasticsearchSystemProducer.java:92)
>     at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
>     at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>     at org.apache.samza.system.SystemProducers.stop(SystemProducers.scala:47)
>     at org.apache.samza.container.SamzaContainer.shutdownProducers(SamzaContainer.scala:672)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:564)
>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> 2016-02-15 18:55:26 RunLoop [INFO] Shutting down, will wait up to 5000 ms
> 2016-02-15 18:55:31 RunLoop [WARN] Did not shut down within 5000 ms, exiting
>
> - jeremiah
