Hi,

        Sometimes very large messages, for example 2MB, show up in our flow. 
When that happens, Samza currently catches the exception and shuts down, as 
shown in the log below. What I want instead is to handle this specific 
exception, discard the offending message, and let the flow continue.

2017-02-23 16:17:01.949 [main] SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 0 to system kafka.
        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:133)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:661)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:115)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:89)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 0 to system kafka.
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:177)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:350)
        at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:162)
        at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
        at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:60)
        at com.antfact.datacenter.canal.task.tags.DocumentTagTask.process(DocumentTagTask.java:127)
        at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
        at org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
        at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:157)
        at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
        at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:155)
        at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:356)
        at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:325)
        at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:283)
        at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:199)
        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:144)
        ... 4 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 881729 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-02-23 16:17:01.937 [main] SamzaContainer [INFO] Shutting down.
2017-02-23 16:17:01.937 [main] SamzaContainer [INFO] Shutting down consumer multiplexer.
2017-02-23 16:17:01.940 [main] BrokerProxy [INFO] Shutting down BrokerProxy for 172.19.105.20:9096
2017-02-23 16:17:01.940 [main] BrokerProxy [INFO] closing simple consumer...
2017-02-23 16:17:01.941 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.20:9096 for client samza_consumer-canal_doc_tag-1] BrokerProxy [INFO] Got interrupt exception in broker proxy thread.
2017-02-23 16:17:01.941 [main] BrokerProxy [INFO] Shutting down BrokerProxy for 172.19.105.22:9096
2017-02-23 16:17:01.941 [main] BrokerProxy [INFO] closing simple consumer...
2017-02-23 16:17:01.941 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.22:9096 for client samza_consumer-canal_doc_tag-1] BrokerProxy [INFO] Got interrupt exception in broker proxy thread.
2017-02-23 16:17:01.941 [main] SamzaContainer [INFO] Shutting down task instance stream tasks.
2017-02-23 16:17:01.942 [main] SamzaContainer [INFO] Shutting down task instance stores.
2017-02-23 16:17:01.943 [main] SamzaContainer [INFO] Shutting down host statistics monitor.
2017-02-23 16:17:01.944 [main] SamzaContainer [INFO] Shutting down producer multiplexer.
2017-02-23 16:17:01.944 [main] SamzaContainer [INFO] Shutting down locality manager.
2017-02-23 16:17:01.944 [main] CoordinatorStreamSystemProducer [INFO] Stopping coordinator stream producer.
2017-02-23 16:17:01.945 [main] SamzaContainer [INFO] Shutting down offset manager.
2017-02-23 16:17:01.946 [main] SamzaContainer [INFO] Shutting down metrics reporters.
2017-02-23 16:17:01.946 [main] MetricsSnapshotReporter [INFO] Stopping producer.
2017-02-23 16:17:01.947 [main] MetricsSnapshotReporter [INFO] Stopping reporter timer.
2017-02-23 16:17:01.947 [main] SamzaContainer [INFO] Shutting down JVM metrics.
2017-02-23 16:17:01.947 [main] SamzaContainer [INFO] Shutdown complete.
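
To make the desired behavior concrete, one way to picture it is a size check 
that runs before the send call in process(). The sketch below is only an 
illustration under assumptions: OversizeGuard, accept, and droppedCount are 
hypothetical names, not part of Samza or Kafka, and the limit passed in is 
assumed to be chosen somewhat below the producer's max.request.size, since the 
size Kafka checks may include record overhead beyond the payload bytes.

```java
// Hypothetical helper: decides whether an already-serialized message is small
// enough to send, and counts how many messages were discarded.
final class OversizeGuard {
    private final int maxBytes;   // assumed limit, set below max.request.size
    private long dropped = 0;     // number of messages rejected so far

    OversizeGuard(int maxBytes) {
        if (maxBytes <= 0) {
            throw new IllegalArgumentException("maxBytes must be positive");
        }
        this.maxBytes = maxBytes;
    }

    // Returns true if the payload fits within the limit; otherwise records
    // the drop and returns false so the caller can skip the send.
    boolean accept(byte[] serialized) {
        if (serialized == null) {
            return true; // nothing to measure; leave the decision to the caller
        }
        if (serialized.length > maxBytes) {
            dropped++;
            return false;
        }
        return true;
    }

    long droppedCount() {
        return dropped;
    }
}
```

In the task, the idea would be to serialize the outgoing message to bytes, 
call accept on them, and only call collector.send when it returns true, so an 
oversized message is discarded and the flow continues.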

Thanks!

————————
QiShu
