I have attached the full framework logs. It's basically the same stack trace
a few times.
19:24:48.263 [Samza StreamProcessor Container Thread-0] ERROR 
org.apache.samza.task.AsyncRunLoop - Got callback failure for task Partition 0
org.apache.samza.SamzaException: Callback failed for task Partition 0, ssp 
SystemStreamPartition [kafka, audit-events, 0], offset 682227.
    at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:829)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    ... 15 more
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    ... 15 more
19:24:48.277 [Samza StreamProcessor Container Thread-0] ERROR 
org.apache.samza.task.AsyncRunLoop - Caught throwable and stopping run loop
org.apache.samza.SamzaException: Callback failed for task Partition 0, ssp 
SystemStreamPartition [kafka, audit-events, 0], offset 682227.
    at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:829)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    ... 15 more
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    ... 15 more
19:24:48.277 [Samza StreamProcessor Container Thread-0] ERROR 
org.apache.samza.task.AsyncRunLoop - Caught throwable and stopping run loop
org.apache.samza.SamzaException: Callback failed for task Partition 0, ssp 
SystemStreamPartition [kafka, audit-events, 0], offset 682227.
    at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:829)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    ... 15 more
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    ... 15 more
19:24:48.283 [Samza StreamProcessor Container Thread-0] ERROR 
org.apache.samza.container.SamzaContainer - Caught exception/error in run loop.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Callback 
failed for task Partition 0, ssp SystemStreamPartition [kafka, audit-events, 
0], offset 682227.
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:150)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:829)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.samza.SamzaException: Callback failed for task Partition 
0, ssp SystemStreamPartition [kafka, audit-events, 0], offset 682227.
    at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    ... 6 more
Caused by: ...
    ...
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    ... 6 more
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    ... 6 more
19:24:48.283 [Samza StreamProcessor Container Thread-0] ERROR 
org.apache.samza.container.SamzaContainer - Caught exception/error in run loop.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Callback 
failed for task Partition 0, ssp SystemStreamPartition [kafka, audit-events, 
0], offset 682227.
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:150)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:829)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.samza.SamzaException: Callback failed for task Partition 
0, ssp SystemStreamPartition [kafka, audit-events, 0], offset 682227.
    at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    ... 6 more
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    ... 6 more
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    ... 6 more
19:24:49.326 [Samza StreamProcessor Container Thread-0] ERROR 
org.apache.samza.processor.StreamProcessor - Container: 
org.apache.samza.container.SamzaContainer@3e923d9e failed with an exception. 
Stopping the stream processor: c13057a8-42c5-4b68-9f73-29138fc6eb89. Original 
exception:
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Callback 
failed for task Partition 0, ssp SystemStreamPartition [kafka, audit-events, 
0], offset 682227.
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:150)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:829)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.samza.SamzaException: Callback failed for task Partition 
0, ssp SystemStreamPartition [kafka, audit-events, 0], offset 682227.
    at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    ... 6 more
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    ... 6 more
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    ... 6 more
19:24:49.326 [Samza StreamProcessor Container Thread-0] ERROR 
org.apache.samza.processor.StreamProcessor - Container: 
org.apache.samza.container.SamzaContainer@3e923d9e failed with an exception. 
Stopping the stream processor: c13057a8-42c5-4b68-9f73-29138fc6eb89. Original 
exception:
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Callback 
failed for task Partition 0, ssp SystemStreamPartition [kafka, audit-events, 
0], offset 682227.
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:150)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:829)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.samza.SamzaException: Callback failed for task Partition 
0, ssp SystemStreamPartition [kafka, audit-events, 0], offset 682227.
    at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    ... 6 more
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    ... 6 more
Caused by: ...
    ...
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1$1.<init>(MapOperatorSpec.java:46)
    at 
org.apache.samza.operators.spec.MapOperatorSpec$1.apply(MapOperatorSpec.java:44)
    at 
org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:173)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:189)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at 
org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:188)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at 
org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:187)
    at 
org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    at 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)
    at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)
    at 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    ... 6 more
19:24:49.355 [Samza StreamProcessor Container Thread-0] WARN  
org.apache.samza.container.SamzaContainer - Shutdown is no-op since the 
container is already in state: FAILED
19:24:49.355 [Samza StreamProcessor Container Thread-0] WARN  
org.apache.samza.container.SamzaContainer - Shutdown is no-op since the 
container is already in state: FAILED

Prateek Maheshwari <prateek...@gmail.com> writes:

Hi Tom,

Unfortunately this exception only shows that the SamzaContainer tried to
shut down a second time due to a processing timeout. This by itself is
fine, and should be handled by the framework already.

We'll need to look at rest of the framework logs to tell what state the
application was in at this moment, what sequence of events led to this, and
what happened afterwards to cause the application runner status update to
be missed. If you can attach the logs that would be very helpful for
debugging. You can filter for logs from the "org.apache.samza" package.

cc. Shanthoosh, Bharath, in case they're aware of any known issues about
this.

- Prateek


On Mon, Mar 25, 2019 at 7:51 AM Tom Davis <t...@recursivedream.com> wrote:


I am using Samza 1.0, yes. The stacktrace is:

> 19:24:49.326 [Samza StreamProcessor Container Thread-0] ERROR
> org.apache.samza.processor.StreamProcessor - Container:
> org.apache.samza.container.SamzaContainer@3e923d9e failed with an
exception.
> Stopping the stream processor: c13057a8-42c5-4b68-9f73-29138fc6eb89.
Original
> exception:
> org.apache.samza.SamzaException: org.apache.samza.SamzaException:
Callback
> failed for task Partition 0, ssp SystemStreamPartition [kafka,
audit-events,
> 0], offset 682227.
>   at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:150)
>   ~[audit-app.jar:?]
>   at
org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:829)
>   [audit-app.jar:?]
>   at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   [?:1.8.0_181]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[?:1.8.0_181]
>   at
>
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

>   [?:1.8.0_181]
>   at
>
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

>   [?:1.8.0_181]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
> Caused by: org.apache.samza.SamzaException: Callback failed for task
Partition
> 0, ssp SystemStreamPartition [kafka, audit-events, 0], offset 682227.
>   at
org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
>   ~[audit-app.jar:?]
>   at
>
 
org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)

>   ~[audit-app.jar:?]
>   at
>
 
org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)

>   ~[audit-app.jar:?]
>   at
>
 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:208)

>   ~[audit-app.jar:?]
>   at
>
 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)

>   ~[audit-app.jar:?]
>   at
org.apache.samza.container.TaskInstance.process(TaskInstance.scala:206)
>   ~[audit-app.jar:?]
>   at
>
 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:464)

>   ~[audit-app.jar:?]
>   at
>
 org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:406)

>   ~[audit-app.jar:?]
>   at
>
 
org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:340)

>   ~[audit-app.jar:?]
>   at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:231)
>   ~[audit-app.jar:?]
>   at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
>   ~[audit-app.jar:?]
>   ... 6 more
> 19:24:49.355 [Samza StreamProcessor Container Thread-0] WARN
> org.apache.samza.container.SamzaContainer - Shutdown is no-op since the
> container is already in state: FAILED

I have left out the root exception thrown by my operator, but hopefully
this is sufficient. After this, the app continues to run and the status
remains "RUNNING".

Prateek Maheshwari <prateek...@gmail.com> writes:

> Hi Tom,
>
> This sounds like a bug. ApplicationRunner should return the correct
status
> when the processor has shut down. We fixed a similar standalone bug
> recently, are you already using Samza 1.0.
> If this is reproducible / happens again, a thread dump + logs would also
be
> very helpful for debugging and verifying if the issue is already fixed.
>
> Thanks,
> Prateek
>
> On Fri, Mar 22, 2019 at 7:23 AM Tom Davis <t...@recursivedream.com>
wrote:
>
>>
>> Prateek Maheshwari <prateek...@gmail.com> writes:
>>
>> > Hi Tom,
>> >
>> > This would depend on what your k8s container orchestration logic looks
>> > like. For example, in YARN, 'status' returns 'not running' after
'start'
>> > until all the containers requested from the AM are 'running'. We also
>> > leverage YARN to restart containers/job automatically on failures
(within
>> > some bounds). Additionally, we set up a monitoring alert that goes
off if
>> > the number of running containers stays lower than the number of
expected
>> > containers for extended periods of time (~ 5 minutes).
>> >
>> > Are you saying that you noticed that the LocalApplicationRunner status
>> > returns 'running' even if its stream processor / SamzaContainer has
>> stopped
>> > processing?
>> >
>>
>> Yeah, this is what I mean. We have a health check for the overall
>> ApplicationStatus but if the containers enter a failed state that
>> doesn't result in a shut down of the runner itself. An example from last
>> night: Kafka became unavailable at some point and Samza failed to write
>> checkpoints for a while, ultimately leading to container failures. The
>> last log line is:
>>
>> o.a.s.c.SamzaContainer - Shutdown is no-op since the container is
already
>> in
>> state: FAILED
>>
>> This doesn't cause the Pod to be killed, though, so we just silently
>> stop processing events. How do you determine the number of expected
>> containers? Or are you speaking of containers in terms of YARN and not
>> Samza processors?
>>
>> >
>> > - Prateek
>> >
>> > On Fri, Mar 15, 2019 at 7:26 AM Tom Davis <t...@recursivedream.com>
>> wrote:
>> >
>> >> I'm using the LocalApplicationRunner and had added a liveness check
>> >> around the `status` method. The app is running in Kubernetes so, in
>> >> theory, it could be restarted if exceptions happened during
processing.
>> >> However, it seems that "container failure" is divorced from "app
>> >> failure" because the app continues to run even after all the task
>> >> containers have shut down. Is there a better way to check for
>> >> application health? Is there a way to shut down the application if
all
>> >> containers have failed? Should I simply ensure exceptions never
escape
>> >> operators? Thanks!
>> >>
>>

Reply via email to