[ https://issues.apache.org/jira/browse/KAFKA-6738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488234#comment-16488234 ]

ASF GitHub Bot commented on KAFKA-6738:
---------------------------------------

wicknicks closed pull request #5010: KAFKA-6738: (WIP) Error Handling in Connect
URL: https://github.com/apache/kafka/pull/5010
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 64258bf7b07..4f31ce264c7 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -80,7 +80,7 @@
               files="(KafkaConfigBackingStore|RequestResponseTest|WorkerSinkTaskTest).java"/>
 
     <suppress checks="ParameterNumber"
-              files="WorkerSourceTask.java"/>
+              files="(WorkerSourceTask|WorkerSinkTask).java"/>
     <suppress checks="ParameterNumber"
               files="WorkerCoordinator.java"/>
     <suppress checks="ParameterNumber"
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 6a90310df5a..f8c02a08547 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -24,6 +24,8 @@
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.Stage;
+import org.apache.kafka.connect.runtime.errors.StageType;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.transforms.Transformation;
@@ -91,6 +93,8 @@
     private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to records.";
     private static final String TRANSFORMS_DISPLAY = "Transforms";
 
+    public static final String ERROR_HANDLING_CONFIG = "errors";
+
     private final EnrichedConnectorConfig enrichedConfig;
     private static class EnrichedConnectorConfig extends AbstractConfig {
         EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> props) {
@@ -139,6 +143,13 @@ public ConnectorConfig(Plugins plugins, ConfigDef configDef, Map<String, String>
         );
     }
 
+    /**
+     * @return properties to configure error handlers and reporters.
+     */
+    public Map<String, ?> errorHandlerConfig() {
+        return originalsWithPrefix(ERROR_HANDLING_CONFIG + ".");
+    }
+
     @Override
     public Object get(String key) {
         return enrichedConfig.get(key);
@@ -166,6 +177,25 @@ public Object get(String key) {
         return transformations;
     }
 
+    /**
+     * @return an ordered list of stages describing the transformations in this connector. The order is specified by
+     * {@link #TRANSFORMS_CONFIG}.
+     */
+    public List<Stage> transformationAsStages() {
+        final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
+        List<Stage> stages = new ArrayList<>();
+        for (String alias : transformAliases) {
+            final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+            stages.add(Stage.newBuilder(StageType.TRANSFORMATION)
+                    .setExecutingClass(getClass(prefix + "type"))
+                    .setConfig(originalsWithPrefix(prefix))
+                    .build()
+            );
+        }
+
+        return stages;
+    }
+
     /**
      * Returns an enriched {@link ConfigDef} building upon the {@code ConfigDef}, using the current configuration specified in {@code props} as an input.
      * <p>
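
For context, a minimal sketch of how the new "errors." prefix is meant to be consumed (illustrative only: the individual property names below are hypothetical, since this WIP patch does not define them; the loop mirrors what originalsWithPrefix("errors.") does):

import java.util.HashMap;
import java.util.Map;

public class ErrorsPrefixSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "org.example.MySourceConnector"); // hypothetical connector
        props.put("errors.retries.limit", "5");                        // hypothetical error-handling key
        props.put("errors.tolerance.limit", "10");                     // hypothetical error-handling key

        // ConnectorConfig.errorHandlerConfig() delegates to originalsWithPrefix(ERROR_HANDLING_CONFIG + "."),
        // so error handlers and reporters receive the keys with the "errors." prefix stripped.
        Map<String, String> errorProps = new HashMap<>();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getKey().startsWith("errors."))
                errorProps.put(entry.getKey().substring("errors.".length()), entry.getValue());
        }
        System.out.println(errorProps); // prints the stripped keys, e.g. {retries.limit=5, tolerance.limit=10}
    }
}
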
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
index e1d8b1f2e85..3d9baf3e541 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.runtime.errors.OperationExecutor;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.apache.kafka.connect.runtime.errors.impl.NoopExecutor;
 import org.apache.kafka.connect.transforms.Transformation;
 
 import java.util.Collections;
@@ -32,10 +35,29 @@ public TransformationChain(List<Transformation<R>> transformations) {
     }
 
     public R apply(R record) {
+        return apply(record, NoopExecutor.INSTANCE, null);
+    }
+
+    public R apply(R record, OperationExecutor operationExecutor, ProcessingContext processingContext) {
         if (transformations.isEmpty()) return record;
 
-        for (Transformation<R> transformation : transformations) {
-            record = transformation.apply(record);
+        for (final Transformation<R> transformation : transformations) {
+            final R current = record;
+
+            // set the current record
+            processingContext.setRecord(current);
+
+            // execute the operation
+            record = operationExecutor.execute(new OperationExecutor.Operation<R>() {
+                @Override
+                public R apply() {
+                    return transformation.apply(current);
+                }
+            }, null, processingContext);
+
+            // move to the next stage
+            processingContext.next();
+
             if (record == null) break;
         }
 
@@ -62,7 +84,7 @@ public int hashCode() {
     }
 
     public static <R extends ConnectRecord<R>> TransformationChain<R> noOp() {
-        return new TransformationChain<R>(Collections.<Transformation<R>>emptyList());
+        return new TransformationChain<>(Collections.<Transformation<R>>emptyList());
     }
 
 }
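
The pattern above, wrapping each pipeline step in an anonymous OperationExecutor.Operation, is repeated at every call site in this patch. A minimal sketch of the same idiom outside the transformation chain, assuming the OperationExecutor and ProcessingContext classes added later in this diff (the conversion step itself is a stand-in):

// Assumed imports: org.apache.kafka.connect.runtime.errors.OperationExecutor,
//                  org.apache.kafka.connect.runtime.errors.ProcessingContext
static String convertSafely(final byte[] raw, OperationExecutor executor, ProcessingContext context) {
    return executor.execute(new OperationExecutor.Operation<String>() {
        @Override
        public String apply() throws Exception {
            return new String(raw, "UTF-8");   // stand-in for a step that may throw
        }
    }, "", context);                           // "" is the fallback returned after tolerated failures
}
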
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1c6465855ff..dd560e28467 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -30,6 +30,14 @@
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.OperationExecutor;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.apache.kafka.connect.runtime.errors.Stage;
+import org.apache.kafka.connect.runtime.errors.StageType;
+import org.apache.kafka.connect.runtime.errors.impl.DLQReporter;
+import org.apache.kafka.connect.runtime.errors.impl.ReporterFactory;
+import org.apache.kafka.connect.runtime.errors.impl.RetryWithToleranceExecutor;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -364,6 +372,8 @@ public boolean startTask(
         if (tasks.containsKey(id))
             throw new ConnectException("Task already exists in this worker: " + id);
 
+        ProcessingContext.Builder prContextBuilder = ProcessingContext.newBuilder(id, config.originals());
+
         final WorkerTask workerTask;
         ClassLoader savedLoader = plugins.currentThreadLoader();
         try {
@@ -395,27 +405,50 @@ public boolean startTask(
                     WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
                     ClassLoaderUsage.CURRENT_CLASSLOADER
             );
+
+            Stage.Builder keyConverterStage = Stage.newBuilder(StageType.KEY_CONVERTER);
+            Stage.Builder valueConverterStage = Stage.newBuilder(StageType.VALUE_CONVERTER);
+            Stage.Builder headerConverterStage = Stage.newBuilder(StageType.HEADER_CONVERTER);
+
             if (keyConverter == null) {
                 keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                keyConverterStage.setConfig(config.originalsWithPrefix(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + "."));
                 log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
             } else {
+                keyConverterStage.setConfig(connConfig.originalsWithPrefix(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + "."));
                 log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
             }
+            keyConverterStage.setExecutingClass(keyConverter.getClass());
+
             if (valueConverter == null) {
                 valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                valueConverterStage.setConfig(config.originalsWithPrefix(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + "."));
                 log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
             } else {
+                valueConverterStage.setConfig(connConfig.originalsWithPrefix(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + "."));
                 log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
             }
+            valueConverterStage.setExecutingClass(valueConverter.getClass());
+
             if (headerConverter == null) {
                 headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                headerConverterStage.setConfig(config.originalsWithPrefix(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + "."));
                 log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id);
             } else {
+                headerConverterStage.setConfig(connConfig.originalsWithPrefix(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + "."));
                 log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
             }
+            headerConverterStage.setExecutingClass(headerConverter.getClass());
+
+            prContextBuilder.appendStage(keyConverterStage.build());
+            prContextBuilder.appendStage(valueConverterStage.build());
+            prContextBuilder.appendStage(headerConverterStage.build());
 
-            workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
+            prContextBuilder.addReporters(errorReportersConfig(connConfig));
+
+            workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader, prContextBuilder, new RetryWithToleranceExecutor());
             workerTask.initialize(taskConfig);
+            workerTask.operationExecutor().configure(connConfig.originalsWithPrefix("errors."));
             Plugins.compareAndSwapLoaders(savedLoader);
         } catch (Throwable t) {
             log.error("Failed to start task {}", id, t);
@@ -447,7 +480,9 @@ private WorkerTask buildWorkerTask(ConnectorConfig connConfig,
                                        Converter keyConverter,
                                        Converter valueConverter,
                                        HeaderConverter headerConverter,
-                                       ClassLoader loader) {
+                                       ClassLoader loader,
+                                       ProcessingContext.Builder prContextBuilder,
+                                       OperationExecutor operationExecutor) {
         // Decide which type of worker task we need based on the type of task.
         if (task instanceof SourceTask) {
             TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations());
@@ -456,18 +491,55 @@ private WorkerTask buildWorkerTask(ConnectorConfig connConfig,
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+            for (Stage stage: connConfig.transformationAsStages()) {
+                prContextBuilder.prependStage(stage);
+            }
+
+            prContextBuilder.prependStage(Stage.newBuilder(StageType.TASK_POLL)
+                    .setConfig(connConfig.originals())
+                    .setExecutingClass(task.getClass())
+                    .build()
+            );
+
+            prContextBuilder.appendStage(Stage.newBuilder(StageType.KAFKA_PRODUCE).build());
+
             return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
-                    headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time);
+                    headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time, prContextBuilder.build(), operationExecutor);
         } else if (task instanceof SinkTask) {
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations());
+
+            for (Stage stage: connConfig.transformationAsStages()) {
+                prContextBuilder.appendStage(stage);
+            }
+
+            prContextBuilder.appendStage(Stage.newBuilder(StageType.TASK_POLL)
+                    .setConfig(connConfig.originals())
+                    .setExecutingClass(task.getClass())
+                    .build()
+            );
+
+            prContextBuilder.prependStage(Stage.newBuilder(StageType.KAFKA_CONSUME).build());
+
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, metrics, keyConverter,
-                    valueConverter, headerConverter, transformationChain, loader, time);
+                    valueConverter, headerConverter, transformationChain, loader, time, prContextBuilder.build(), operationExecutor);
+
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
         }
     }
 
+    /**
+     * Setup properties for dead letter queue using the producer properties from worker config before overriding them
+     * from the connector config.
+     * @param connConfig connector config
+     * @return reporters to log error context
+     */
+    private List<ErrorReporter> errorReportersConfig(ConnectorConfig connConfig) {
+        return new ReporterFactory().forConfig(producerProps, connConfig);
+    }
+
     private void stopTask(ConnectorTaskId taskId) {
         WorkerTask task = tasks.get(taskId);
         if (task == null) {
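
To make the prepend/append calls above easier to follow, this is the stage order they produce for a source task (an illustrative sketch using the ProcessingContext, Stage and StageType classes added later in this diff; taskId and workerConfig stand for the values Worker already has in scope):

// Converter stages are appended in startTask(); buildWorkerTask() then prepends the
// transformation and TASK_POLL stages and appends KAFKA_PRODUCE, yielding:
//   TASK_POLL -> TRANSFORMATION -> KEY_CONVERTER -> VALUE_CONVERTER -> HEADER_CONVERTER -> KAFKA_PRODUCE
ProcessingContext.Builder builder = ProcessingContext.newBuilder(taskId, workerConfig);
builder.appendStage(Stage.newBuilder(StageType.KEY_CONVERTER).build());
builder.appendStage(Stage.newBuilder(StageType.VALUE_CONVERTER).build());
builder.appendStage(Stage.newBuilder(StageType.HEADER_CONVERTER).build());
builder.prependStage(Stage.newBuilder(StageType.TRANSFORMATION).build());
builder.prependStage(Stage.newBuilder(StageType.TASK_POLL).build());
builder.appendStage(Stage.newBuilder(StageType.KAFKA_PRODUCE).build());
ProcessingContext context = builder.build();   // context.stages() reflects the order above

For a sink task the diff mirrors this: KAFKA_CONSUME is prepended, while the transformation and task stages are appended after the converter stages.
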
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 2ba785c4668..a8945231ed3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -40,6 +40,10 @@
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.OperationExecutor;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.apache.kafka.connect.runtime.errors.StageType;
+import org.apache.kafka.connect.runtime.errors.impl.NoopExecutor;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -100,8 +104,26 @@ public WorkerSinkTask(ConnectorTaskId id,
                           HeaderConverter headerConverter,
                           TransformationChain<SinkRecord> transformationChain,
                           ClassLoader loader,
-                          Time time) {
-        super(id, statusListener, initialState, loader, connectMetrics);
+                          Time time,
+                          ProcessingContext processingContext) {
+        this(id, task, statusListener, initialState, workerConfig, connectMetrics, keyConverter, valueConverter, headerConverter, transformationChain, loader, time, processingContext, NoopExecutor.INSTANCE);
+    }
+
+    public WorkerSinkTask(ConnectorTaskId id,
+                          SinkTask task,
+                          TaskStatus.Listener statusListener,
+                          TargetState initialState,
+                          WorkerConfig workerConfig,
+                          ConnectMetrics connectMetrics,
+                          Converter keyConverter,
+                          Converter valueConverter,
+                          HeaderConverter headerConverter,
+                          TransformationChain<SinkRecord> transformationChain,
+                          ClassLoader loader,
+                          Time time,
+                          ProcessingContext processingContext,
+                          OperationExecutor operationExecutor) {
+        super(id, statusListener, initialState, loader, connectMetrics, processingContext, operationExecutor);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -294,9 +316,16 @@ protected void poll(long timeoutMs) {
         }
 
         log.trace("{} Polling consumer with timeout {} ms", this, timeoutMs);
-        ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
+        final long minTimeout = timeoutMs;
+        processingContext().position(StageType.KAFKA_CONSUME);
+        ConsumerRecords<byte[], byte[]> msgs = operationExecutor().execute(new OperationExecutor.Operation<ConsumerRecords<byte[], byte[]>>() {
+            @Override
+            public ConsumerRecords<byte[], byte[]> apply() {
+                return pollConsumer(minTimeout);
+            }
+        }, ConsumerRecords.<byte[], byte[]>empty(), processingContext());
         assert messageBatch.isEmpty() || msgs.isEmpty();
-        log.trace("{} Polling returned {} messages", this, msgs.count());
+        log.info("{} Polling returned {} messages", this, msgs.count());
 
         convertMessages(msgs);
         deliverMessages();
@@ -461,12 +490,39 @@ public String toString() {
 
     private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
         origOffsets.clear();
-        for (ConsumerRecord<byte[], byte[]> msg : msgs) {
+        for (final ConsumerRecord<byte[], byte[]> msg : msgs) {
             log.trace("{} Consuming and converting message in topic '{}' partition {} at offset {} and timestamp {}",
                     this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
-            SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
-            SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
-            Headers headers = convertHeadersFor(msg);
+
+            processingContext().position(StageType.KEY_CONVERTER);
+            SchemaAndValue keyAndSchema = operationExecutor().execute(new OperationExecutor.Operation<SchemaAndValue>() {
+                @Override
+                public SchemaAndValue apply() {
+                    return keyConverter.toConnectData(msg.topic(), msg.key());
+                }
+            }, DEFAULT_SCHEMA_AND_VALUE, processingContext());
+
+            processingContext().position(StageType.VALUE_CONVERTER);
+            SchemaAndValue valueAndSchema = operationExecutor().execute(new OperationExecutor.Operation<SchemaAndValue>() {
+                @Override
+                public SchemaAndValue apply() {
+                    return valueConverter.toConnectData(msg.topic(), msg.value());
+                }
+            }, DEFAULT_SCHEMA_AND_VALUE, processingContext());
+
+            processingContext().position(StageType.HEADER_CONVERTER);
+            Headers headers = operationExecutor().execute(new OperationExecutor.Operation<Headers>() {
+                @Override
+                public Headers apply() {
+                    return convertHeadersFor(msg);
+                }
+            }, DEFAULT_HEADERS, processingContext());
+
+            if (keyAndSchema == DEFAULT_SCHEMA_AND_VALUE || valueAndSchema == DEFAULT_SCHEMA_AND_VALUE
+                    || headers == DEFAULT_HEADERS) {
+                continue;
+            }
+
             Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
             SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(),
                     keyAndSchema.schema(), keyAndSchema.value(),
@@ -475,9 +531,12 @@ private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
                     timestamp,
                     msg.timestampType(),
                     headers);
+
             log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
                     this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
-            SinkRecord transRecord = transformationChain.apply(origRecord);
+            processingContext().position(StageType.TRANSFORMATION);
+            SinkRecord transRecord = transformationChain.apply(origRecord, operationExecutor(), processingContext());
+
             origOffsets.put(
                     new TopicPartition(origRecord.topic(), origRecord.kafkaPartition()),
                     new OffsetAndMetadata(origRecord.kafkaOffset() + 1)
@@ -521,7 +580,15 @@ private void deliverMessages() {
             // Since we reuse the messageBatch buffer, ensure we give the task its own copy
             log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
             long start = time.milliseconds();
-            task.put(new ArrayList<>(messageBatch));
+            processingContext().position(StageType.TASK_PUT);
+            operationExecutor().execute(new OperationExecutor.Operation<Object>() {
+                @Override
+                public Object apply() {
+                    task.put(new ArrayList<>(messageBatch));
+                    return null;
+                }
+            }, processingContext());
+
             recordBatch(messageBatch.size());
             sinkTaskMetricsGroup.recordPut(time.milliseconds() - start);
             currentOffsets.putAll(origOffsets);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index f2cef5a63f7..6d956dd0f0e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -34,6 +34,10 @@
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.OperationExecutor;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.apache.kafka.connect.runtime.errors.StageType;
+import org.apache.kafka.connect.runtime.errors.impl.NoopExecutor;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -102,8 +106,30 @@ public WorkerSourceTask(ConnectorTaskId id,
                             WorkerConfig workerConfig,
                             ConnectMetrics connectMetrics,
                             ClassLoader loader,
-                            Time time) {
-        super(id, statusListener, initialState, loader, connectMetrics);
+                            Time time,
+                            ProcessingContext processingContext) {
+        this(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
+                producer, offsetReader, offsetWriter, workerConfig, connectMetrics, loader, time, processingContext, NoopExecutor.INSTANCE);
+    }
+
+    public WorkerSourceTask(ConnectorTaskId id,
+                SourceTask task,
+                TaskStatus.Listener statusListener,
+                TargetState initialState,
+                Converter keyConverter,
+                Converter valueConverter,
+                HeaderConverter headerConverter,
+                TransformationChain<SourceRecord> transformationChain,
+                KafkaProducer<byte[], byte[]> producer,
+                OffsetStorageReader offsetReader,
+                OffsetStorageWriter offsetWriter,
+                WorkerConfig workerConfig,
+                ConnectMetrics connectMetrics,
+                ClassLoader loader,
+                Time time,
+                ProcessingContext processingContext,
+                OperationExecutor operationExecutor) {
+        super(id, statusListener, initialState, loader, connectMetrics, processingContext, operationExecutor);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -184,7 +210,13 @@ public void execute() {
                 if (toSend == null) {
                     log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                     long start = time.milliseconds();
-                    toSend = task.poll();
+                    processingContext().position(StageType.TASK_POLL);
+                    toSend = operationExecutor().execute(new OperationExecutor.Operation<List<SourceRecord>>() {
+                        @Override
+                        public List<SourceRecord> apply() throws InterruptedException {
+                            return task.poll();
+                        }
+                    }, processingContext());
                     if (toSend != null) {
                         recordPollReturned(toSend.size(), time.milliseconds() - start);
                     }
@@ -216,7 +248,8 @@ private boolean sendRecords() {
         recordBatch(toSend.size());
         final SourceRecordWriteCounter counter = new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
         for (final SourceRecord preTransformRecord : toSend) {
-            final SourceRecord record = transformationChain.apply(preTransformRecord);
+            processingContext().position(StageType.TRANSFORMATION);
+            final SourceRecord record = transformationChain.apply(preTransformRecord, operationExecutor(), processingContext());
 
             if (record == null) {
                 counter.skipRecord();
@@ -224,9 +257,32 @@ private boolean sendRecords() {
                 continue;
             }
 
-            RecordHeaders headers = convertHeaderFor(record);
-            byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
-            byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
+            processingContext().setRecord(record);
+
+            processingContext().position(StageType.HEADER_CONVERTER);
+            RecordHeaders headers = operationExecutor().execute(new OperationExecutor.Operation<RecordHeaders>() {
+                @Override
+                public RecordHeaders apply() {
+                    return convertHeaderFor(record);
+                }
+            }, DEFAULT_RECORD_HEADERS, processingContext());
+
+            processingContext().position(StageType.KEY_CONVERTER);
+            byte[] key = operationExecutor().execute(new OperationExecutor.Operation<byte[]>() {
+                @Override
+                public byte[] apply() {
+                    return keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
+                }
+            }, processingContext());
+
+            processingContext().position(StageType.VALUE_CONVERTER);
+            byte[] value = operationExecutor().execute(new OperationExecutor.Operation<byte[]>() {
+                @Override
+                public byte[] apply() {
+                    return valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
+                }
+            }, processingContext());
+
             final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(),
                     ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value, headers);
             log.trace("{} Appending record with key {}, value {}", this, record.key(), record.value());
@@ -247,30 +303,37 @@ private boolean sendRecords() {
             }
             try {
                 final String topic = producerRecord.topic();
-                producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                                if (e != null) {
-                                    // Given the default settings for zero data loss, this should basically never happen --
-                                    // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
-                                    // timeouts, callbacks with exceptions should never be invoked in practice. If the
-                                    // user overrode these settings, the best we can do is notify them of the failure via
-                                    // logging.
-                                    log.error("{} failed to send record to {}: {}", this, topic, e);
-                                    log.debug("{} Failed record: {}", this, preTransformRecord);
-                                } else {
-                                    log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
-                                            this,
-                                            recordMetadata.topic(), recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord);
-                                }
-                                recordSent(producerRecord);
-                                counter.completeRecord();
-                            }
-                        });
+                processingContext().position(StageType.KAFKA_PRODUCE);
+                processingContext().setRecord(record);
+                operationExecutor().execute(new OperationExecutor.Operation<Future<RecordMetadata>>() {
+                    @Override
+                    public Future<RecordMetadata> apply() {
+                        return producer.send(
+                                producerRecord,
+                                new Callback() {
+                                    @Override
+                                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+                                        if (e != null) {
+                                            // Given the default settings for zero data loss, this should basically never happen --
+                                            // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
+                                            // timeouts, callbacks with exceptions should never be invoked in practice. If the
+                                            // user overrode these settings, the best we can do is notify them of the failure via
+                                            // logging.
+                                            log.error("{} failed to send record to {}: {}", this, topic, e);
+                                            log.debug("{} Failed record: {}", this, preTransformRecord);
+                                        } else {
+                                            log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
+                                                    this,
+                                                    recordMetadata.topic(), recordMetadata.partition(),
+                                                    recordMetadata.offset());
+                                            commitTaskRecord(preTransformRecord);
+                                        }
+                                        recordSent(producerRecord);
+                                        counter.completeRecord();
+                                    }
+                                });
+                    }
+                }, processingContext());
                 lastSendFailed = false;
             } catch (RetriableException e) {
                 log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index d563f9bdede..ac8ea7faf0b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Sensor;
@@ -25,9 +26,16 @@
 import org.apache.kafka.common.metrics.stats.Frequencies;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.AbstractStatus.State;
 import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.OperationExecutor;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.apache.kafka.connect.runtime.errors.impl.NoopExecutor;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
@@ -60,11 +68,30 @@
     private volatile boolean stopping;   // indicates whether the Worker has asked the task to stop
     private volatile boolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
 
+    private final OperationExecutor operationExecutor;
+    private final ProcessingContext processingContext;
+
+    protected static final SchemaAndValue DEFAULT_SCHEMA_AND_VALUE = new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false);
+    protected static final Headers DEFAULT_HEADERS = new ConnectHeaders();
+    protected static final RecordHeaders DEFAULT_RECORD_HEADERS = new RecordHeaders();
+
+    // Available for testing
+    public WorkerTask(ConnectorTaskId id,
+                      TaskStatus.Listener statusListener,
+                      TargetState initialState,
+                      ClassLoader loader,
+                      ConnectMetrics connectMetrics,
+                      ProcessingContext processingContext) {
+        this(id, statusListener, initialState, loader, connectMetrics, processingContext, NoopExecutor.INSTANCE);
+    }
+
     public WorkerTask(ConnectorTaskId id,
                       TaskStatus.Listener statusListener,
                       TargetState initialState,
                       ClassLoader loader,
-                      ConnectMetrics connectMetrics) {
+                      ConnectMetrics connectMetrics,
+                      ProcessingContext processingContext,
+                      OperationExecutor operationExecutor) {
         this.id = id;
         this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
         this.statusListener = taskMetricsGroup;
@@ -73,6 +100,8 @@ public WorkerTask(ConnectorTaskId id,
         this.stopping = false;
         this.cancelled = false;
         this.taskMetricsGroup.recordState(this.targetState);
+        this.processingContext = processingContext;
+        this.operationExecutor = operationExecutor;
     }
 
     public ConnectorTaskId id() {
@@ -100,6 +129,14 @@ private void triggerStop() {
         }
     }
 
+    public OperationExecutor operationExecutor() {
+        return operationExecutor;
+    }
+
+    public ProcessingContext processingContext() {
+        return processingContext;
+    }
+
     /**
      * Stop this task from processing messages. This method does not block, it only triggers
      * shutdown. Use #{@link #awaitStop} to block until completion.
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
new file mode 100644
index 00000000000..a3625db6043
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.common.Configurable;
+
+public abstract class ErrorReporter implements Configurable {
+
+    public void initialize() {
+    }
+
+    public abstract void report(ProcessingContext report);
+
+}
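
To illustrate the reporter contract (an editorial sketch, not part of the patch; the DLQReporter referenced from Worker.java is not included in this diff), a reporter that only logs the failure context might look like:

import java.util.Map;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogReporter extends ErrorReporter {               // hypothetical class name
    private static final Logger log = LoggerFactory.getLogger(LogReporter.class);

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    @Override
    public void report(ProcessingContext context) {
        // the context carries the task id, failing stage index, attempt count and exception
        log.error("Error in task {} at stage {} (attempt {})",
                context.taskId(), context.index(), context.attempt(), context.exception());
    }
}
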
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/OperationExecutor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/OperationExecutor.java
new file mode 100644
index 00000000000..9cdd9b39347
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/OperationExecutor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.common.Configurable;
+
+import java.util.Map;
+
+/**
+ * Execute a connect worker operation with configurable retries and tolerances. An operation here means applying transformations
+ * on a ConnectRecord, converting it to a format used by the Connect framework, writing it to a sink or reading from a source.
+ *
+ * <p/>The following failures will be retried:
+ *
+ * <ol>
+ *     <li> Connector tasks fail with RetriableException when reading/writing records from/to external system.
+ *     <li> Transformations fail and throw exceptions while processing records.
+ *     <li> Converters fail to correctly serialize/deserialize records.
+ *     <li> Exceptions while producing/consuming to/from Kafka topics in the Connect framework.
+ * </ol>
+ *
+ * The executor will report errors on failure (after reattempting). It will throw a {@link ToleranceExceededException} if
+ * any of the tolerance limits are crossed.
+ */
+public abstract class OperationExecutor implements Configurable {
+
+    public <V> V execute(Operation<V> operation, ProcessingContext context) {
+        return execute(operation, null, context);
+    }
+
+    /**
+     * Configure this class with the given key-value pairs
+     */
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+
+    public abstract <V> V execute(Operation<V> operation, V value, ProcessingContext context);
+
+    public interface Operation<V> {
+        V apply() throws Exception;
+    }
+
+}
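
For intuition about the executor contract (an editorial sketch; the RetryWithToleranceExecutor that Worker.java instantiates is not included in this diff), a naive implementation that retries a fixed number of times, then reports and falls back to the caller-supplied default, could look like:

import java.util.Map;
import org.apache.kafka.connect.runtime.errors.OperationExecutor;
import org.apache.kafka.connect.runtime.errors.ProcessingContext;

public class SimpleRetryExecutor extends OperationExecutor {    // hypothetical class name
    private int retries = 3;                                     // hypothetical default

    @Override
    public void configure(Map<String, ?> configs) {
        Object limit = configs.get("retries.limit");             // hypothetical key
        if (limit != null)
            retries = Integer.parseInt(String.valueOf(limit));
    }

    @Override
    public <V> V execute(Operation<V> operation, V value, ProcessingContext context) {
        for (int i = 0; i <= retries; i++) {
            try {
                return operation.apply();
            } catch (Exception e) {
                context.setException(e);
                context.incrementAttempt();
            }
        }
        context.report();   // hand the accumulated context to the configured ErrorReporters
        return value;       // fall back to the caller-supplied default (may be null)
    }
}
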
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
new file mode 100644
index 00000000000..060d98daf54
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.common.utils.Base64;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.errors.impl.StructUtil;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * This object will contain all the runtime context for an error which occurs in the Connect framework while
+ * processing a record.
+ */
+public class ProcessingContext implements Structable {
+
+    private final String taskId;
+    private final Map<String, Object> workerConfig;
+    private final List<Stage> stages;
+    private final ErrorReporter[] reporters;
+
+    private Exception exception;
+    private int current = 0;
+    private int attempt;
+    private ConnectRecord record;
+    private long timetstamp;
+
+    private JsonConverter valueConverter = new JsonConverter();
+
+    public static Builder newBuilder(ConnectorTaskId taskId, Map<String, Object> workerConfig) {
+        Objects.requireNonNull(taskId);
+        Objects.requireNonNull(workerConfig);
+
+        return new Builder(taskId.toString(), workerConfig);
+    }
+
+    // VisibleForTesting
+    public static ProcessingContext noop(String taskId) {
+        return new ProcessingContext(taskId, Collections.<String, Object>emptyMap(), Collections.<Stage>emptyList(), new ErrorReporter[]{});
+    }
+
+    private ProcessingContext(String taskId, Map<String, Object> workerConfig, List<Stage> stages, ErrorReporter[] reporters) {
+        Objects.requireNonNull(taskId);
+        Objects.requireNonNull(workerConfig);
+        Objects.requireNonNull(stages);
+        Objects.requireNonNull(reporters);
+
+        this.taskId = taskId;
+        this.workerConfig = workerConfig;
+        this.stages = stages;
+        this.reporters = reporters;
+
+        Map<String, Object> config = new HashMap<>();
+        config.put("schemas.enable", false);
+        config.put("converter.type", "value");
+        valueConverter.configure(config);
+    }
+
+    /**
+     * @return the configuration of the Connect worker
+     */
+    public Map<String, Object> workerConfig() {
+        return workerConfig;
+    }
+
+    /**
+     * @return which task reported this error
+     */
+    @Field("task_id")
+    public String taskId() {
+        return taskId;
+    }
+
+    /**
+     * @return an ordered list of stages. Connect will start with executing stage 0 and then move up the list.
+     */
+    public List<Stage> stages() {
+        return stages;
+    }
+
+    public Stage current() {
+        return stages.get(current);
+    }
+
+    /**
+     * @return at what stage did this operation fail (0 indicates first stage)
+     */
+    @Field("index")
+    public int index() {
+        return current;
+    }
+
+    /**
+     * @return which attempt was this (first error will be 0)
+     */
+    @Field("attempt")
+    public int attempt() {
+        return attempt;
+    }
+
+    /**
+     * @return the (epoch) time of failure
+     */
+    @Field("time_of_error")
+    public long timeOfError() {
+        return timetstamp;
+    }
+
+    public void setTimeOfError(long timetstamp) {
+        this.timetstamp = timetstamp;
+    }
+
+    /**
+     * The exception accompanying this failure (if any)
+     */
+    @Field("exception")
+    public Exception exception() {
+        return exception;
+    }
+
+    /**
+     * @return the record which, when input to the current stage, caused the failure.
+     */
+    public ConnectRecord record() {
+        return record;
+    }
+
+    public void setRecord(ConnectRecord record) {
+        this.record = record;
+    }
+
+    public void setException(Exception ex) {
+        this.exception = ex;
+    }
+
+    public void report() {
+        for (ErrorReporter reporter : reporters) {
+            reporter.report(this);
+        }
+    }
+
+    @Override
+    public Struct toStruct() {
+        Struct metadata = StructUtil.toStruct(this);
+
+        Schema valueSchema = new SchemaBuilder(Schema.Type.STRUCT)
+                .field("schema", Schema.STRING_SCHEMA)
+                .field("object", Schema.STRING_SCHEMA).build();
+        Struct value = new Struct(valueSchema);
+        value.put("schema", record.valueSchema().type().toString());
+        value.put("object", Base64.encoder().encodeToString((byte[]) record.value()));
+
+        SchemaBuilder recordSchema = new SchemaBuilder(Schema.Type.STRUCT);
+        recordSchema.field("topic", Schema.STRING_SCHEMA);
+        recordSchema.field("timestamp", Schema.INT64_SCHEMA);
+        recordSchema.field("offset", Schema.INT64_SCHEMA);
+        recordSchema.field("partition", Schema.INT32_SCHEMA);
+        recordSchema.field("value", value.schema());
+
+        Struct recordStruct = new Struct(recordSchema);
+        recordStruct.put("topic", record().topic());
+        recordStruct.put("timestamp", record().timestamp());
+        recordStruct.put("partition", record().kafkaPartition());
+        recordStruct.put("offset", ((SinkRecord) record()).kafkaOffset());
+        recordStruct.put("value", value);
+
+        Struct stage = StructUtil.toStruct(current());
+
+        SchemaBuilder finalSchemaBuilder = new SchemaBuilder(Schema.Type.STRUCT)
+                .field("record", recordSchema)
+                .field("stage", stage.schema());
+        for (org.apache.kafka.connect.data.Field f : metadata.schema().fields()) {
+            finalSchemaBuilder.field(f.name(), f.schema());
+        }
+
+        finalSchemaBuilder.field("stages", SchemaBuilder.array(stage.schema()).build());
+        Struct struct = new Struct(finalSchemaBuilder.build());
+
+        for (org.apache.kafka.connect.data.Field f : metadata.schema().fields()) {
+            struct.put(f.name(), metadata.get(f));
+        }
+        struct.put("record", recordStruct);
+        struct.put("stage", stage);
+
+        List<Struct> ll = new ArrayList<>();
+        for (Stage st : stages()) {
+            ll.add(StructUtil.toStruct(st));
+        }
+        struct.put("stages", ll);
+
+        return struct;
+    }
+
+    public void reset() {
+        current = 0;
+        attempt = 0;
+    }
+
+    /**
+     * Position index to the first stage of the given type
+     *
+     * @param type the given type
+     */
+    public void position(StageType type) {
+        reset();
+        for (int i = 0; i < stages.size(); i++) {
+            if (type == stages.get(i).type()) {
+                current = i;
+                break;
+            }
+        }
+    }
+
+    public void next() {
+        current++;
+    }
+
+    public void incrementAttempt() {
+        attempt++;
+    }
+
+    public static class Builder {
+        private final Map<String, Object> workerConfig;
+        private final String taskId;
+
+        private List<ErrorReporter> reporters = new ArrayList<>();
+        private LinkedList<Stage> stages = new LinkedList<>();
+
+        private Builder(String taskId, Map<String, Object> workerConfig) {
+            this.taskId = taskId;
+            this.workerConfig = workerConfig;
+        }
+
+        public Builder prependStage(Stage stage) {
+            stages.addFirst(stage);
+            return this;
+        }
+
+        public Builder appendStage(Stage stage) {
+            stages.addLast(stage);
+            return this;
+        }
+
+        public Builder addReporters(Collection<ErrorReporter> reporters) {
+            this.reporters.addAll(reporters);
+            return this;
+        }
+
+        public ProcessingContext build() {
+            return new ProcessingContext(taskId, workerConfig, stages, reporters.toArray(new ErrorReporter[0]));
+        }
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Stage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Stage.java
new file mode 100644
index 00000000000..596085e8b41
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Stage.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+public class Stage implements Structable {
+
+    private final StageType type;
+    private final Map<String, Object> config;
+    private final Class<?> klass;
+
+    public static Builder newBuilder(StageType type) {
+        return new Builder(type);
+    }
+
+    private Stage(StageType type, Map<String, Object> config, Class<?> klass) {
+        this.type = type;
+        this.config = config;
+        this.klass = klass;
+    }
+
+    /**
+     * @return at what stage in processing did the error happen
+     */
+    @Field("type")
+    public StageType type() {
+        return type;
+    }
+
+    /**
+     * @return name of the class executing this stage.
+     */
+    @Field("class")
+    public Class<?> executingClass() {
+        return klass;
+    }
+
+    /**
+     * @return properties used to configure this stage
+     */
+    @Field("config")
+    public Map<String, Object> config() {
+        return config;
+    }
+
+    @Override
+    public Struct toStruct() {
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return "Stage{" + toStruct() + "}";
+    }
+
+    public static class Builder {
+        private StageType type;
+        private Map<String, Object> config;
+        private Class<?> klass;
+
+        private Builder(StageType type) {
+            this.type = type;
+        }
+
+        public Builder setExecutingClass(Class<?> klass) {
+            this.klass = klass;
+            return this;
+        }
+
+        public Builder setConfig(Map<String, Object> config) {
+            this.config = config;
+            return this;
+        }
+
+        public Stage build() {
+            Objects.requireNonNull(type);
+            return new Stage(type, config == null ? Collections.<String, Object>emptyMap() : config, klass);
+        }
+    }
+
+}
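
A brief usage sketch of the builder (illustrative; this mirrors what ConnectorConfig.transformationAsStages() does for each configured alias, using a hypothetical transformation and config):

Stage stage = Stage.newBuilder(StageType.TRANSFORMATION)
        .setExecutingClass(org.apache.kafka.connect.transforms.InsertField.Value.class)
        .setConfig(java.util.Collections.<String, Object>singletonMap("static.field", "source"))
        .build();
// stage.type() == StageType.TRANSFORMATION and stage.config() holds the per-alias properties
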
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/StageType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/StageType.java
new file mode 100644
index 00000000000..22f25baa41d
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/StageType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+/**
+ * A logical stage in a Connect pipeline
+ */
+public enum StageType {
+
+    /**
+     * When the task starts up
+     */
+    TASK_START,
+
+    /**
+     * when running any transform operation on a record
+     */
+    TRANSFORMATION,
+
+    /**
+     * when calling the poll() method on a SourceConnector
+     */
+    TASK_POLL,
+
+    /**
+     * when calling the put() method on a SinkConnector
+     */
+    TASK_PUT,
+
+    /**
+     * when using the key converter to serialize/deserialize keys in 
ConnectRecords
+     */
+    KEY_CONVERTER,
+
+    /**
+     * when using the value converter to serialize/deserialize values in 
ConnectRecords
+     */
+    VALUE_CONVERTER,
+
+    /**
+     * when using the header converter to serialize/deserialize headers in 
ConnectRecords
+     */
+    HEADER_CONVERTER,
+
+    /**
+     * when the worker is committing offsets for the task
+     */
+    COMMIT_OFFSETS,
+
+    /**
+     * When the task is shutting down
+     */
+    TASK_CLOSE,
+
+    /**
+     * Producing to Kafka topic
+     */
+    KAFKA_PRODUCE,
+
+    /**
+     * Consuming from a Kafka topic
+     */
+    KAFKA_CONSUME
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Structable.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Structable.java
new file mode 100644
index 00000000000..03a88a8fb8c
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Structable.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.connect.data.Struct;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.METHOD;
+
+public interface Structable {
+
+    /**
+     * @return a {@link Struct} representation of this object
+     */
+    Struct toStruct();
+
+    @Target({METHOD})
+    @Retention(RetentionPolicy.RUNTIME)
+    @interface Field {
+        String value();
+    }
+
+}
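
A minimal sketch of an implementation, assuming (as StructUtil below does) that zero-argument accessors annotated with @Field are reflected into the Struct; FailureSummary is a hypothetical class, not part of this patch:

    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.runtime.errors.Structable;
    import org.apache.kafka.connect.runtime.errors.impl.StructUtil;

    public class FailureSummary implements Structable {
        private final String taskId;
        private final int attempts;

        public FailureSummary(String taskId, int attempts) {
            this.taskId = taskId;
            this.attempts = attempts;
        }

        @Structable.Field("task_id")
        public String taskId() {
            return taskId;
        }

        @Structable.Field("attempts")
        public int attempts() {
            return attempts;
        }

        @Override
        public Struct toStruct() {
            // delegate to the reflection-based helper added by this patch
            return StructUtil.toStruct(this);
        }
    }
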
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceExceededException.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceExceededException.java
new file mode 100644
index 00000000000..b50bd606031
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceExceededException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+public class ToleranceExceededException extends ConnectException {
+
+    public ToleranceExceededException(String msg, Throwable throwable) {
+        super(msg, throwable);
+    }
+
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/DLQReporter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/DLQReporter.java
new file mode 100644
index 00000000000..2f99715978a
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/DLQReporter.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors.impl;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+public class DLQReporter extends ErrorReporter {
+
+    private static final Logger log = 
LoggerFactory.getLogger(DLQReporter.class);
+
+    public static final String DLQ_TOPIC_NAME = "errors.dlq.topic.name";
+    public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic to which failed records are written.";
+
+    public static final String DLQ_TOPIC_PARTITIONS = 
"errors.dlq.topic.partitions";
+    public static final String DLQ_TOPIC_PARTITIONS_DOC = "Number of 
partitions for the DLQ topic.";
+    public static final int DLQ_TOPIC_PARTITIONS_DEFAULT = 25;
+
+    public static final String DLQ_TOPIC_REPLICATION_FACTOR = 
"errors.dlq.topic.replication.factor";
+    public static final String DLQ_TOPIC_REPLICATION_FACTOR_DOC = "The 
replication factor for the DLQ topic.";
+    public static final short DLQ_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+
+    public static final String DLQ_INCLUDE_CONFIGS = "dlq.include.configs";
+    public static final String DLQ_INCLUDE_CONFIGS_DOC = "Include the (worker, connector) configs in the records written to the DLQ topic.";
+    public static final boolean DLQ_INCLUDE_CONFIGS_DEFAULT = false;
+
+    public static final String DLQ_INCLUDE_MESSAGES = "dlq.include.messages";
+    public static final String DLQ_INCLUDE_MESSAGES_DOC = "Include the failed Connect record in the message written to the DLQ topic.";
+    public static final boolean DLQ_INCLUDE_MESSAGES_DEFAULT = false;
+
+    public static final String DLQ_CONVERTER = "dlq.converter";
+    public static final String DLQ_CONVERTER_DOC = "Converter used to serialize the error context before it is written to the DLQ topic.";
+    public static final Class<? extends Converter> DLQ_CONVERTER_DEFAULT = 
StringConverter.class;
+
+    public static final String DLQ_PRODUCER_PROPERTIES = "dlq.producer";
+
+    private DlqReporterConfig config;
+    private KafkaProducer<byte[], byte[]> producer;
+    private Converter converter;
+
+    static ConfigDef getConfigDef() {
+        return new ConfigDef()
+                .define(DLQ_TOPIC_NAME, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, DLQ_TOPIC_NAME_DOC)
+                .define(DLQ_TOPIC_PARTITIONS, ConfigDef.Type.INT, 
DLQ_TOPIC_PARTITIONS_DEFAULT, atLeast(1), ConfigDef.Importance.HIGH, 
DLQ_TOPIC_PARTITIONS_DOC)
+                .define(DLQ_TOPIC_REPLICATION_FACTOR, ConfigDef.Type.SHORT, 
DLQ_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), ConfigDef.Importance.HIGH, 
DLQ_TOPIC_REPLICATION_FACTOR_DOC)
+                .define(DLQ_INCLUDE_CONFIGS, ConfigDef.Type.BOOLEAN, 
DLQ_INCLUDE_CONFIGS_DEFAULT, ConfigDef.Importance.HIGH, DLQ_INCLUDE_CONFIGS_DOC)
+                .define(DLQ_INCLUDE_MESSAGES, ConfigDef.Type.BOOLEAN, 
DLQ_INCLUDE_MESSAGES_DEFAULT, ConfigDef.Importance.HIGH, 
DLQ_INCLUDE_MESSAGES_DOC)
+                .define(DLQ_CONVERTER, ConfigDef.Type.CLASS, 
DLQ_CONVERTER_DEFAULT, ConfigDef.Importance.HIGH, DLQ_CONVERTER_DOC);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        config = new DlqReporterConfig(configs);
+    }
+
+    @Override
+    public void initialize() {
+        // check if topic exists
+        NewTopic topicDescription = TopicAdmin.defineTopic(config.topic())
+                .partitions(config.topicPartitions())
+                .replicationFactor(config.topicReplicationFactor())
+                .build();
+
+        Map<String, Object> adminProps = config.getProducerProps();
+        try (TopicAdmin admin = new TopicAdmin(adminProps)) {
+            admin.createTopics(topicDescription);
+        }
+
+        Map<String, Object> producerProps = config.getProducerProps();
+        producer = new KafkaProducer<>(producerProps);
+
+        try {
+            if (Converter.class.isAssignableFrom(config.converter())) {
+                converter = config.converter().newInstance();
+            }
+        } catch (InstantiationException e) {
+            throw new ConnectException("Could not instantiate converter", e);
+        } catch (IllegalAccessException e) {
+            throw new ConnectException("Could not access converter class", e);
+        }
+    }
+
+    @Override
+    public void report(final ProcessingContext report) {
+        byte[] val;
+        try {
+            val = converter.fromConnectData(config.topic(), 
SchemaBuilder.struct().schema(), report);
+        } catch (Exception e) {
+            log.error("Could not convert report for producing", e);
+            return;
+        }
+
+        producer.send(new ProducerRecord<byte[], byte[]>(config.topic(), val), 
new Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception 
exception) {
+                if (exception != null) {
+                    log.error("Could not write record to DLQ", report);
+                }
+            }
+        });
+    }
+
+    private static class DlqReporterConfig extends AbstractConfig {
+        public DlqReporterConfig(Map<?, ?> originals) {
+            super(getConfigDef(), originals);
+        }
+
+        public String topic() {
+            return getString(DLQ_TOPIC_NAME);
+        }
+
+        public int topicPartitions() {
+            return getInt(DLQ_TOPIC_PARTITIONS);
+        }
+
+        public short topicReplicationFactor() {
+            return getShort(DLQ_TOPIC_REPLICATION_FACTOR);
+        }
+
+        public boolean includeConfigs() {
+            return getBoolean(DLQ_INCLUDE_CONFIGS);
+        }
+
+        public boolean includeMessages() {
+            return getBoolean(DLQ_INCLUDE_MESSAGES);
+        }
+
+        @SuppressWarnings("unchecked")
+        public Class<? extends Converter> converter() {
+            return (Class<? extends Converter>) getClass(DLQ_CONVERTER);
+        }
+
+        public Map<String, Object> getProducerProps() {
+            return originalsWithPrefix(DLQ_PRODUCER_PROPERTIES + ".");
+        }
+
+    }
+}
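
As a sketch, the reporter could be wired up directly with the keys defined above (the topic name, sizing, and bootstrap servers are examples and assume a reachable broker):

    Map<String, Object> props = new HashMap<>();
    props.put(DLQReporter.DLQ_TOPIC_NAME, "connect-dlq-orders");
    props.put(DLQReporter.DLQ_TOPIC_PARTITIONS, 1);
    props.put(DLQReporter.DLQ_TOPIC_REPLICATION_FACTOR, (short) 1);
    props.put(DLQReporter.DLQ_PRODUCER_PROPERTIES + ".bootstrap.servers", "localhost:9092");

    DLQReporter reporter = new DLQReporter();
    reporter.configure(props);
    reporter.initialize();   // creates the topic if needed and builds the producer
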
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/ErrorMetricsReporter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/ErrorMetricsReporter.java
new file mode 100644
index 00000000000..d1d01277b5b
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/ErrorMetricsReporter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors.impl;
+
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+
+import java.util.Map;
+
+public class ErrorMetricsReporter extends ErrorReporter {
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+
+    @Override
+    public void report(ProcessingContext report) {
+
+    }
+
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/LogReporter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/LogReporter.java
new file mode 100644
index 00000000000..c8aa09f5bbc
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/LogReporter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors.impl;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class LogReporter extends ErrorReporter {
+
+    private static final Logger log = 
LoggerFactory.getLogger(LogReporter.class);
+
+    public static final String LOG_INCLUDE_CONFIGS = "log.include.configs";
+    public static final String LOG_INCLUDE_CONFIGS_DOC = "Include the (worker, 
connector) configs in the log.";
+    public static final boolean LOG_INCLUDE_CONFIGS_DEFAULT = false;
+
+    public static final String LOG_INCLUDE_MESSAGES = "log.include.messages";
+    public static final String LOG_INCLUDE_MESSAGES_DOC = "Include the Connect 
Record which failed to process in the log.";
+    public static final boolean LOG_INCLUDE_MESSAGES_DEFAULT = false;
+
+    private LogReporterConfig config;
+
+    static ConfigDef getConfigDef() {
+        return new ConfigDef()
+                .define(LOG_INCLUDE_CONFIGS, ConfigDef.Type.BOOLEAN, 
LOG_INCLUDE_CONFIGS_DEFAULT, ConfigDef.Importance.HIGH, LOG_INCLUDE_CONFIGS_DOC)
+                .define(LOG_INCLUDE_MESSAGES, ConfigDef.Type.BOOLEAN, 
LOG_INCLUDE_MESSAGES_DEFAULT, ConfigDef.Importance.HIGH, 
LOG_INCLUDE_MESSAGES_DOC);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        config = new LogReporterConfig(configs);
+    }
+
+    @Override
+    public void report(ProcessingContext context) {
+        log.info("Error processing record. Context={}", 
createLogString(context));
+    }
+
+    public String createLogString(ProcessingContext context) {
+        return String.valueOf(context.toStruct());
+    }
+
+    static class LogReporterConfig extends AbstractConfig {
+        public LogReporterConfig(Map<?, ?> originals) {
+            super(getConfigDef(), originals);
+        }
+
+        public boolean includeConfigs() {
+            return getBoolean(LOG_INCLUDE_CONFIGS);
+        }
+
+        public boolean includeMessages() {
+            return getBoolean(LOG_INCLUDE_MESSAGES);
+        }
+    }
+}
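
A corresponding sketch for the log reporter; the no-op ProcessingContext used in the tests stands in for a real error context here:

    Map<String, Object> props = new HashMap<>();
    props.put(LogReporter.LOG_INCLUDE_MESSAGES, true);

    LogReporter reporter = new LogReporter();
    reporter.configure(props);
    reporter.report(ProcessingContext.noop("my-task"));   // logs the error context
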
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/NoopExecutor.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/NoopExecutor.java
new file mode 100644
index 00000000000..4b7bf317669
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/NoopExecutor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors.impl;
+
+import org.apache.kafka.connect.runtime.errors.OperationExecutor;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+
+public class NoopExecutor extends OperationExecutor {
+
+    public static final OperationExecutor INSTANCE = new NoopExecutor();
+
+    private NoopExecutor() {
+    }
+
+    @Override
+    public <V> V execute(OperationExecutor.Operation<V> operation, V value, 
ProcessingContext context) {
+        try {
+            return operation.apply();
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            context.setException(e);
+            throw new RuntimeException(e);
+        }
+    }
+}
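
A sketch of how an executor wraps a unit of work, assuming Operation<V> is the single apply() callback that the executors above invoke:

    ProcessingContext context = ProcessingContext.noop("my-task");
    OperationExecutor executor = NoopExecutor.INSTANCE;

    String result = executor.execute(new OperationExecutor.Operation<String>() {
        @Override
        public String apply() {
            // the actual work, e.g. one conversion or transformation step
            return "converted-value";
        }
    }, "default-value", context);
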
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/ReporterFactory.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/ReporterFactory.java
new file mode 100644
index 00000000000..23de34fcd0d
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/ReporterFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors.impl;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ReporterFactory {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ReporterFactory.class);
+
+    public static final String DLQ_ENABLE = "dlq.enable";
+    public static final String DLQ_ENABLE_DOC = "Write the error context to a dead letter queue (DLQ) Kafka topic.";
+    public static final boolean DLQ_ENABLE_DEFAULT = false;
+
+    public static final String LOG_ENABLE = "log.enable";
+    public static final String LOG_ENABLE_DOC = "Log the error context along 
with the other application logs.";
+    public static final boolean LOG_ENABLE_DEFAULT = false;
+
+    static ConfigDef getConfigDef() {
+        return new ConfigDef()
+                .define(DLQ_ENABLE, ConfigDef.Type.BOOLEAN, 
DLQ_ENABLE_DEFAULT, ConfigDef.Importance.HIGH, DLQ_ENABLE_DOC)
+                .define(LOG_ENABLE, ConfigDef.Type.BOOLEAN, 
LOG_ENABLE_DEFAULT, ConfigDef.Importance.HIGH, LOG_ENABLE_DOC);
+    }
+
+    public List<ErrorReporter> forConfig(Map<String, Object> 
workerProducerProps, ConnectorConfig connConfig) {
+        Map<String, Object> configs = new HashMap<>();
+        for (Map.Entry<String, Object> e: workerProducerProps.entrySet()) {
+            configs.put(DLQReporter.DLQ_PRODUCER_PROPERTIES + "." + 
e.getKey(), e.getValue());
+        }
+        configs.putAll(connConfig.errorHandlerConfig());
+
+        ReporterFactoryConfig config = new 
ReporterFactoryConfig(getConfigDef(), configs);
+        List<ErrorReporter> reporters = new ArrayList<>(3);
+        log.info("Adding metrics reporter for reporting errors");
+        reporters.add(new ErrorMetricsReporter());
+        if (config.isDlqReporterEnabled()) {
+            log.info("Adding DLQ reporter for reporting errors");
+            DLQReporter reporter = new DLQReporter();
+            reporter.configure(configs);
+            reporter.initialize();
+            reporters.add(reporter);
+        }
+        if (config.isLogReporterEnabled()) {
+            log.info("Adding Log reporter for reporting errors");
+            LogReporter reporter = new LogReporter();
+            reporter.configure(configs);
+            reporter.initialize();
+            reporters.add(reporter);
+        }
+        return reporters;
+    }
+
+    static class ReporterFactoryConfig extends AbstractConfig {
+
+        public ReporterFactoryConfig(ConfigDef definition, Map<?, ?> 
originals) {
+            super(definition, originals, false);
+        }
+
+        public boolean isLogReporterEnabled() {
+            return getBoolean(LOG_ENABLE);
+        }
+
+        public boolean isDlqReporterEnabled() {
+            return getBoolean(DLQ_ENABLE);
+        }
+    }
+}
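
A sketch of how the factory appears to be invoked; workerProducerProps would come from the worker configuration and connConfig is the connector's ConnectorConfig (both are assumed here, not shown):

    Map<String, Object> workerProducerProps = new HashMap<>();
    workerProducerProps.put("bootstrap.servers", "localhost:9092");

    // connConfig.errorHandlerConfig() contributes the "errors."-prefixed settings,
    // e.g. whether the DLQ and log reporters are enabled for this connector.
    List<ErrorReporter> reporters = new ReporterFactory().forConfig(workerProducerProps, connConfig);
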
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/RetryWithToleranceExecutor.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/RetryWithToleranceExecutor.java
new file mode 100644
index 00000000000..d4a4b36b95a
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/RetryWithToleranceExecutor.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors.impl;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.OperationExecutor;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.apache.kafka.connect.runtime.errors.ToleranceExceededException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+public class RetryWithToleranceExecutor extends OperationExecutor {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RetryWithToleranceExecutor.class);
+
+    public static final String RETRIES_LIMIT = "retries.limit";
+    public static final String RETRIES_LIMIT_DOC = "The maximum number of 
retries before failing an operation";
+    public static final long RETRIES_LIMIT_DEFAULT = 0;
+
+    public static final String RETRIES_DELAY_MAX_MS = "retries.delay.max.ms";
+    public static final String RETRIES_DELAY_MAX_MS_DOC = "The maximum 
duration between two consecutive retries (in milliseconds).";
+    public static final long RETRIES_DELAY_MAX_MS_DEFAULT = 60000;
+
+    public static final long RETRIES_DELAY_MIN_MS = 1000;
+
+    public static final String TOLERANCE_LIMIT = "tolerance.limit";
+    public static final String TOLERANCE_LIMIT_DOC = "Fail the task if we exceed the specified number of errors overall.";
+    public static final long TOLERANCE_LIMIT_DEFAULT = 0;
+
+    public static final String TOLERANCE_RATE_LIMIT = "tolerance.rate.limit";
+    public static final String TOLERANCE_RATE_LIMIT_DOC = "Fail the task if we exceed the specified number of errors within the observed duration.";
+    public static final long TOLERANCE_RATE_LIMIT_DEFAULT = 0;
+
+    public static final String TOLERANCE_RATE_DURATION = 
"tolerance.rate.duration";
+    public static final String TOLERANCE_RATE_DURATION_DOC = "The duration of 
the window for which we will monitor errors.";
+    public static final String TOLERANCE_RATE_DURATION_DEFAULT = "minute";
+
+    private final Time time;
+    private RetryWithToleranceExecutorConfig config;
+
+    private long totalFailures = 0;
+    private long totalFailuresInDuration = 0;
+    private long durationWindow = 0;
+    private long durationStart = 0;
+
+    public RetryWithToleranceExecutor() {
+        this(new SystemTime());
+    }
+
+    public RetryWithToleranceExecutor(Time time) {
+        this.time = time;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        config = new RetryWithToleranceExecutorConfig(configs);
+        durationWindow = TimeUnit.MILLISECONDS.convert(1, 
config.toleranceRateDuration());
+        durationStart = time.milliseconds() - time.milliseconds() % durationWindow;
+    }
+
+    static ConfigDef getConfigDef() {
+        return new ConfigDef()
+                .define(RETRIES_LIMIT, ConfigDef.Type.LONG, 
RETRIES_LIMIT_DEFAULT, ConfigDef.Importance.HIGH, RETRIES_LIMIT_DOC)
+                .define(RETRIES_DELAY_MAX_MS, ConfigDef.Type.LONG, 
RETRIES_DELAY_MAX_MS_DEFAULT, atLeast(1), ConfigDef.Importance.MEDIUM, 
RETRIES_DELAY_MAX_MS_DOC)
+                .define(TOLERANCE_LIMIT, ConfigDef.Type.LONG, 
TOLERANCE_LIMIT_DEFAULT, ConfigDef.Importance.HIGH, TOLERANCE_LIMIT_DOC)
+                .define(TOLERANCE_RATE_LIMIT, ConfigDef.Type.LONG, 
TOLERANCE_RATE_LIMIT_DEFAULT, ConfigDef.Importance.MEDIUM, 
TOLERANCE_RATE_LIMIT_DOC)
+                .define(TOLERANCE_RATE_DURATION, ConfigDef.Type.STRING, 
TOLERANCE_RATE_DURATION_DEFAULT, in("minute", "hour", "day"), 
ConfigDef.Importance.MEDIUM, TOLERANCE_RATE_DURATION_DOC);
+    }
+
+    @Override
+    public <V> V execute(Operation<V> operation, V value, ProcessingContext 
context) {
+        try {
+            switch (context.current().type()) {
+                case TRANSFORMATION:
+                case HEADER_CONVERTER:
+                case KEY_CONVERTER:
+                case VALUE_CONVERTER:
+                    return execAndHandleError(operation, value, context, 
Exception.class);
+                case KAFKA_PRODUCE:
+                case KAFKA_CONSUME:
+                    return execAndHandleError(operation, value, context, 
org.apache.kafka.common.errors.RetriableException.class);
+                default:
+                    return execAndHandleError(operation, value, context, 
org.apache.kafka.connect.errors.RetriableException.class);
+            }
+        } finally {
+            if (context.exception() != null) {
+                context.report();
+            }
+        }
+    }
+
+    private <V> V execAndHandleError(Operation<V> operation, V value, 
ProcessingContext context, Class<? extends Exception> handled) {
+        Response<V> response = new Response<>();
+        boolean retry;
+        do {
+            apply(response, operation, handled);
+            context.incrementAttempt();
+            switch (response.status) {
+                case SUCCESS:
+                    return response.result;
+                case RETRY:
+                    context.setException(response.ex);
+                    retry = checkRetry(context);
+                    log.trace("Operation failed. For attempt={}, limit={}, 
retry={}", context.attempt(), config.retriesLimit(), retry);
+                    break;
+                case UNHANDLED_EXCEPTION:
+                    context.setException(response.ex);
+                    throw new ConnectException(response.ex);
+                default:
+                    throw new ConnectException("Undefined state: " + 
response.status);
+            }
+            if (retry) {
+                backoff(context);
+                if (Thread.currentThread().isInterrupted()) {
+                    // thread was interrupted during sleep. kill the task.
+                    throw new ConnectException("Thread was interrupted");
+                }
+            }
+        } while (retry);
+
+        // mark this record as failed.
+        totalFailures++;
+        context.setTimeOfError(time.milliseconds());
+
+        if (!withinToleranceLimits(context)) {
+            throw new ToleranceExceededException("Tolerance Limit Exceeded", 
response.ex);
+        }
+
+        log.trace("Operation failed but within tolerance limits. Returning 
default value={}", value);
+        return value;
+    }
+
+    // Visible for testing
+    protected boolean withinToleranceLimits(ProcessingContext context) {
+        final long timeOfError = context.timeOfError();
+        long newDurationStart = timeOfError - timeOfError % durationWindow;
+        if (newDurationStart > durationStart) {
+            durationStart = newDurationStart;
+            totalFailuresInDuration = 0;
+        }
+        totalFailuresInDuration++;
+
+        log.info("Marking the record as failed, totalFailures={}, 
totalFailuresInDuration={}", totalFailures, totalFailuresInDuration);
+
+        if (totalFailures > config.toleranceLimit() || totalFailuresInDuration 
> config.toleranceRateLimit()) {
+            return false;
+        }
+
+        return true;
+    }
+
+    // Visible for tests
+    protected boolean checkRetry(ProcessingContext context) {
+        long limit = config.retriesLimit();
+        if (limit == -1) {
+            return true;
+        } else if (limit == 0) {
+            return false;
+        } else if (limit > 0) {
+            // number of retries is one less than the number of attempts.
+            return (context.attempt() - 1) < config.retriesLimit();
+        } else {
+            log.error("Unexpected value for retry limit={}. Will disable 
retry.", limit);
+            return false;
+        }
+    }
+
+    protected void backoff(ProcessingContext context) {
+        int numRetry = context.attempt() - 1;
+        long delay = RETRIES_DELAY_MIN_MS << numRetry;
+        if (delay > config.retriesDelayMax()) {
+            delay = 
ThreadLocalRandom.current().nextLong(config.retriesDelayMax());
+        }
+
+        log.debug("Sleeping for {} millis", delay);
+        time.sleep(delay);
+    }
+
+    private <V> void apply(Response<V> response, Operation<V> operation, 
Class<? extends Exception> handled) {
+        try {
+            response.result = operation.apply();
+            response.ex = null;
+            response.status = ResponseStatus.SUCCESS;
+        } catch (Exception e) {
+            response.ex = e;
+            response.result = null;
+            if (handled.isAssignableFrom(e.getClass())) {
+                response.status = ResponseStatus.RETRY;
+            } else {
+                response.status = ResponseStatus.UNHANDLED_EXCEPTION;
+            }
+        }
+    }
+
+    static class Response<V> {
+        ResponseStatus status;
+        Exception ex;
+        V result;
+    }
+
+    enum ResponseStatus {
+        SUCCESS,
+        RETRY,
+        UNHANDLED_EXCEPTION
+    }
+
+    static class RetryWithToleranceExecutorConfig extends AbstractConfig {
+
+        public RetryWithToleranceExecutorConfig(Map<?, ?> originals) {
+            super(getConfigDef(), originals);
+        }
+
+        public long retriesLimit() {
+            return getLong(RETRIES_LIMIT);
+        }
+
+        public long retriesDelayMax() {
+            return getLong(RETRIES_DELAY_MAX_MS);
+        }
+
+        public long toleranceLimit() {
+            return getLong(TOLERANCE_LIMIT);
+        }
+
+        public long toleranceRateLimit() {
+            return getLong(TOLERANCE_RATE_LIMIT);
+        }
+
+        public TimeUnit toleranceRateDuration() {
+            final String duration = getString(TOLERANCE_RATE_DURATION);
+            switch (duration.toLowerCase(Locale.ROOT)) {
+                case "minute":
+                    return TimeUnit.MINUTES;
+                case "hour":
+                    return TimeUnit.HOURS;
+                case "day":
+                    return TimeUnit.DAYS;
+                default:
+                    throw new ConfigException("Could not recognize value " + duration + " for config " + TOLERANCE_RATE_DURATION);
+            }
+        }
+    }
+}
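
A sketch of configuring this executor; with these settings the retry delays roughly double from 1 second up to retries.delay.max.ms, and when retries are exhausted while the failure is still within the tolerance limits, the operation's default value is returned instead of failing the task:

    Map<String, Object> props = new HashMap<>();
    props.put(RetryWithToleranceExecutor.RETRIES_LIMIT, 5L);
    props.put(RetryWithToleranceExecutor.RETRIES_DELAY_MAX_MS, 10000L);
    props.put(RetryWithToleranceExecutor.TOLERANCE_LIMIT, 10L);
    props.put(RetryWithToleranceExecutor.TOLERANCE_RATE_LIMIT, 5L);
    props.put(RetryWithToleranceExecutor.TOLERANCE_RATE_DURATION, "minute");

    RetryWithToleranceExecutor executor = new RetryWithToleranceExecutor();
    executor.configure(props);
    // executor.execute(operation, defaultValue, context) can then wrap each
    // conversion, transformation, or produce/consume call made by the task.
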
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/StructUtil.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/StructUtil.java
new file mode 100644
index 00000000000..a4081d2e2c4
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/StructUtil.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors.impl;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.apache.kafka.connect.runtime.errors.Stage;
+import org.apache.kafka.connect.runtime.errors.StageType;
+import org.apache.kafka.connect.runtime.errors.Structable;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class StructUtil {
+
+    private static final Logger log = 
LoggerFactory.getLogger(StructUtil.class);
+
+    public static Struct toStruct(Structable structable) {
+        Objects.requireNonNull(structable);
+
+        Schema objectSchema = getSchemaFor(structable.getClass());
+        return getStructFor(structable, objectSchema);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Struct getStructFor(Structable structable, Schema schema) {
+        Objects.requireNonNull(structable);
+
+        Struct struct = new Struct(schema);
+        Method[] methods = structable.getClass().getMethods();
+        for (Method method: methods) {
+            Structable.Field field = 
method.getAnnotation(Structable.Field.class);
+            if (field == null) {
+                continue;
+            }
+            if (method.getParameterTypes().length > 0) {
+                log.debug("Cannot invoke method with non-zero parameters.");
+                continue;
+            }
+            try {
+                Object object = method.invoke(structable);
+                if (object != null) {
+                    if (object instanceof Exception) {
+                        Exception ex = (Exception) object;
+                        object = getMessage(ex) + "\n" + getStackTrace(ex);
+                    } else if (object.getClass().isEnum()) {
+                        object = String.valueOf(object);
+                    } else if 
(Class.class.isAssignableFrom(object.getClass())) {
+                        object = ((Class<?>) object).getName();
+                    } else if (object instanceof Map) {
+                        Map<String, String> m = new HashMap<>();
+                        Map<Object, Object> mm = (Map<Object, Object>) object;
+                        for (Map.Entry<Object, Object> e: mm.entrySet()) {
+                            String key = e.getKey() instanceof String ? 
(String) e.getKey() : String.valueOf(e.getKey());
+                            String val = e.getValue() instanceof String ? 
(String) e.getValue() : String.valueOf(e.getValue());
+                            m.put(key, val);
+                        }
+                        object = m;
+                    }
+                    struct.put(field.value(), object);
+                }
+
+            } catch (Exception e) {
+                log.error("Could nto invoke method " + method, e);
+            }
+        }
+        return struct;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static Schema getSchemaFor(Class<? extends Structable> structable) {
+        SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+        Method[] methods = structable.getMethods();
+        for (Method method: methods) {
+            Structable.Field field = 
method.getAnnotation(Structable.Field.class);
+            if (field == null) {
+                continue;
+            }
+            if (method.getParameterTypes().length > 0) {
+                log.debug("Cannot invoke method with non-zero parameters.");
+                continue;
+            }
+
+            Class<?> type = method.getReturnType();
+            if (method.getReturnType().equals(int.class)) {
+                schemaBuilder.field(field.value(), 
Schema.OPTIONAL_INT32_SCHEMA);
+            } else if (method.getReturnType().equals(String.class)) {
+                schemaBuilder.field(field.value(), 
Schema.OPTIONAL_STRING_SCHEMA);
+            } else if (method.getReturnType().equals(long.class)) {
+                schemaBuilder.field(field.value(), 
Schema.OPTIONAL_INT64_SCHEMA);
+            } else if (method.getReturnType().equals(Exception.class)) {
+                schemaBuilder.field(field.value(), 
Schema.OPTIONAL_STRING_SCHEMA);
+            } else if (method.getReturnType().equals(Structable.class)) {
+                schemaBuilder.field(field.value(), getSchemaFor((Class<? 
extends Structable>) method.getReturnType()));
+            } else if (method.getReturnType().isEnum()) {
+                schemaBuilder.field(field.value(), 
Schema.OPTIONAL_STRING_SCHEMA);
+            } else if (Class.class.isAssignableFrom(method.getReturnType())) {
+                schemaBuilder.field(field.value(), 
Schema.OPTIONAL_STRING_SCHEMA);
+            } else if (Map.class.isAssignableFrom(method.getReturnType())) {
+                schemaBuilder.field(field.value(), 
SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, 
Schema.OPTIONAL_STRING_SCHEMA).build());
+            } else {
+                throw new DataException("Could not translate type: " + type + 
" for method " + method);
+            }
+        }
+        return schemaBuilder.build();
+    }
+
+    public static String getMessage(final Throwable th) {
+        if (th == null) {
+            return "";
+        }
+        final String clsName = th.getClass().getName();
+        final String msg = th.getMessage();
+        return clsName + ": " + msg;
+    }
+
+    public static String getStackTrace(final Throwable throwable) {
+        final StringWriter sw = new StringWriter();
+        final PrintWriter pw = new PrintWriter(sw, true);
+        throwable.printStackTrace(pw);
+        return sw.getBuffer().toString();
+    }
+
+    public static void main(String[] args) {
+        new StructUtil().test();
+    }
+
+    private Exception foo() {
+        return new ConnectException("connect");
+    }
+
+    private void test() {
+        
System.out.println(RuntimeException.class.isAssignableFrom(Exception.class));
+        
System.out.println(Exception.class.isAssignableFrom(RuntimeException.class));
+        
System.out.println(RuntimeException.class.isAssignableFrom(RuntimeException.class));
+
+        ProcessingContext context = ProcessingContext.noop("my-task");
+        context.setException(new RuntimeException("hello", foo()));
+        Struct contextStruct = toStruct(context);
+
+        Map<String, Object> config = new HashMap<>();
+        config.put("k1", "v1");
+        config.put("k2", 10);
+        Stage stage = 
Stage.newBuilder(StageType.TASK_START).setExecutingClass(SinkTask.class).setConfig(config).build();
+        Struct stageStruct = toStruct(stage);
+
+        Schema combinedSchema = SchemaBuilder.struct()
+                .field("context", contextStruct.schema())
+                .field("stage", stageStruct.schema())
+                .build();
+        Struct struct = new Struct(combinedSchema);
+        struct.put("context", contextStruct);
+        struct.put("stage", stageStruct);
+
+        System.out.println(struct.toString().replaceAll("\n", "\\\\n"));
+    }
+}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 9568e787cc5..791637c06c9 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -31,6 +31,8 @@
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.OperationExecutor;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
@@ -161,7 +163,8 @@ public void setUp() {
     private void createTask(TargetState initialState) {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, 
metrics, keyConverter, valueConverter, headerConverter, transformationChain, 
pluginLoader, time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, 
metrics, keyConverter, valueConverter, headerConverter, transformationChain, 
pluginLoader, time,
+                ProcessingContext.noop(taskId.toString()));
     }
 
     @After
@@ -1361,7 +1364,7 @@ private void expectConversionAndTransformation(final int 
numMessages, final Stri
         EasyMock.expect(valueConverter.toConnectData(TOPIC, 
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, 
VALUE)).times(numMessages);
 
         final Capture<SinkRecord> recordCapture = EasyMock.newCapture();
-        
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture)))
+        
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture), 
EasyMock.<OperationExecutor>anyObject(), 
EasyMock.<ProcessingContext>anyObject()))
                 .andAnswer(new IAnswer<SinkRecord>() {
                     @Override
                     public SinkRecord answer() {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index b09f8477f45..f5e3cfd2abc 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -28,6 +28,8 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.OperationExecutor;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -137,7 +139,8 @@ public void setup() {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
                 taskId, sinkTask, statusListener, initialState, workerConfig, 
metrics, keyConverter,
-                valueConverter, headerConverter, TransformationChain.noOp(), 
pluginLoader, time);
+                valueConverter, headerConverter, 
TransformationChain.<SinkRecord>noOp(), pluginLoader, time,
+                ProcessingContext.noop(taskId.toString()));
 
         recordsReturned = 0;
     }
@@ -573,7 +576,7 @@ private void expectStopTask() throws Exception {
         EasyMock.expect(valueConverter.toConnectData(TOPIC, 
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
 
         final Capture<SinkRecord> recordCapture = EasyMock.newCapture();
-        
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer(new
 IAnswer<SinkRecord>() {
+        
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture), 
EasyMock.<OperationExecutor>anyObject(), 
EasyMock.<ProcessingContext>anyObject())).andAnswer(new IAnswer<SinkRecord>() {
             @Override
             public SinkRecord answer() {
                 return recordCapture.getValue();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 25c2cb10352..709c59a5102 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -26,6 +26,8 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import 
org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.errors.OperationExecutor;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -146,7 +148,8 @@ private void createWorkerTask() {
 
     private void createWorkerTask(TargetState initialState) {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, 
initialState, keyConverter, valueConverter, headerConverter,
-                transformationChain, producer, offsetReader, offsetWriter, 
config, metrics, plugins.delegatingLoader(), Time.SYSTEM);
+                transformationChain, producer, offsetReader, offsetWriter, 
config, metrics, plugins.delegatingLoader(), Time.SYSTEM,
+                ProcessingContext.noop(taskId.toString()));
     }
 
     @Test
@@ -772,7 +775,7 @@ private void expectConvertKeyValue(boolean anyTimes) {
 
     private void expectApplyTransformationChain(boolean anyTimes) {
         final Capture<SourceRecord> recordCapture = EasyMock.newCapture();
-        IExpectationSetters<SourceRecord> convertKeyExpect = 
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture)));
+        IExpectationSetters<SourceRecord> convertKeyExpect = 
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture), 
EasyMock.<OperationExecutor>anyObject(), 
EasyMock.<ProcessingContext>anyObject()));
         if (anyTimes)
             convertKeyExpect.andStubAnswer(new IAnswer<SourceRecord>() {
                 @Override
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 78f2836f42b..dc3c8b0acc7 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.common.utils.MockTime;
@@ -77,9 +78,10 @@ public void standardStartup() {
                         TaskStatus.Listener.class,
                         TargetState.class,
                         ClassLoader.class,
-                        ConnectMetrics.class
+                        ConnectMetrics.class,
+                        ProcessingContext.class
                 )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics)
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics, ProcessingContext.noop("test"))
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -123,9 +125,10 @@ public void stopBeforeStarting() {
                         TaskStatus.Listener.class,
                         TargetState.class,
                         ClassLoader.class,
-                        ConnectMetrics.class
+                        ConnectMetrics.class,
+                        ProcessingContext.class
                 )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics)
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics, ProcessingContext.noop("test"))
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -162,9 +165,10 @@ public void cancelBeforeStopping() throws Exception {
                         TaskStatus.Listener.class,
                         TargetState.class,
                         ClassLoader.class,
-                        ConnectMetrics.class
+                        ConnectMetrics.class,
+                        ProcessingContext.class
                 )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics)
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader, 
metrics, ProcessingContext.noop("test"))
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index f062436f0e0..b4828dcc998 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -33,6 +33,9 @@
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter;
+import org.apache.kafka.connect.runtime.errors.OperationExecutor;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.apache.kafka.connect.runtime.errors.impl.NoopExecutor;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -475,6 +478,8 @@ public void testAddRemoveTask() throws Exception {
         expectStartStorage();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
+        
EasyMock.expect(workerTask.processingContext()).andStubReturn(ProcessingContext.noop(TASK_ID.toString()));
+        
EasyMock.expect(workerTask.operationExecutor()).andStubReturn(NoopExecutor.INSTANCE);
 
         
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         PowerMock.expectNew(
@@ -492,7 +497,9 @@ public void testAddRemoveTask() throws Exception {
                 EasyMock.eq(config),
                 anyObject(ConnectMetrics.class),
                 anyObject(ClassLoader.class),
-                anyObject(Time.class))
+                anyObject(Time.class),
+                anyObject(ProcessingContext.class),
+                anyObject(OperationExecutor.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());
@@ -612,6 +619,8 @@ public void testCleanupTasksOnStop() throws Exception {
         expectStartStorage();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
+        
EasyMock.expect(workerTask.processingContext()).andStubReturn(ProcessingContext.noop(TASK_ID.toString()));
+        
EasyMock.expect(workerTask.operationExecutor()).andStubReturn(NoopExecutor.INSTANCE);
 
         
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         PowerMock.expectNew(
@@ -629,7 +638,9 @@ public void testCleanupTasksOnStop() throws Exception {
                 anyObject(WorkerConfig.class),
                 anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
-                anyObject(Time.class))
+                anyObject(Time.class),
+                anyObject(ProcessingContext.class),
+                anyObject(OperationExecutor.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@@ -699,6 +710,8 @@ public void testConverterOverrides() throws Exception {
         expectStartStorage();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
+        EasyMock.expect(workerTask.processingContext()).andStubReturn(ProcessingContext.noop(TASK_ID.toString()));
+        EasyMock.expect(workerTask.operationExecutor()).andStubReturn(NoopExecutor.INSTANCE);
 
         Capture<TestConverter> keyConverter = EasyMock.newCapture();
         Capture<TestConfigurableConverter> valueConverter = EasyMock.newCapture();
@@ -720,7 +733,9 @@ public void testConverterOverrides() throws Exception {
                 anyObject(WorkerConfig.class),
                 anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
-                anyObject(Time.class))
+                anyObject(Time.class),
+                anyObject(ProcessingContext.class),
+                anyObject(OperationExecutor.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/impl/RetryWithToleranceExecutorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/impl/RetryWithToleranceExecutorTest.java
new file mode 100644
index 00000000000..44249695b2e
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/impl/RetryWithToleranceExecutorTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors.impl;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.connect.runtime.errors.OperationExecutor;
+import org.apache.kafka.connect.runtime.errors.ProcessingContext;
+import org.apache.kafka.connect.runtime.errors.Stage;
+import org.apache.kafka.connect.runtime.errors.StageType;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ProcessingContext.class})
+public class RetryWithToleranceExecutorTest {
+
+    @Mock
+    private ProcessingContext processingContext;
+
+    Object ref = new Object();
+
+    @Test
+    public void testHandleExceptionInTransformations() {
+        testHandleExceptionInStage(StageType.TRANSFORMATION, new Exception());
+    }
+
+    @Test
+    public void testHandleExceptionInHeaderConverter() {
+        testHandleExceptionInStage(StageType.HEADER_CONVERTER, new Exception());
+    }
+
+    @Test
+    public void testHandleExceptionInValueConverter() {
+        testHandleExceptionInStage(StageType.VALUE_CONVERTER, new Exception());
+    }
+
+    @Test
+    public void testHandleExceptionInKeyConverter() {
+        testHandleExceptionInStage(StageType.KEY_CONVERTER, new Exception());
+    }
+
+    @Test
+    public void testHandleExceptionInKafkaConsume() {
+        testHandleExceptionInStage(StageType.KAFKA_CONSUME, new org.apache.kafka.common.errors.RetriableException() {});
+    }
+
+    @Test
+    public void testHandleExceptionInKafkaProduce() {
+        testHandleExceptionInStage(StageType.KAFKA_PRODUCE, new org.apache.kafka.common.errors.RetriableException() {});
+    }
+
+    @Test
+    public void testHandleExceptionInTaskPut() {
+        testHandleExceptionInStage(StageType.TASK_PUT, new org.apache.kafka.connect.errors.RetriableException("Test"));
+    }
+
+    @Test
+    public void testHandleExceptionInTaskPoll() {
+        testHandleExceptionInStage(StageType.TASK_POLL, new org.apache.kafka.connect.errors.RetriableException("Test"));
+    }
+
+    @Test(expected = Exception.class)
+    public void testThrowExceptionInTaskPut() {
+        testHandleExceptionInStage(StageType.TASK_PUT, new Exception());
+    }
+
+    @Test(expected = Exception.class)
+    public void testThrowExceptionInTaskPoll() {
+        testHandleExceptionInStage(StageType.TASK_POLL, new Exception());
+    }
+
+    @Test(expected = Exception.class)
+    public void testThrowExceptionInKafkaConsume() {
+        testHandleExceptionInStage(StageType.KAFKA_CONSUME, new Exception());
+    }
+
+    @Test(expected = Exception.class)
+    public void testThrowExceptionInKafkaProduce() {
+        testHandleExceptionInStage(StageType.KAFKA_PRODUCE, new Exception());
+    }
+
+    private void testHandleExceptionInStage(StageType type, Exception ex) {
+        RetryWithToleranceExecutor executor = setupExecutor();
+        setupProcessingContext(type, ex);
+        replay(processingContext);
+        assertEquals(ref, executor.execute(new ExceptionThrower(ex), ref, processingContext));
+        PowerMock.verifyAll();
+    }
+
+    private RetryWithToleranceExecutor setupExecutor() {
+        RetryWithToleranceExecutor executor = new RetryWithToleranceExecutor();
+        Map<String, Object> props = config(RetryWithToleranceExecutor.RETRIES_LIMIT, "0");
+        props.put(RetryWithToleranceExecutor.TOLERANCE_LIMIT, "10");
+        props.put(RetryWithToleranceExecutor.TOLERANCE_RATE_LIMIT, "10");
+        executor.configure(props);
+        return executor;
+    }
+
+    private void setupProcessingContext(StageType type, Exception ex) {
+        EasyMock.expect(processingContext.current()).andReturn(Stage.newBuilder(type).build());
+        EasyMock.expect(processingContext.exception()).andReturn(ex);
+        EasyMock.expect(processingContext.attempt()).andReturn(1);
+        EasyMock.expect(processingContext.timeOfError()).andReturn(System.currentTimeMillis());
+        processingContext.setTimeOfError(EasyMock.anyLong());
+
+        processingContext.incrementAttempt();
+        EasyMock.expectLastCall();
+
+        processingContext.report();
+        EasyMock.expectLastCall();
+
+        processingContext.setException(ex);
+        EasyMock.expectLastCall();
+    }
+
+    @Test
+    public void testCheckRetryLimit() {
+        RetryWithToleranceExecutor executor = new RetryWithToleranceExecutor();
+        Map<String, Object> props = config(RetryWithToleranceExecutor.RETRIES_LIMIT, "5");
+        props.put(RetryWithToleranceExecutor.RETRIES_DELAY_MAX_MS, "120000");
+        executor.configure(props);
+
+        EasyMock.expect(processingContext.attempt()).andReturn(1);
+        EasyMock.expect(processingContext.attempt()).andReturn(2);
+        EasyMock.expect(processingContext.attempt()).andReturn(3);
+        EasyMock.expect(processingContext.attempt()).andReturn(4);
+        EasyMock.expect(processingContext.attempt()).andReturn(5);
+        EasyMock.expect(processingContext.attempt()).andReturn(6);
+
+        replay(processingContext);
+
+        assertTrue(executor.checkRetry(processingContext));
+        assertTrue(executor.checkRetry(processingContext));
+        assertTrue(executor.checkRetry(processingContext));
+        assertTrue(executor.checkRetry(processingContext));
+        assertTrue(executor.checkRetry(processingContext));
+        assertFalse(executor.checkRetry(processingContext));
+    }
+
+    @Test
+    public void testBackoffLimit() {
+        MockTime time = new MockTime(0, 0, 0);
+        RetryWithToleranceExecutor executor = new RetryWithToleranceExecutor(time);
+
+        Map<String, Object> props = config(RetryWithToleranceExecutor.RETRIES_LIMIT, "5");
+        props.put(RetryWithToleranceExecutor.RETRIES_DELAY_MAX_MS, "10000");
+        executor.configure(props);
+
+        EasyMock.expect(processingContext.attempt()).andReturn(1);
+        EasyMock.expect(processingContext.attempt()).andReturn(2);
+        EasyMock.expect(processingContext.attempt()).andReturn(3);
+        EasyMock.expect(processingContext.attempt()).andReturn(4);
+        EasyMock.expect(processingContext.attempt()).andReturn(5);
+
+        replay(processingContext);
+
+        long prevTs = time.hiResClockMs();
+        executor.backoff(processingContext);
+        assertEquals(time.hiResClockMs() - prevTs, 1000);
+
+        prevTs = time.hiResClockMs();
+        executor.backoff(processingContext);
+        assertEquals(time.hiResClockMs() - prevTs, 2000);
+
+        prevTs = time.hiResClockMs();
+        executor.backoff(processingContext);
+        assertEquals(time.hiResClockMs() - prevTs, 4000);
+
+        prevTs = time.hiResClockMs();
+        executor.backoff(processingContext);
+        assertEquals(time.hiResClockMs() - prevTs, 8000);
+
+        prevTs = time.hiResClockMs();
+        executor.backoff(processingContext);
+        assertTrue(time.hiResClockMs() - prevTs < 10000);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testToleranceLimitWithinDuration() {
+        RetryWithToleranceExecutor executor = new RetryWithToleranceExecutor(new MockTime(0, 0, 0));
+        Map<String, Object> props = config(RetryWithToleranceExecutor.TOLERANCE_RATE_LIMIT, "1");
+        props.put(RetryWithToleranceExecutor.TOLERANCE_RATE_DURATION, "minute");
+        executor.configure(props);
+
+        long errorTimeMs = 1;
+        EasyMock.expect(processingContext.timeOfError()).andReturn(errorTimeMs);
+        EasyMock.expect(processingContext.timeOfError()).andReturn(errorTimeMs + 1);
+
+        replay(processingContext);
+
+        assertTrue(executor.withinToleranceLimits(processingContext));
+        assertFalse(executor.withinToleranceLimits(processingContext));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testToleranceLimitAcrossDuration() {
+        MockTime time = new MockTime(0, 0, 0);
+        RetryWithToleranceExecutor executor = new RetryWithToleranceExecutor(time);
+        Map<String, Object> props = config(RetryWithToleranceExecutor.TOLERANCE_RATE_LIMIT, "1");
+        props.put(RetryWithToleranceExecutor.TOLERANCE_RATE_DURATION, "minute");
+        executor.configure(props);
+
+        // first error occurs at 59 seconds
+        long errorTimeMs = 59000;
+        EasyMock.expect(processingContext.timeOfError()).andReturn(errorTimeMs);
+        // second error occurs at 61 seconds
+        errorTimeMs = 61000;
+        EasyMock.expect(processingContext.timeOfError()).andReturn(errorTimeMs);
+
+        replay(processingContext);
+
+        assertTrue(executor.withinToleranceLimits(processingContext));
+
+        time.setCurrentTimeMs(60000);
+
+        assertTrue(executor.withinToleranceLimits(processingContext));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testDefaultConfigs() {
+        RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig configuration;
+        configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(new HashMap<>());
+        assertEquals(configuration.retriesLimit(), 0);
+        assertEquals(configuration.retriesDelayMax(), 60000);
+        assertEquals(configuration.toleranceLimit(), 0);
+        assertEquals(configuration.toleranceRateLimit(), 0);
+        assertEquals(configuration.toleranceRateDuration(), TimeUnit.MINUTES);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConfigs() {
+        RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig configuration;
+        configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("retries.limit", "100"));
+        assertEquals(configuration.retriesLimit(), 100);
+
+        configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("retries.delay.max.ms", "100"));
+        assertEquals(configuration.retriesDelayMax(), 100);
+
+        configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("tolerance.limit", "1024"));
+        assertEquals(configuration.toleranceLimit(), 1024);
+
+        configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("tolerance.rate.limit", "125"));
+        assertEquals(configuration.toleranceRateLimit(), 125);
+
+        configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("tolerance.rate.duration", "minute"));
+        assertEquals(configuration.toleranceRateDuration(), TimeUnit.MINUTES);
+
+        configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("tolerance.rate.duration", "day"));
+        assertEquals(configuration.toleranceRateDuration(), TimeUnit.DAYS);
+
+        configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("tolerance.rate.duration", "hour"));
+        assertEquals(configuration.toleranceRateDuration(), TimeUnit.HOURS);
+
+        PowerMock.verifyAll();
+    }
+
+    Map<String, Object> config(String key, Object val) {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(key, val);
+        return configs;
+    }
+
+    private static class ExceptionThrower implements OperationExecutor.Operation {
+        private Exception e;
+
+        public ExceptionThrower(Exception e) {
+            this.e = e;
+        }
+
+        @Override
+        public Object apply() throws Exception {
+            throw e;
+        }
+    }
+}
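
For reference, testBackoffLimit above asserts successive delays of 1000, 2000, 4000 and 8000 ms, with the fifth delay kept under the configured retries.delay.max.ms cap of 10000 ms. The following is a minimal sketch of that schedule, assuming a 1-second starting delay that doubles per attempt and is clamped at the cap; it is illustrative only and not taken from the executor's implementation (the test only checks that the last delay stays below the cap).

    // Illustrative sketch only: reproduces the delay schedule asserted in testBackoffLimit.
    // Assumes a 1s starting delay that doubles per attempt, clamped at retries.delay.max.ms.
    public class BackoffScheduleSketch {
        private static final long INITIAL_DELAY_MS = 1000L;

        static long delayForAttempt(int attempt, long maxDelayMs) {
            long delay = INITIAL_DELAY_MS << (attempt - 1); // 1s, 2s, 4s, 8s, ...
            return Math.min(delay, maxDelayMs);             // never exceed the configured cap
        }

        public static void main(String[] args) {
            for (int attempt = 1; attempt <= 5; attempt++) {
                System.out.println("attempt " + attempt + " -> "
                        + delayForAttempt(attempt, 10000L) + " ms");
            }
        }
    }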


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect handling of bad data
> ----------------------------------
>
>                 Key: KAFKA-6738
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6738
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>            Reporter: Randall Hauch
>            Assignee: Arjun Satish
>            Priority: Critical
>             Fix For: 2.0.0
>
>
> Kafka Connect connectors and tasks fail when they run into an unexpected 
> situation or error, but the framework should provide more general "bad data 
> handling" options, including (perhaps among others):
> # fail fast, which is what we do today (assuming connector actually fails and 
> doesn't eat errors)
> # retry (possibly with configs to limit)
> # drop data and move on
> # dead letter queue
> This needs to be addressed in a way that handles errors from:
> # The connector itself (e.g. connectivity issues to the other system)
> # Converters/serializers (bad data, unexpected format, etc)
> # SMTs
> # Ideally the framework as well, though we obviously want to fix known bugs 
> anyway
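
To make the retry option above concrete, here is a hypothetical configuration sketch based on the property keys exercised in the PR's RetryWithToleranceExecutorTest (retries.limit, retries.delay.max.ms, tolerance.rate.limit, tolerance.rate.duration). The wrapper class and wiring shown are assumptions for illustration, not the PR's final API.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical illustration only: the keys and values come from the tests in the diff above,
    // but this class and its wiring are assumptions, not part of the PR.
    public class ErrorHandlingConfigSketch {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put("retries.limit", "5");                // retry a failing operation up to 5 times
            props.put("retries.delay.max.ms", "60000");     // cap the backoff between retries at 60s
            props.put("tolerance.rate.limit", "10");        // tolerate up to 10 errors per duration window
            props.put("tolerance.rate.duration", "minute"); // window over which the error rate is measured

            // In the tests above, a map like this is passed to RetryWithToleranceExecutor.configure(props)
            // before operations are executed with retries and error tolerance applied.
            System.out.println(props);
        }
    }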



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
