Hi Jagadish,

I had already tried both of the methods you suggested yesterday. It turned out I was catching the wrong exception type; once I changed it to SamzaException, it worked.
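A rough sketch of that kind of catch follows; it is not the actual task code, which is not shown in this thread. In the stack trace quoted below, the RecordTooLargeException arrives as the cause of a SamzaException thrown from the send call, so a catch clause for RecordTooLargeException alone would not match. The sketch assumes the following: `SamzaException` and `RecordTooLargeException` here are local stand-ins for `org.apache.samza.SamzaException` and `org.apache.kafka.common.errors.RecordTooLargeException` so that it compiles without dependencies, and `Sender`, `hasCause`, `sendOrDrop`, and the size limit are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class DropOversizedMessages {
    // Local stand-in for org.apache.samza.SamzaException (assumption, keeps the sketch dependency-free).
    static class SamzaException extends RuntimeException {
        SamzaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Local stand-in for org.apache.kafka.common.errors.RecordTooLargeException (assumption).
    static class RecordTooLargeException extends RuntimeException {
        RecordTooLargeException(String message) {
            super(message);
        }
    }

    // Hypothetical abstraction over the send call made inside process().
    interface Sender {
        void send(String message);
    }

    // Walks the cause chain and reports whether any link is of the given type.
    static boolean hasCause(Throwable error, Class<? extends Throwable> type) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    // Returns true if the message was sent, false if it was dropped for being
    // too large. Any other failure is rethrown unchanged.
    static boolean sendOrDrop(Sender sender, String message) {
        try {
            sender.send(message);
            return true;
        } catch (SamzaException e) {
            if (hasCause(e, RecordTooLargeException.class)) {
                return false; // discard the oversized message and move on
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        int limit = 10; // hypothetical size limit, for the demo only
        List<String> delivered = new ArrayList<>();
        Sender sender = message -> {
            if (message.length() > limit) {
                throw new SamzaException("Unable to send message",
                        new RecordTooLargeException("message too large"));
            }
            delivered.add(message);
        };
        System.out.println(sendOrDrop(sender, "small"));
        System.out.println(sendOrDrop(sender, "a message that is far too large"));
        System.out.println(delivered);
    }
}
```

In a real task the guarded call would be the collector's send inside process. Rethrowing everything that is not caused by an oversized record keeps other send failures visible instead of silently dropping data.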
Thanks for your help and explanation!

————————
QiShu

> On 23 Feb 2017, at 23:47, Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
>
> Hi QiShu,
>
> 1. I see the exception occurring in your *process* method. It seems that
> the size of the message you are trying to send is larger than the maximum
> Kafka message size (max.request.size, 1 MB by default). You can choose to
> catch the exception in your process method and move on. Wouldn't that work
> for you?
>
> {code}
>
> import org.apache.samza.system.IncomingMessageEnvelope;
> import org.apache.samza.task.MessageCollector;
> import org.apache.samza.task.StreamTask;
> import org.apache.samza.task.TaskCoordinator;
>
> public class DocumentTagTask implements StreamTask {
>     public void process(IncomingMessageEnvelope envelope,
>                         MessageCollector collector,
>                         TaskCoordinator coordinator) {
>         // custom logic
>         try {
>             // logic that could potentially throw an exception
>             // collector.send(msg)
>         } catch (Exception e) {
>             // handle the exception and move on
>         }
>     }
> }
>
> {code}
>
> 2. Alternatively, if you want a config-driven approach, you may want to
> look at the following properties in the Samza config table
> <https://samza.apache.org/learn/documentation/0.12/jobs/configuration-table.html>.
>
> task.ignored.exceptions
>     This property specifies which exceptions should be ignored if thrown
>     in a task's process or window methods. The exceptions to be ignored
>     should be a comma-separated list of fully-qualified class names of
>     the exceptions, or * to ignore all exceptions.
>
> task.drop.deserialization.errors
>     This property defines how the system deals with deserialization
>     failures. If set to true, the system will skip the messages that
>     caused the error and keep running. If set to false, the system will
>     throw exceptions and fail the container. Default is false.
>
> task.drop.serialization.errors
>     This property defines how the system deals with serialization
>     failures. If set to true, the system will drop the messages that
>     caused the error and keep running. If set to false, the system will
>     throw exceptions and fail the container. Default is false.
>
> Thanks,
> Jagadish
>
> On Thu, Feb 23, 2017 at 12:27 AM, 舒琦 <sh...@eefung.com> wrote:
>
>> Hi,
>>
>> Sometimes very large records, around 2 MB, show up in our flow. Right now
>> Samza catches the exception and shuts down, as shown below. What I want
>> is to handle this specific exception, discard the offending data, and
>> let the flow continue.
>>
>> 2017-02-23 16:17:01.949 [main] SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
>> org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 0 to system kafka.
>>     at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:133)
>>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:661)
>>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:115)
>>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:89)
>>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 0 to system kafka.
>>     at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:177)
>>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:350)
>>     at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:162)
>>     at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
>>     at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:60)
>>     at com.antfact.datacenter.canal.task.tags.DocumentTagTask.process(DocumentTagTask.java:127)
>>     at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
>>     at org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
>>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:157)
>>     at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>>     at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:155)
>>     at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:356)
>>     at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:325)
>>     at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:283)
>>     at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:199)
>>     at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:144)
>>     ... 4 more
>> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 881729 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
>> 2017-02-23 16:17:01.937 [main] SamzaContainer [INFO] Shutting down.
>> 2017-02-23 16:17:01.937 [main] SamzaContainer [INFO] Shutting down consumer multiplexer.
>> 2017-02-23 16:17:01.940 [main] BrokerProxy [INFO] Shutting down BrokerProxy for 172.19.105.20:9096
>> 2017-02-23 16:17:01.940 [main] BrokerProxy [INFO] closing simple consumer...
>> 2017-02-23 16:17:01.941 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.20:9096 for client samza_consumer-canal_doc_tag-1] BrokerProxy [INFO] Got interrupt exception in broker proxy thread.
>> 2017-02-23 16:17:01.941 [main] BrokerProxy [INFO] Shutting down BrokerProxy for 172.19.105.22:9096
>> 2017-02-23 16:17:01.941 [main] BrokerProxy [INFO] closing simple consumer...
>> 2017-02-23 16:17:01.941 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.22:9096 for client samza_consumer-canal_doc_tag-1] BrokerProxy [INFO] Got interrupt exception in broker proxy thread.
>> 2017-02-23 16:17:01.941 [main] SamzaContainer [INFO] Shutting down task instance stream tasks.
>> 2017-02-23 16:17:01.942 [main] SamzaContainer [INFO] Shutting down task instance stores.
>> 2017-02-23 16:17:01.943 [main] SamzaContainer [INFO] Shutting down host statistics monitor.
>> 2017-02-23 16:17:01.944 [main] SamzaContainer [INFO] Shutting down producer multiplexer.
>> 2017-02-23 16:17:01.944 [main] SamzaContainer [INFO] Shutting down locality manager.
>> 2017-02-23 16:17:01.944 [main] CoordinatorStreamSystemProducer [INFO] Stopping coordinator stream producer.
>> 2017-02-23 16:17:01.945 [main] SamzaContainer [INFO] Shutting down offset manager.
>> 2017-02-23 16:17:01.946 [main] SamzaContainer [INFO] Shutting down metrics reporters.
>> 2017-02-23 16:17:01.946 [main] MetricsSnapshotReporter [INFO] Stopping producer.
>> 2017-02-23 16:17:01.947 [main] MetricsSnapshotReporter [INFO] Stopping reporter timer.
>> 2017-02-23 16:17:01.947 [main] SamzaContainer [INFO] Shutting down JVM metrics.
>> 2017-02-23 16:17:01.947 [main] SamzaContainer [INFO] Shutdown complete.
>>
>> Thanks!
>>
>> ————————
>> QiShu
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University