Hi Jagadish,

I had already tried both of the methods you suggested yesterday, and then found
I was using the wrong exception type. Once I changed it to SamzaException, it worked.

Thanks for your help and explanation!
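
For anyone reading this thread later, a narrower variant of the try/catch approach quoted below is to drop a message only when the failure is really the oversized-record case, and rethrow everything else. The following is a minimal sketch under stated assumptions, not code from the actual job: `DropOversizedSketch`, `TooLarge`, `send`, and `sendAll` are hypothetical names, and `TooLarge` stands in for Kafka's RecordTooLargeException so the example runs without Kafka or Samza on the classpath.

```java
import java.util.Arrays;
import java.util.List;

class DropOversizedSketch {

    // Hypothetical stand-in for Kafka's RecordTooLargeException so the
    // sketch compiles without Kafka on the classpath.
    static class TooLarge extends RuntimeException {
        TooLarge(String message) {
            super(message);
        }
    }

    // Returns true if any exception in the cause chain has the given
    // fully-qualified class name. The walk is bounded in case a cause
    // chain loops back on itself.
    static boolean hasCause(Throwable t, String className) {
        int depth = 0;
        while (t != null && depth++ < 32) {
            if (t.getClass().getName().equals(className)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    // Hypothetical stand-in for collector.send(msg): wraps the root cause
    // in an outer exception, as the stack trace in this thread does.
    static void send(byte[] payload, int maxBytes) {
        if (payload.length > maxBytes) {
            throw new RuntimeException("Unable to send message",
                    new TooLarge("payload is " + payload.length + " bytes"));
        }
    }

    // Sends every payload and returns how many were dropped as oversized.
    // Any other failure is rethrown so it still stops the caller.
    static int sendAll(List<byte[]> payloads, int maxBytes) {
        int dropped = 0;
        for (byte[] payload : payloads) {
            try {
                send(payload, maxBytes);
            } catch (RuntimeException e) {
                if (!hasCause(e, TooLarge.class.getName())) {
                    throw e;
                }
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        List<byte[]> payloads =
                Arrays.asList(new byte[10], new byte[2000], new byte[20]);
        System.out.println("dropped: " + sendAll(payloads, 1000));
    }
}
```

In a real task the name passed to `hasCause` would presumably be `org.apache.kafka.common.errors.RecordTooLargeException`, the root cause shown in the trace below. Matching by class name rather than by type is only a convenience to keep the sketch dependency-free.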


> On Feb 23, 2017, at 23:47, Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
> Hi QiShu,
> 1.  I see the exception occurring in your *process* method. The
> RecordTooLargeException in your trace means the serialized message is larger
> than the producer's configured max.request.size (1 MB by default in Kafka).
> You can catch the exception in your process method and move on. Wouldn't
> that work for you?
> {code}
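> import org.apache.samza.system.IncomingMessageEnvelope;
> import org.apache.samza.task.MessageCollector;
> import org.apache.samza.task.StreamTask;
> import org.apache.samza.task.TaskCoordinator;
>
> public class DocumentTagTask implements StreamTask {
>   @Override
>   public void process(IncomingMessageEnvelope envelope,
>                       MessageCollector collector,
>                       TaskCoordinator coordinator) {
>     // custom logic
>     try {
>       // logic that could potentially throw an exception
>       // collector.send(msg)
>     } catch (Exception e) {
>       // handle the exception and move on
>     }
>   }
> }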
> {code}
> 2. Alternatively, if you want a config-driven approach, look at the
> following properties in the Samza config table
> <https://samza.apache.org/learn/documentation/0.12/jobs/configuration-table.html>:
>
> task.ignored.exceptions: Specifies which exceptions should be ignored if
> thrown in a task's process or window methods. The value is a
> comma-separated list of fully-qualified exception class names, or * to
> ignore all exceptions.
>
> task.drop.deserialization.errors: Defines how the system handles
> deserialization failures. If set to true, the system skips the failing
> messages and keeps running. If set to false, the system throws an exception
> and fails the container. Default is false.
>
> task.drop.serialization.errors: Defines how the system handles
> serialization failures. If set to true, the system drops the failing
> messages and keeps running. If set to false, the system throws an exception
> and fails the container. Default is false.
>
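
A config fragment for this second option might look like the sketch below. The property names come from the configuration table quoted above. The values are illustrative assumptions, with org.apache.samza.SamzaException picked because it is the exception type that appears in the stack trace further down this thread.

```properties
# Ignore SamzaException thrown from process() or window() instead of
# failing the container.
task.ignored.exceptions=org.apache.samza.SamzaException

# Keep the documented defaults: (de)serialization failures still fail
# the container.
task.drop.deserialization.errors=false
task.drop.serialization.errors=false
```

Ignoring a broad type such as SamzaException may also hide unrelated failures that surface as the same class, so a narrower class name is preferable where one is available.
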
> Thanks,
> Jagadish
> On Thu, Feb 23, 2017 at 12:27 AM, 舒琦 <sh...@eefung.com> wrote:
>> Hi,
>>        Sometimes very large records show up in our flow, around 2 MB. When
>> that happens, Samza catches the exception and shuts down, as shown below.
>> What I want is to handle that specific exception, discard the record, and
>> let the flow continue.
>> 2017-02-23 16:17:01.949 [main] SamzaContainerExceptionHandler [ERROR]
>> Uncaught exception in thread (name=main). Exiting process now.
>> org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable
>> to send message from TaskName-Partition 0 to system kafka.
>>        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:133)
>>        at org.apache.samza.container.SamzaContainer.run(
>> SamzaContainer.scala:661)
>>        at org.apache.samza.container.SamzaContainer$.safeMain(
>> SamzaContainer.scala:115)
>>        at org.apache.samza.container.SamzaContainer$.main(
>> SamzaContainer.scala:89)
>>        at org.apache.samza.container.SamzaContainer.main(
>> SamzaContainer.scala)
>> Caused by: org.apache.samza.SamzaException: Unable to send message from
>> TaskName-Partition 0 to system kafka.
>>        at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.
>> onCompletion(KafkaSystemProducer.scala:177)
>>        at org.apache.kafka.clients.producer.KafkaProducer.send(
>> KafkaProducer.java:350)
>>        at org.apache.samza.system.kafka.KafkaSystemProducer.send(
>> KafkaSystemProducer.scala:162)
>>        at org.apache.samza.system.SystemProducers.send(
>> SystemProducers.scala:87)
>>        at org.apache.samza.task.TaskInstanceCollector.send(
>> TaskInstanceCollector.scala:60)
>>        at com.antfact.datacenter.canal.task.tags.DocumentTagTask.
>> process(DocumentTagTask.java:127)
>>        at org.apache.samza.task.AsyncStreamTaskAdapter.process(
>> AsyncStreamTaskAdapter.java:72)
>>        at org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(
>> AsyncStreamTaskAdapter.java:63)
>>        at org.apache.samza.container.TaskInstance$$anonfun$process$
>> 1.apply$mcV$sp(TaskInstance.scala:157)
>>        at org.apache.samza.container.TaskInstanceExceptionHandler.
>> maybeHandle(TaskInstanceExceptionHandler.scala:54)
>>        at org.apache.samza.container.TaskInstance.process(
>> TaskInstance.scala:155)
>>        at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.
>> process(AsyncRunLoop.java:356)
>>        at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.
>> run(AsyncRunLoop.java:325)
>>        at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.
>> access$300(AsyncRunLoop.java:283)
>>        at org.apache.samza.task.AsyncRunLoop.runTasks(
>> AsyncRunLoop.java:199)
>>        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:144)
>>        ... 4 more
>> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The
>> message is 881729 bytes when serialized which is larger than the maximum
>> request size you have configured with the max.request.size configuration.
>> 2017-02-23 16:17:01.937 [main] SamzaContainer [INFO] Shutting down.
>> 2017-02-23 16:17:01.937 [main] SamzaContainer [INFO] Shutting down
>> consumer multiplexer.
>> 2017-02-23 16:17:01.940 [main] BrokerProxy [INFO] Shutting down
>> BrokerProxy for
>> 2017-02-23 16:17:01.940 [main] BrokerProxy [INFO] closing simple
>> consumer...
>> 2017-02-23 16:17:01.941 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
>> for client samza_consumer-canal_doc_tag-1] BrokerProxy
>> [INFO] Got interrupt exception in broker proxy thread.
>> 2017-02-23 16:17:01.941 [main] BrokerProxy [INFO] Shutting down
>> BrokerProxy for
>> 2017-02-23 16:17:01.941 [main] BrokerProxy [INFO] closing simple
>> consumer...
>> 2017-02-23 16:17:01.941 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
>> for client samza_consumer-canal_doc_tag-1] BrokerProxy
>> [INFO] Got interrupt exception in broker proxy thread.
>> 2017-02-23 16:17:01.941 [main] SamzaContainer [INFO] Shutting down task
>> instance stream tasks.
>> 2017-02-23 16:17:01.942 [main] SamzaContainer [INFO] Shutting down task
>> instance stores.
>> 2017-02-23 16:17:01.943 [main] SamzaContainer [INFO] Shutting down host
>> statistics monitor.
>> 2017-02-23 16:17:01.944 [main] SamzaContainer [INFO] Shutting down
>> producer multiplexer.
>> 2017-02-23 16:17:01.944 [main] SamzaContainer [INFO] Shutting down
>> locality manager.
>> 2017-02-23 16:17:01.944 [main] CoordinatorStreamSystemProducer [INFO]
>> Stopping coordinator stream producer.
>> 2017-02-23 16:17:01.945 [main] SamzaContainer [INFO] Shutting down offset
>> manager.
>> 2017-02-23 16:17:01.946 [main] SamzaContainer [INFO] Shutting down metrics
>> reporters.
>> 2017-02-23 16:17:01.946 [main] MetricsSnapshotReporter [INFO] Stopping
>> producer.
>> 2017-02-23 16:17:01.947 [main] MetricsSnapshotReporter [INFO] Stopping
>> reporter timer.
>> 2017-02-23 16:17:01.947 [main] SamzaContainer [INFO] Shutting down JVM
>> metrics.
>> 2017-02-23 16:17:01.947 [main] SamzaContainer [INFO] Shutdown complete.
>> Thanks!
>> ————————
>> QiShu
> -- 
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
