Hi Jeremiah,

There's currently no way to do that. I think the best way to modify the
existing ElasticsearchSystemProducer would be to add a config option for a
callback that lets you customize this behavior: basically, a pluggable
listener
(https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java#L101).
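To make that suggestion concrete, here is a minimal, self-contained sketch of what such a pluggable callback could look like. Everything in it is hypothetical: `BulkFailureHandler`, `BulkItemFailure`, the two handler classes, and the config key `systems.elastic.bulk.failure.handler.class` do not exist in Samza today. The sketch also deliberately avoids the Elasticsearch client classes so it compiles on its own. The idea is that the producer's listener would pass each failed bulk item to the configured handler and fail the container only for failures the handler refuses to swallow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PluggableFailureHandlerSketch {

    // Hypothetical: one failed item from a bulk indexing request.
    public static final class BulkItemFailure {
        public final String index;
        public final int status;     // HTTP status reported for this item
        public final String message;

        public BulkItemFailure(String index, int status, String message) {
            this.index = index;
            this.status = status;
            this.message = message;
        }
    }

    // Hypothetical callback that a producer config option would name.
    public interface BulkFailureHandler {
        // Return true to swallow the failure, false to fail the container.
        boolean handle(BulkItemFailure failure);
    }

    // Default: never swallow anything, i.e. keep today's fail-fast behavior.
    public static final class FailFastHandler implements BulkFailureHandler {
        @Override
        public boolean handle(BulkItemFailure failure) {
            return false;
        }
    }

    // Example user handler: drop documents rejected as bad requests (HTTP 400)
    // but still fail on anything else, such as an unavailable cluster.
    public static final class IgnoreBadDocumentsHandler implements BulkFailureHandler {
        @Override
        public boolean handle(BulkItemFailure failure) {
            return failure.status == 400;
        }
    }

    // Hypothetical config key; not an existing Samza setting.
    public static final String HANDLER_KEY = "systems.elastic.bulk.failure.handler.class";

    // Instantiate the configured handler by class name, defaulting to fail-fast.
    public static BulkFailureHandler loadHandler(Map<String, String> config) {
        String className = config.getOrDefault(HANDLER_KEY, FailFastHandler.class.getName());
        try {
            return (BulkFailureHandler) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load handler " + className, e);
        }
    }

    // Run every failure through the handler and throw on the first one it won't
    // swallow. The real producer raises a SamzaException; RuntimeException keeps
    // this sketch dependency-free. Returns the number of swallowed failures.
    public static int processFailures(List<BulkItemFailure> failures, BulkFailureHandler handler) {
        int swallowed = 0;
        for (BulkItemFailure failure : failures) {
            if (!handler.handle(failure)) {
                throw new RuntimeException(
                        "Unable to index into " + failure.index + ": " + failure.message);
            }
            swallowed++;
        }
        return swallowed;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(HANDLER_KEY, IgnoreBadDocumentsHandler.class.getName());
        BulkFailureHandler handler = loadHandler(config);

        List<BulkItemFailure> failures = new ArrayList<>();
        failures.add(new BulkItemFailure("events", 400, "failed to parse document"));
        int skipped = processFailures(failures, handler);
        System.out.println("Skipped " + skipped + " bad document(s); job keeps running.");
    }
}
```

With a handler like `IgnoreBadDocumentsHandler` configured, a document rejected for bad data would be dropped instead of killing the container, while other errors would still fail fast. Treating status 400 as "bad data" is only an assumption for illustration; in practice you would match on whatever failure you actually expect.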



On Mon, Feb 15, 2016 at 2:30 PM, jeremiah adams <jadams...@gmail.com> wrote:

> We have a Samza job configured to run in a YARN cluster. This job consumes
> multiple Kafka topics and routes the messages to Elasticsearch for
> indexing. When enough batch updates to Elasticsearch fail in the
> ElasticsearchSystemProducer, the entire Samza job dies. Due to
> checkpointing + YARN, the job starts back up, resumes reading where it left
> off, and dies again. Enter loop.
>
> Updates to ES are failing due to invalid data on the part of our consumers,
> but I can't always control them, so I need to be defensive in the code. I
> don't see how to handle this in any of the source examples. I would like to
> just trap this error and, if it is what I expect it to be, squash it. Can
> someone point me in the right direction?
>
> Below is the log where the failure occurs.
>
> 2016-02-15 18:55:26 ElasticsearchSystemProducer [ERROR] Unable to send message from TaskName-Partition 5 to system elastic.
> 2016-02-15 18:55:26 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
> org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 5 to system elastic.
>     at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.flush(ElasticsearchSystemProducer.java:186)
>     at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.stop(ElasticsearchSystemProducer.java:92)
>     at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
>     at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>     at org.apache.samza.system.SystemProducers.stop(SystemProducers.scala:47)
>     at org.apache.samza.container.SamzaContainer.shutdownProducers(SamzaContainer.scala:672)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:564)
>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> 2016-02-15 18:55:26 RunLoop [INFO] Shutting down, will wait up to 5000 ms
> 2016-02-15 18:55:31 RunLoop [WARN] Did not shut down within 5000 ms, exiting
>
> - jeremiah
>
