[ https://issues.apache.org/jira/browse/KAFKA-6738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488234#comment-16488234 ]
ASF GitHub Bot commented on KAFKA-6738: --------------------------------------- wicknicks closed pull request #5010: KAFKA-6738: (WIP) Error Handling in Connect URL: https://github.com/apache/kafka/pull/5010 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 64258bf7b07..4f31ce264c7 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -80,7 +80,7 @@ files="(KafkaConfigBackingStore|RequestResponseTest|WorkerSinkTaskTest).java"/> <suppress checks="ParameterNumber" - files="WorkerSourceTask.java"/> + files="(WorkerSourceTask|WorkerSinkTask).java"/> <suppress checks="ParameterNumber" files="WorkerCoordinator.java"/> <suppress checks="ParameterNumber" diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 6a90310df5a..f8c02a08547 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.errors.Stage; +import org.apache.kafka.connect.runtime.errors.StageType; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.transforms.Transformation; @@ -91,6 +93,8 @@ private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to records."; private static final 
String TRANSFORMS_DISPLAY = "Transforms"; + public static final String ERROR_HANDLING_CONFIG = "errors"; + private final EnrichedConnectorConfig enrichedConfig; private static class EnrichedConnectorConfig extends AbstractConfig { EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> props) { @@ -139,6 +143,13 @@ public ConnectorConfig(Plugins plugins, ConfigDef configDef, Map<String, String> ); } + /** + * @return properties to configure error handlers and reporters. + */ + public Map<String, ?> errorHandlerConfig() { + return originalsWithPrefix(ERROR_HANDLING_CONFIG + "."); + } + @Override public Object get(String key) { return enrichedConfig.get(key); @@ -166,6 +177,25 @@ public Object get(String key) { return transformations; } + /** + * @return an ordered list of stages describing the transformations in this connector. The order is specified by + * {@link #TRANSFORMS_CONFIG}. + */ + public List<Stage> transformationAsStages() { + final List<String> transformAliases = getList(TRANSFORMS_CONFIG); + List<Stage> stages = new ArrayList<>(); + for (String alias : transformAliases) { + final String prefix = TRANSFORMS_CONFIG + "." + alias + "."; + stages.add(Stage.newBuilder(StageType.TRANSFORMATION) + .setExecutingClass(getClass(prefix + "type")) + .setConfig(originalsWithPrefix(prefix)) + .build() + ); + } + + return stages; + } + /** * Returns an enriched {@link ConfigDef} building upon the {@code ConfigDef}, using the current configuration specified in {@code props} as an input. 
* <p> diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java index e1d8b1f2e85..3d9baf3e541 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java @@ -17,6 +17,9 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.runtime.errors.OperationExecutor; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; +import org.apache.kafka.connect.runtime.errors.impl.NoopExecutor; import org.apache.kafka.connect.transforms.Transformation; import java.util.Collections; @@ -32,10 +35,29 @@ public TransformationChain(List<Transformation<R>> transformations) { } public R apply(R record) { + return apply(record, NoopExecutor.INSTANCE, null); + } + + public R apply(R record, OperationExecutor operationExecutor, ProcessingContext processingContext) { if (transformations.isEmpty()) return record; - for (Transformation<R> transformation : transformations) { - record = transformation.apply(record); + for (final Transformation<R> transformation : transformations) { + final R current = record; + + // set the current record + processingContext.setRecord(current); + + // execute the operation + record = operationExecutor.execute(new OperationExecutor.Operation<R>() { + @Override + public R apply() { + return transformation.apply(current); + } + }, null, processingContext); + + // move to the next stage + processingContext.next(); + if (record == null) break; } @@ -62,7 +84,7 @@ public int hashCode() { } public static <R extends ConnectRecord<R>> TransformationChain<R> noOp() { - return new TransformationChain<R>(Collections.<Transformation<R>>emptyList()); + return new 
TransformationChain<>(Collections.<Transformation<R>>emptyList()); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 1c6465855ff..dd560e28467 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -30,6 +30,14 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.apache.kafka.connect.runtime.errors.OperationExecutor; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; +import org.apache.kafka.connect.runtime.errors.Stage; +import org.apache.kafka.connect.runtime.errors.StageType; +import org.apache.kafka.connect.runtime.errors.impl.DLQReporter; +import org.apache.kafka.connect.runtime.errors.impl.ReporterFactory; +import org.apache.kafka.connect.runtime.errors.impl.RetryWithToleranceExecutor; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; import org.apache.kafka.connect.sink.SinkRecord; @@ -364,6 +372,8 @@ public boolean startTask( if (tasks.containsKey(id)) throw new ConnectException("Task already exists in this worker: " + id); + ProcessingContext.Builder prContextBuilder = ProcessingContext.newBuilder(id, config.originals()); + final WorkerTask workerTask; ClassLoader savedLoader = plugins.currentThreadLoader(); try { @@ -395,27 +405,50 @@ public boolean startTask( WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER ); + + Stage.Builder keyConverterStage = Stage.newBuilder(StageType.KEY_CONVERTER); + Stage.Builder valueConverterStage = 
Stage.newBuilder(StageType.VALUE_CONVERTER); + Stage.Builder headerConverterStage = Stage.newBuilder(StageType.HEADER_CONVERTER); + if (keyConverter == null) { keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + keyConverterStage.setConfig(config.originalsWithPrefix(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".")); log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id); } else { + keyConverterStage.setConfig(connConfig.originalsWithPrefix(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".")); log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id); } + keyConverterStage.setExecutingClass(keyConverter.getClass()); + if (valueConverter == null) { valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + valueConverterStage.setConfig(config.originalsWithPrefix(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".")); log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id); } else { + valueConverterStage.setConfig(connConfig.originalsWithPrefix(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".")); log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id); } + valueConverterStage.setExecutingClass(valueConverter.getClass()); + if (headerConverter == null) { headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + headerConverterStage.setConfig(config.originalsWithPrefix(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + ".")); log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id); } else { + headerConverterStage.setConfig(connConfig.originalsWithPrefix(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + ".")); log.info("Set up the header converter {} 
for task {} using the connector config", headerConverter.getClass(), id); } + headerConverterStage.setExecutingClass(headerConverter.getClass()); + + prContextBuilder.appendStage(keyConverterStage.build()); + prContextBuilder.appendStage(valueConverterStage.build()); + prContextBuilder.appendStage(headerConverterStage.build()); - workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader); + prContextBuilder.addReporters(errorReportersConfig(connConfig)); + + workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader, prContextBuilder, new RetryWithToleranceExecutor()); workerTask.initialize(taskConfig); + workerTask.operationExecutor().configure(connConfig.originalsWithPrefix("errors.")); Plugins.compareAndSwapLoaders(savedLoader); } catch (Throwable t) { log.error("Failed to start task {}", id, t); @@ -447,7 +480,9 @@ private WorkerTask buildWorkerTask(ConnectorConfig connConfig, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter, - ClassLoader loader) { + ClassLoader loader, + ProcessingContext.Builder prContextBuilder, + OperationExecutor operationExecutor) { // Decide which type of worker task we need based on the type of task. 
if (task instanceof SourceTask) { TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations()); @@ -456,18 +491,55 @@ private WorkerTask buildWorkerTask(ConnectorConfig connConfig, OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), internalKeyConverter, internalValueConverter); KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + for (Stage stage: connConfig.transformationAsStages()) { + prContextBuilder.prependStage(stage); + } + + prContextBuilder.prependStage(Stage.newBuilder(StageType.TASK_POLL) + .setConfig(connConfig.originals()) + .setExecutingClass(task.getClass()) + .build() + ); + + prContextBuilder.appendStage(Stage.newBuilder(StageType.KAFKA_PRODUCE).build()); + return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, - headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time); + headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time, prContextBuilder.build(), operationExecutor); } else if (task instanceof SinkTask) { TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations()); + + for (Stage stage: connConfig.transformationAsStages()) { + prContextBuilder.appendStage(stage); + } + + prContextBuilder.appendStage(Stage.newBuilder(StageType.TASK_POLL) + .setConfig(connConfig.originals()) + .setExecutingClass(task.getClass()) + .build() + ); + + prContextBuilder.prependStage(Stage.newBuilder(StageType.KAFKA_CONSUME).build()); + return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, metrics, keyConverter, - valueConverter, headerConverter, transformationChain, loader, time); + valueConverter, headerConverter, transformationChain, loader, time, prContextBuilder.build(), 
operationExecutor); + } else { log.error("Tasks must be a subclass of either SourceTask or SinkTask", task); throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask"); } } + /** + * Setup properties for dead letter queue using the producer properties from worker config before overriding them + * from the connector config. + * @param connConfig connector config + * @return reporters to log error context + */ + private List<ErrorReporter> errorReportersConfig(ConnectorConfig connConfig) { + return new ReporterFactory().forConfig(producerProps, connConfig); + } + private void stopTask(ConnectorTaskId taskId) { WorkerTask task = tasks.get(taskId); if (task == null) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 2ba785c4668..a8945231ed3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -40,6 +40,10 @@ import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.errors.OperationExecutor; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; +import org.apache.kafka.connect.runtime.errors.StageType; +import org.apache.kafka.connect.runtime.errors.impl.NoopExecutor; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.Converter; @@ -100,8 +104,26 @@ public WorkerSinkTask(ConnectorTaskId id, HeaderConverter headerConverter, TransformationChain<SinkRecord> transformationChain, ClassLoader loader, - Time time) { - super(id, statusListener, initialState, loader, connectMetrics); + Time time, + ProcessingContext processingContext) 
{ + this(id, task, statusListener, initialState, workerConfig, connectMetrics, keyConverter, valueConverter, headerConverter, transformationChain, loader, time, processingContext, NoopExecutor.INSTANCE); + } + + public WorkerSinkTask(ConnectorTaskId id, + SinkTask task, + TaskStatus.Listener statusListener, + TargetState initialState, + WorkerConfig workerConfig, + ConnectMetrics connectMetrics, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + TransformationChain<SinkRecord> transformationChain, + ClassLoader loader, + Time time, + ProcessingContext processingContext, + OperationExecutor operationExecutor) { + super(id, statusListener, initialState, loader, connectMetrics, processingContext, operationExecutor); this.workerConfig = workerConfig; this.task = task; @@ -294,9 +316,16 @@ protected void poll(long timeoutMs) { } log.trace("{} Polling consumer with timeout {} ms", this, timeoutMs); - ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs); + final long minTimeout = timeoutMs; + processingContext().position(StageType.KAFKA_CONSUME); + ConsumerRecords<byte[], byte[]> msgs = operationExecutor().execute(new OperationExecutor.Operation<ConsumerRecords<byte[], byte[]>>() { + @Override + public ConsumerRecords<byte[], byte[]> apply() { + return pollConsumer(minTimeout); + } + }, ConsumerRecords.<byte[], byte[]>empty(), processingContext()); assert messageBatch.isEmpty() || msgs.isEmpty(); - log.trace("{} Polling returned {} messages", this, msgs.count()); + log.info("{} Polling returned {} messages", this, msgs.count()); convertMessages(msgs); deliverMessages(); @@ -461,12 +490,39 @@ public String toString() { private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) { origOffsets.clear(); - for (ConsumerRecord<byte[], byte[]> msg : msgs) { + for (final ConsumerRecord<byte[], byte[]> msg : msgs) { log.trace("{} Consuming and converting message in topic '{}' partition {} at offset {} and timestamp {}", 
this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp()); - SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key()); - SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value()); - Headers headers = convertHeadersFor(msg); + + processingContext().position(StageType.KEY_CONVERTER); + SchemaAndValue keyAndSchema = operationExecutor().execute(new OperationExecutor.Operation<SchemaAndValue>() { + @Override + public SchemaAndValue apply() { + return keyConverter.toConnectData(msg.topic(), msg.key()); + } + }, DEFAULT_SCHEMA_AND_VALUE, processingContext()); + + processingContext().position(StageType.VALUE_CONVERTER); + SchemaAndValue valueAndSchema = operationExecutor().execute(new OperationExecutor.Operation<SchemaAndValue>() { + @Override + public SchemaAndValue apply() { + return valueConverter.toConnectData(msg.topic(), msg.value()); + } + }, DEFAULT_SCHEMA_AND_VALUE, processingContext()); + + processingContext().position(StageType.HEADER_CONVERTER); + Headers headers = operationExecutor().execute(new OperationExecutor.Operation<Headers>() { + @Override + public Headers apply() { + return convertHeadersFor(msg); + } + }, DEFAULT_HEADERS, processingContext()); + + if (keyAndSchema == DEFAULT_SCHEMA_AND_VALUE || valueAndSchema == DEFAULT_SCHEMA_AND_VALUE + || headers == DEFAULT_HEADERS) { + continue; + } + Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp()); SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(), keyAndSchema.schema(), keyAndSchema.value(), @@ -475,9 +531,12 @@ private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) { timestamp, msg.timestampType(), headers); + log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}", this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value()); - SinkRecord transRecord = 
transformationChain.apply(origRecord); + processingContext().position(StageType.TRANSFORMATION); + SinkRecord transRecord = transformationChain.apply(origRecord, operationExecutor(), processingContext()); + origOffsets.put( new TopicPartition(origRecord.topic(), origRecord.kafkaPartition()), new OffsetAndMetadata(origRecord.kafkaOffset() + 1) @@ -521,7 +580,15 @@ private void deliverMessages() { // Since we reuse the messageBatch buffer, ensure we give the task its own copy log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size()); long start = time.milliseconds(); - task.put(new ArrayList<>(messageBatch)); + processingContext().position(StageType.TASK_PUT); + operationExecutor().execute(new OperationExecutor.Operation<Object>() { + @Override + public Object apply() { + task.put(new ArrayList<>(messageBatch)); + return null; + } + }, processingContext()); + recordBatch(messageBatch.size()); sinkTaskMetricsGroup.recordPut(time.milliseconds() - start); currentOffsets.putAll(origOffsets); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index f2cef5a63f7..6d956dd0f0e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -34,6 +34,10 @@ import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.errors.OperationExecutor; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; +import org.apache.kafka.connect.runtime.errors.StageType; +import org.apache.kafka.connect.runtime.errors.impl.NoopExecutor; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import 
org.apache.kafka.connect.storage.Converter; @@ -102,8 +106,30 @@ public WorkerSourceTask(ConnectorTaskId id, WorkerConfig workerConfig, ConnectMetrics connectMetrics, ClassLoader loader, - Time time) { - super(id, statusListener, initialState, loader, connectMetrics); + Time time, + ProcessingContext processingContext) { + this(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, + producer, offsetReader, offsetWriter, workerConfig, connectMetrics, loader, time, processingContext, NoopExecutor.INSTANCE); + } + + public WorkerSourceTask(ConnectorTaskId id, + SourceTask task, + TaskStatus.Listener statusListener, + TargetState initialState, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + TransformationChain<SourceRecord> transformationChain, + KafkaProducer<byte[], byte[]> producer, + OffsetStorageReader offsetReader, + OffsetStorageWriter offsetWriter, + WorkerConfig workerConfig, + ConnectMetrics connectMetrics, + ClassLoader loader, + Time time, + ProcessingContext processingContext, + OperationExecutor operationExecutor) { + super(id, statusListener, initialState, loader, connectMetrics, processingContext, operationExecutor); this.workerConfig = workerConfig; this.task = task; @@ -184,7 +210,13 @@ public void execute() { if (toSend == null) { log.trace("{} Nothing to send to Kafka. 
Polling source for additional records", this); long start = time.milliseconds(); - toSend = task.poll(); + processingContext().position(StageType.TASK_POLL); + toSend = operationExecutor().execute(new OperationExecutor.Operation<List<SourceRecord>>() { + @Override + public List<SourceRecord> apply() throws InterruptedException { + return task.poll(); + } + }, processingContext()); if (toSend != null) { recordPollReturned(toSend.size(), time.milliseconds() - start); } @@ -216,7 +248,8 @@ private boolean sendRecords() { recordBatch(toSend.size()); final SourceRecordWriteCounter counter = new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup); for (final SourceRecord preTransformRecord : toSend) { - final SourceRecord record = transformationChain.apply(preTransformRecord); + processingContext().position(StageType.TRANSFORMATION); + final SourceRecord record = transformationChain.apply(preTransformRecord, operationExecutor(), processingContext()); if (record == null) { counter.skipRecord(); @@ -224,9 +257,32 @@ private boolean sendRecords() { continue; } - RecordHeaders headers = convertHeaderFor(record); - byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key()); - byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()); + processingContext().setRecord(record); + + processingContext().position(StageType.HEADER_CONVERTER); + RecordHeaders headers = operationExecutor().execute(new OperationExecutor.Operation<RecordHeaders>() { + @Override + public RecordHeaders apply() { + return convertHeaderFor(record); + } + }, DEFAULT_RECORD_HEADERS, processingContext()); + + processingContext().position(StageType.KEY_CONVERTER); + byte[] key = operationExecutor().execute(new OperationExecutor.Operation<byte[]>() { + @Override + public byte[] apply() { + return keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key()); + } + }, processingContext()); + + 
processingContext().position(StageType.VALUE_CONVERTER); + byte[] value = operationExecutor().execute(new OperationExecutor.Operation<byte[]>() { + @Override + public byte[] apply() { + return valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()); + } + }, processingContext()); + final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value, headers); log.trace("{} Appending record with key {}, value {}", this, record.key(), record.value()); @@ -247,30 +303,37 @@ private boolean sendRecords() { } try { final String topic = producerRecord.topic(); - producer.send( - producerRecord, - new Callback() { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if (e != null) { - // Given the default settings for zero data loss, this should basically never happen -- - // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request - // timeouts, callbacks with exceptions should never be invoked in practice. If the - // user overrode these settings, the best we can do is notify them of the failure via - // logging. 
- log.error("{} failed to send record to {}: {}", this, topic, e); - log.debug("{} Failed record: {}", this, preTransformRecord); - } else { - log.trace("{} Wrote record successfully: topic {} partition {} offset {}", - this, - recordMetadata.topic(), recordMetadata.partition(), - recordMetadata.offset()); - commitTaskRecord(preTransformRecord); - } - recordSent(producerRecord); - counter.completeRecord(); - } - }); + processingContext().position(StageType.KAFKA_PRODUCE); + processingContext().setRecord(record); + operationExecutor().execute(new OperationExecutor.Operation<Future<RecordMetadata>>() { + @Override + public Future<RecordMetadata> apply() { + return producer.send( + producerRecord, + new Callback() { + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if (e != null) { + // Given the default settings for zero data loss, this should basically never happen -- + // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request + // timeouts, callbacks with exceptions should never be invoked in practice. If the + // user overrode these settings, the best we can do is notify them of the failure via + // logging. 
+ log.error("{} failed to send record to {}: {}", this, topic, e); + log.debug("{} Failed record: {}", this, preTransformRecord); + } else { + log.trace("{} Wrote record successfully: topic {} partition {} offset {}", + this, + recordMetadata.topic(), recordMetadata.partition(), + recordMetadata.offset()); + commitTaskRecord(preTransformRecord); + } + recordSent(producerRecord); + counter.completeRecord(); + } + }); + } + }, processingContext()); lastSendFailed = false; } catch (RetriableException e) { log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index d563f9bdede..ac8ea7faf0b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Sensor; @@ -25,9 +26,16 @@ import org.apache.kafka.common.metrics.stats.Frequencies; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.AbstractStatus.State; import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.errors.OperationExecutor; +import 
org.apache.kafka.connect.runtime.errors.ProcessingContext; +import org.apache.kafka.connect.runtime.errors.impl.NoopExecutor; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; @@ -60,11 +68,30 @@ private volatile boolean stopping; // indicates whether the Worker has asked the task to stop private volatile boolean cancelled; // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown) + private final OperationExecutor operationExecutor; + private final ProcessingContext processingContext; + + protected static final SchemaAndValue DEFAULT_SCHEMA_AND_VALUE = new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false); + protected static final Headers DEFAULT_HEADERS = new ConnectHeaders(); + protected static final RecordHeaders DEFAULT_RECORD_HEADERS = new RecordHeaders(); + + // Available for testing + public WorkerTask(ConnectorTaskId id, + TaskStatus.Listener statusListener, + TargetState initialState, + ClassLoader loader, + ConnectMetrics connectMetrics, + ProcessingContext processingContext) { + this(id, statusListener, initialState, loader, connectMetrics, processingContext, NoopExecutor.INSTANCE); + } + public WorkerTask(ConnectorTaskId id, TaskStatus.Listener statusListener, TargetState initialState, ClassLoader loader, - ConnectMetrics connectMetrics) { + ConnectMetrics connectMetrics, + ProcessingContext processingContext, + OperationExecutor operationExecutor) { this.id = id; this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener); this.statusListener = taskMetricsGroup; @@ -73,6 +100,8 @@ public WorkerTask(ConnectorTaskId id, this.stopping = false; this.cancelled = false; this.taskMetricsGroup.recordState(this.targetState); + this.processingContext = processingContext; + this.operationExecutor = operationExecutor; } public ConnectorTaskId id() { @@ -100,6 +129,14 @@ private void triggerStop() { } } + public OperationExecutor 
operationExecutor() { + return operationExecutor; + } + + public ProcessingContext processingContext() { + return processingContext; + } + /** * Stop this task from processing messages. This method does not block, it only triggers * shutdown. Use #{@link #awaitStop} to block until completion. diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java new file mode 100644 index 00000000000..a3625db6043 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.errors; + +import org.apache.kafka.common.Configurable; + +public abstract class ErrorReporter implements Configurable { + + public void initialize() { + } + + public abstract void report(ProcessingContext report); + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/OperationExecutor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/OperationExecutor.java new file mode 100644 index 00000000000..9cdd9b39347 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/OperationExecutor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.errors; + +import org.apache.kafka.common.Configurable; + +import java.util.Map; + +/** + * Execute a Connect worker operation with configurable retries and tolerances. An operation here means applying transformations + * on a ConnectRecord, converting it to a format used by the Connect framework, writing it to a sink or reading from a source.
+ * + * <p>The following failures will be retried: + * + * <ol> + * <li> Connector tasks fail with RetriableException when reading/writing records from/to an external system. + * <li> Transformations fail and throw exceptions while processing records. + * <li> Converters fail to correctly serialize/deserialize records. + * <li> Exceptions while producing/consuming to/from Kafka topics in the Connect framework. + * </ol> + * + * <p>The executor will report errors on failure (after reattempting). It will throw a {@link ToleranceExceededException} if + * any of the tolerance limits are crossed. + */ +public abstract class OperationExecutor implements Configurable { + + public <V> V execute(Operation<V> operation, ProcessingContext context) { + return execute(operation, null, context); + } + + /** + * Configure this class with the given key-value pairs. The default implementation ignores them. + */ + @Override + public void configure(Map<String, ?> configs) { + } + + public abstract <V> V execute(Operation<V> operation, V value, ProcessingContext context); + + public interface Operation<V> { + V apply() throws Exception; + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java new file mode 100644 index 00000000000..060d98daf54 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.errors; + +import org.apache.kafka.common.utils.Base64; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.runtime.errors.impl.StructUtil; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.util.ConnectorTaskId; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * This object will contain all the runtime context for an error which occurs in the Connect framework while + * processing a record.
+ */ +public class ProcessingContext implements Structable { + + private final String taskId; + private final Map<String, Object> workerConfig; + private final List<Stage> stages; + private final ErrorReporter[] reporters; + + private Exception exception; + private int current = 0; + private int attempt; + private ConnectRecord record; + private long timestamp; + + private final JsonConverter valueConverter = new JsonConverter(); + + public static Builder newBuilder(ConnectorTaskId taskId, Map<String, Object> workerConfig) { + Objects.requireNonNull(taskId); + Objects.requireNonNull(workerConfig); + + return new Builder(taskId.toString(), workerConfig); + } + + // VisibleForTesting + public static ProcessingContext noop(String taskId) { + return new ProcessingContext(taskId, Collections.<String, Object>emptyMap(), Collections.<Stage>emptyList(), new ErrorReporter[]{}); + } + + private ProcessingContext(String taskId, Map<String, Object> workerConfig, List<Stage> stages, ErrorReporter[] reporters) { + Objects.requireNonNull(taskId); + Objects.requireNonNull(workerConfig); + Objects.requireNonNull(stages); + Objects.requireNonNull(reporters); + + this.taskId = taskId; + this.workerConfig = workerConfig; + this.stages = stages; + this.reporters = reporters; + + Map<String, Object> config = new HashMap<>(); + config.put("schemas.enable", false); + config.put("converter.type", "value"); + valueConverter.configure(config); + } + + /** + * @return the configuration of the Connect worker + */ + public Map<String, Object> workerConfig() { + return workerConfig; + } + + /** + * @return which task reported this error + */ + @Field("task_id") + public String taskId() { + return taskId; + } + + /** + * @return an ordered list of stages. Connect will start with executing stage 0 and then move up the list.
+ */ + public List<Stage> stages() { + return stages; + } + + public Stage current() { + return stages.get(current); + } + + /** + * @return at what stage did this operation fail (0 indicates first stage) + */ + @Field("index") + public int index() { + return current; + } + + /** + * @return which attempt was this (first error will be 0) + */ + @Field("attempt") + public int attempt() { + return attempt; + } + + /** + * @return the (epoch) time of failure + */ + @Field("time_of_error") + public long timeOfError() { + return timestamp; + } + + public void setTimeOfError(long timestamp) { + this.timestamp = timestamp; + } + + /** + * @return the exception accompanying this failure (if any) + */ + @Field("exception") + public Exception exception() { + return exception; + } + + /** + * @return the record which, when input to the current stage, caused the failure. + */ + public ConnectRecord record() { + return record; + } + + public void setRecord(ConnectRecord record) { + this.record = record; + } + + public void setException(Exception ex) { + this.exception = ex; + } + + public void report() { + for (ErrorReporter reporter : reporters) { + reporter.report(this); + } + } + + @Override + public Struct toStruct() { + Struct metadata = StructUtil.toStruct(this); + + Schema valueSchema = new SchemaBuilder(Schema.Type.STRUCT) + .field("schema", Schema.OPTIONAL_STRING_SCHEMA) + .field("object", Schema.OPTIONAL_STRING_SCHEMA).build(); + Struct value = new Struct(valueSchema); + value.put("schema", record.valueSchema() == null ? null : record.valueSchema().type().toString()); + value.put("object", record.value() instanceof byte[] ? Base64.encoder().encodeToString((byte[]) record.value()) : Objects.toString(record.value(), null)); + + SchemaBuilder recordSchema = new SchemaBuilder(Schema.Type.STRUCT); + recordSchema.field("topic", Schema.STRING_SCHEMA); + recordSchema.field("timestamp", Schema.OPTIONAL_INT64_SCHEMA); + recordSchema.field("offset", Schema.OPTIONAL_INT64_SCHEMA); + recordSchema.field("partition", Schema.OPTIONAL_INT32_SCHEMA); + recordSchema.field("value", value.schema()); + + Struct recordStruct = new Struct(recordSchema); +
recordStruct.put("topic", record().topic()); + recordStruct.put("timestamp", record().timestamp()); + recordStruct.put("partition", record().kafkaPartition()); + recordStruct.put("offset", record() instanceof SinkRecord ? ((SinkRecord) record()).kafkaOffset() : null); + recordStruct.put("value", value); + + Struct stage = StructUtil.toStruct(current()); + + SchemaBuilder finalSchemaBuilder = new SchemaBuilder(Schema.Type.STRUCT) + .field("record", recordSchema) + .field("stage", stage.schema()); + for (org.apache.kafka.connect.data.Field f : metadata.schema().fields()) { + finalSchemaBuilder.field(f.name(), f.schema()); + } + + finalSchemaBuilder.field("stages", SchemaBuilder.array(stage.schema()).build()); + Struct struct = new Struct(finalSchemaBuilder.build()); + + for (org.apache.kafka.connect.data.Field f : metadata.schema().fields()) { + struct.put(f.name(), metadata.get(f)); + } + struct.put("record", recordStruct); + struct.put("stage", stage); + + List<Struct> stageStructs = new ArrayList<>(); + for (Stage st : stages()) { + stageStructs.add(StructUtil.toStruct(st)); + } + struct.put("stages", stageStructs); + + return struct; + } + + public void reset() { + current = 0; + attempt = 0; + } + + /** + * Position index to the first stage of the given type (or to stage 0 if no stage has that type), resetting the attempt count. + * + * @param type the given type + */ + public void position(StageType type) { + reset(); + for (int i = 0; i < stages.size(); i++) { + if (type == stages.get(i).type()) { + current = i; + break; + } + } + } + + public void next() { + current++; + } + + public void incrementAttempt() { + attempt++; + } + + public static class Builder { + private final Map<String, Object> workerConfig; + private final String taskId; + + private List<ErrorReporter> reporters = new ArrayList<>(); + private LinkedList<Stage> stages = new LinkedList<>(); + + private Builder(String taskId, Map<String, Object> workerConfig) { + this.taskId = taskId; + this.workerConfig = workerConfig; + } + + public Builder prependStage(Stage stage) { + stages.addFirst(stage); + return this; + } + + public Builder
appendStage(Stage stage) { + stages.addLast(stage); + return this; + } + + public Builder addReporters(Collection<ErrorReporter> reporters) { + this.reporters.addAll(reporters); + return this; + } + + public ProcessingContext build() { + return new ProcessingContext(taskId, workerConfig, new ArrayList<>(stages), reporters.toArray(new ErrorReporter[0])); + } + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Stage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Stage.java new file mode 100644 index 00000000000..596085e8b41 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Stage.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.connect.runtime.errors; + +import org.apache.kafka.connect.data.Struct; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +public class Stage implements Structable { + + private final StageType type; + private final Map<String, Object> config; + private final Class<?> klass; + + public static Builder newBuilder(StageType type) { + return new Builder(type); + } + + private Stage(StageType type, Map<String, Object> config, Class<?> klass) { + this.type = type; + this.config = config; + this.klass = klass; + } + + /** + * @return at what stage in processing did the error happen + */ + @Field("type") + public StageType type() { + return type; + } + + /** + * @return the class executing this stage. + */ + @Field("class") + public Class<?> executingClass() { + return klass; + } + + /** + * @return properties used to configure this stage + */ + @Field("config") + public Map<String, Object> config() { + return config; + } + + @Override + public Struct toStruct() { + return org.apache.kafka.connect.runtime.errors.impl.StructUtil.toStruct(this); + } + + @Override + public String toString() { + return "Stage{type=" + type + ", class=" + klass + ", config=" + config + "}"; + } + + public static class Builder { + private final StageType type; + private Map<String, Object> config; + private Class<?> klass; + + private Builder(StageType type) { + this.type = type; + } + + public Builder setExecutingClass(Class<?> klass) { + this.klass = klass; + return this; + } + + public Builder setConfig(Map<String, Object> config) { + this.config = config; + return this; + } + + public Stage build() { + Objects.requireNonNull(type); + return new Stage(type, config == null ?
Collections.<String, Object>emptyMap() : config, klass); + } + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/StageType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/StageType.java new file mode 100644 index 00000000000..22f25baa41d --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/StageType.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.connect.runtime.errors; + +/** + * A logical stage in a Connect pipeline. + */ +public enum StageType { + + /** + * When the task starts up + */ + TASK_START, + + /** + * When running any transform operation on a record + */ + TRANSFORMATION, + + /** + * When calling the poll() method on a SourceTask + */ + TASK_POLL, + + /** + * When calling the put() method on a SinkTask + */ + TASK_PUT, + + /** + * When using the key converter to serialize/deserialize keys in ConnectRecords + */ + KEY_CONVERTER, + + /** + * When using the value converter to serialize/deserialize values in ConnectRecords + */ + VALUE_CONVERTER, + + /** + * When using the header converter to serialize/deserialize headers in ConnectRecords + */ + HEADER_CONVERTER, + + /** + * When the worker is committing offsets for the task + */ + COMMIT_OFFSETS, + + /** + * When the task is shutting down + */ + TASK_CLOSE, + + /** + * When producing records to a Kafka topic + */ + KAFKA_PRODUCE, + + /** + * When consuming records from a Kafka topic + */ + KAFKA_CONSUME +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Structable.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Structable.java new file mode 100644 index 00000000000..03a88a8fb8c --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Structable.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.errors; + +import org.apache.kafka.connect.data.Struct; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.METHOD; + +public interface Structable { + + /** + * @return a {@link Struct} representation of this object + */ + Struct toStruct(); + + @Target({METHOD}) + @Retention(RetentionPolicy.RUNTIME) + @interface Field { + String value(); + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceExceededException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceExceededException.java new file mode 100644 index 00000000000..b50bd606031 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceExceededException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.errors; + +import org.apache.kafka.connect.errors.ConnectException; + +public class ToleranceExceededException extends ConnectException { + + public ToleranceExceededException(String msg, Throwable throwable) { + super(msg, throwable); + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/DLQReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/DLQReporter.java new file mode 100644 index 00000000000..2f99715978a --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/DLQReporter.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.connect.runtime.errors.impl; + +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; + +public class DLQReporter extends ErrorReporter { + + private static final Logger log = LoggerFactory.getLogger(DLQReporter.class); + + public static final String DLQ_TOPIC_NAME = "errors.dlq.topic.name"; + public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic where these messages are written to."; + + public static final String DLQ_TOPIC_PARTITIONS = "errors.dlq.topic.partitions"; + public static final String DLQ_TOPIC_PARTITIONS_DOC = "Number of partitions for the DLQ topic."; + public static final int DLQ_TOPIC_PARTITIONS_DEFAULT = 25; + + public static final String DLQ_TOPIC_REPLICATION_FACTOR = "errors.dlq.topic.replication.factor"; + public static final String DLQ_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the DLQ topic."; + public static final short DLQ_TOPIC_REPLICATION_FACTOR_DEFAULT = 3; + + public static final String DLQ_INCLUDE_CONFIGS = "dlq.include.configs"; + public static final String DLQ_INCLUDE_CONFIGS_DOC = "Include the (worker,
connector) configs in the message written to the DLQ topic."; + public static final boolean DLQ_INCLUDE_CONFIGS_DEFAULT = false; + + public static final String DLQ_INCLUDE_MESSAGES = "dlq.include.messages"; + public static final String DLQ_INCLUDE_MESSAGES_DOC = "Include the Connect Record which failed to process in the message written to the DLQ topic."; + public static final boolean DLQ_INCLUDE_MESSAGES_DEFAULT = false; + + public static final String DLQ_CONVERTER = "dlq.converter"; + public static final String DLQ_CONVERTER_DOC = "Converter class used to serialize the error context before it is written to the DLQ topic."; + public static final Class<? extends Converter> DLQ_CONVERTER_DEFAULT = StringConverter.class; + + public static final String DLQ_PRODUCER_PROPERTIES = "dlq.producer"; + + private DlqReporterConfig config; + private KafkaProducer<byte[], byte[]> producer; + private Converter converter; + + static ConfigDef getConfigDef() { + return new ConfigDef() + .define(DLQ_TOPIC_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, DLQ_TOPIC_NAME_DOC) + .define(DLQ_TOPIC_PARTITIONS, ConfigDef.Type.INT, DLQ_TOPIC_PARTITIONS_DEFAULT, atLeast(1), ConfigDef.Importance.HIGH, DLQ_TOPIC_PARTITIONS_DOC) + .define(DLQ_TOPIC_REPLICATION_FACTOR, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), ConfigDef.Importance.HIGH, DLQ_TOPIC_REPLICATION_FACTOR_DOC) + .define(DLQ_INCLUDE_CONFIGS, ConfigDef.Type.BOOLEAN, DLQ_INCLUDE_CONFIGS_DEFAULT, ConfigDef.Importance.HIGH, DLQ_INCLUDE_CONFIGS_DOC) + .define(DLQ_INCLUDE_MESSAGES, ConfigDef.Type.BOOLEAN, DLQ_INCLUDE_MESSAGES_DEFAULT, ConfigDef.Importance.HIGH, DLQ_INCLUDE_MESSAGES_DOC) + .define(DLQ_CONVERTER, ConfigDef.Type.CLASS, DLQ_CONVERTER_DEFAULT, ConfigDef.Importance.HIGH, DLQ_CONVERTER_DOC); + } + + @Override + public void configure(Map<String, ?> configs) { + config = new DlqReporterConfig(configs); + } + + @Override + public void initialize() { + // ensure the DLQ topic exists, creating it if necessary + NewTopic topicDescription = TopicAdmin.defineTopic(config.topic()) + .partitions(config.topicPartitions())
+ .replicationFactor(config.topicReplicationFactor()) + .build(); + + Map<String, Object> adminProps = config.getProducerProps(); + try (TopicAdmin admin = new TopicAdmin(adminProps)) { + admin.createTopics(topicDescription); + } + + Map<String, Object> producerProps = config.getProducerProps(); + producer = new KafkaProducer<>(producerProps); + + try { + if (!Converter.class.isAssignableFrom(config.converter())) { + throw new ConnectException(config.converter().getName() + " does not implement " + Converter.class.getName()); + } + converter = config.converter().newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new ConnectException("Could not instantiate converter " + config.converter().getName(), e); + } + } + + @Override + public void report(final ProcessingContext report) { + byte[] val; + try { + Struct struct = report.toStruct(); + val = converter.fromConnectData(config.topic(), struct.schema(), struct); + } catch (Exception e) { + log.error("Could not convert report for producing", e); + return; + } + + + + producer.send(new ProducerRecord<byte[], byte[]>(config.topic(), val), new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + log.error("Could not write error context to DLQ topic {}", config.topic(), exception); + } + } + }); + } + + private static class DlqReporterConfig extends AbstractConfig { + public DlqReporterConfig(Map<?, ?> originals) { + super(getConfigDef(), originals); + } + + public String topic() { + return getString(DLQ_TOPIC_NAME); + } + + public int topicPartitions() { + return getInt(DLQ_TOPIC_PARTITIONS); + } + + public short topicReplicationFactor() { + return getShort(DLQ_TOPIC_REPLICATION_FACTOR); + } + + public boolean includeConfigs() { + return getBoolean(DLQ_INCLUDE_CONFIGS); + } + + public boolean includeMessages() { + return getBoolean(DLQ_INCLUDE_MESSAGES); + } + + @SuppressWarnings("unchecked") + public Class<? extends Converter> converter() { + return (Class<?
extends Converter>) getClass(DLQ_CONVERTER); + } + + public Map<String, Object> getProducerProps() { + return originalsWithPrefix(DLQ_PRODUCER_PROPERTIES + "."); + } + + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/ErrorMetricsReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/ErrorMetricsReporter.java new file mode 100644 index 00000000000..d1d01277b5b --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/ErrorMetricsReporter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.connect.runtime.errors.impl; + +import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; + +import java.util.Map; + +public class ErrorMetricsReporter extends ErrorReporter { // NOTE(review): placeholder; both methods below are currently empty + + @Override + public void configure(Map<String, ?> configs) { // no configuration options are read + } + + @Override + public void report(ProcessingContext report) { + // no-op: no error metrics are recorded yet + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/LogReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/LogReporter.java new file mode 100644 index 00000000000..c8aa09f5bbc --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/LogReporter.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.connect.runtime.errors.impl; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class LogReporter extends ErrorReporter { // reports each ProcessingContext by logging its Struct form at INFO level + + private static final Logger log = LoggerFactory.getLogger(LogReporter.class); + + public static final String LOG_INCLUDE_CONFIGS = "log.include.configs"; + public static final String LOG_INCLUDE_CONFIGS_DOC = "Include the (worker, connector) configs in the log."; + public static final boolean LOG_INCLUDE_CONFIGS_DEFAULT = false; + + public static final String LOG_INCLUDE_MESSAGES = "log.include.messages"; + public static final String LOG_INCLUDE_MESSAGES_DOC = "Include the Connect Record which failed to process in the log."; + public static final boolean LOG_INCLUDE_MESSAGES_DEFAULT = false; + + private LogReporterConfig config; // NOTE(review): includeConfigs()/includeMessages() are parsed but never consulted when logging + + static ConfigDef getConfigDef() { + return new ConfigDef() + .define(LOG_INCLUDE_CONFIGS, ConfigDef.Type.BOOLEAN, LOG_INCLUDE_CONFIGS_DEFAULT, ConfigDef.Importance.HIGH, LOG_INCLUDE_CONFIGS_DOC) + .define(LOG_INCLUDE_MESSAGES, ConfigDef.Type.BOOLEAN, LOG_INCLUDE_MESSAGES_DEFAULT, ConfigDef.Importance.HIGH, LOG_INCLUDE_MESSAGES_DOC); + } + + @Override + public void configure(Map<String, ?> configs) { + config = new LogReporterConfig(configs); + } + + @Override + public void report(ProcessingContext context) { + log.info("Error processing record.
Context={}", createLogString(context)); + } + + public String createLogString(ProcessingContext context) { + return String.valueOf(context.toStruct()); + } + + static class LogReporterConfig extends AbstractConfig { + public LogReporterConfig(Map<?, ?> originals) { + super(getConfigDef(), originals); + } + + public boolean includeConfigs() { + return getBoolean(LOG_INCLUDE_CONFIGS); + } + + public boolean includeMessages() { + return getBoolean(LOG_INCLUDE_MESSAGES); + } + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/NoopExecutor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/NoopExecutor.java new file mode 100644 index 00000000000..4b7bf317669 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/NoopExecutor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.errors.impl; + +import org.apache.kafka.connect.runtime.errors.OperationExecutor; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; + +public class NoopExecutor extends OperationExecutor { // Runs each operation exactly once: no retries, no tolerance, no reporting. + + public static final OperationExecutor INSTANCE = new NoopExecutor(); // shared instance; the constructor is private + + private NoopExecutor() { + } + + @Override + public <V> V execute(OperationExecutor.Operation<V> operation, V value, ProcessingContext context) { + try { + return operation.apply(); + } catch (RuntimeException e) { // unchecked failures propagate unchanged and are not recorded on the context + throw e; + } catch (Exception e) { // checked failures are recorded on the context, then wrapped in an unchecked exception + context.setException(e); + throw new RuntimeException(e); + } + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/ReporterFactory.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/ReporterFactory.java new file mode 100644 index 00000000000..23de34fcd0d --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/ReporterFactory.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.connect.runtime.errors.impl; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ReporterFactory { // Builds a connector's ErrorReporters from the worker producer props and the connector's error-handler config. + + private static final Logger log = LoggerFactory.getLogger(ReporterFactory.class); + + public static final String DLQ_ENABLE = "dlq.enable"; + public static final String DLQ_ENABLE_DOC = "Report the error context to the dead letter queue (DLQ)."; + public static final boolean DLQ_ENABLE_DEFAULT = false; + + public static final String LOG_ENABLE = "log.enable"; + public static final String LOG_ENABLE_DOC = "Log the error context along with the other application logs."; + public static final boolean LOG_ENABLE_DEFAULT = false; + + static ConfigDef getConfigDef() { + return new ConfigDef() + .define(DLQ_ENABLE, ConfigDef.Type.BOOLEAN, DLQ_ENABLE_DEFAULT, ConfigDef.Importance.HIGH, DLQ_ENABLE_DOC) + .define(LOG_ENABLE, ConfigDef.Type.BOOLEAN, LOG_ENABLE_DEFAULT, ConfigDef.Importance.HIGH, LOG_ENABLE_DOC); + } + + public List<ErrorReporter> forConfig(Map<String, Object> workerProducerProps, ConnectorConfig connConfig) { + Map<String, Object> configs = new HashMap<>(); + for (Map.Entry<String, Object> e: workerProducerProps.entrySet()) { // namespace the worker producer props under the DLQ producer prefix + configs.put(DLQReporter.DLQ_PRODUCER_PROPERTIES + "."
+ e.getKey(), e.getValue()); + } + configs.putAll(connConfig.errorHandlerConfig()); + + ReporterFactoryConfig config = new ReporterFactoryConfig(getConfigDef(), configs); + List<ErrorReporter> reporters = new ArrayList<>(3); + log.info("Adding metrics reporter for reporting errors"); + reporters.add(new ErrorMetricsReporter()); // always added; NOTE(review): unlike the DLQ and log reporters, it is not configured or initialized here + if (config.isDlqReporterEnabled()) { + log.info("Adding DLQ reporter for reporting errors"); + DLQReporter reporter = new DLQReporter(); + reporter.configure(configs); + reporter.initialize(); + reporters.add(reporter); + } + if (config.isLogReporterEnabled()) { + log.info("Adding Log reporter for reporting errors"); + LogReporter reporter = new LogReporter(); + reporter.configure(configs); + reporter.initialize(); + reporters.add(reporter); + } + return reporters; + } + + static class ReporterFactoryConfig extends AbstractConfig { + + public ReporterFactoryConfig(ConfigDef definition, Map<?, ?> originals) { + super(definition, originals, false); // NOTE(review): third arg presumably disables logging of the many unrelated originals — confirm against AbstractConfig + } + + public boolean isLogReporterEnabled() { + return getBoolean(LOG_ENABLE); + } + + public boolean isDlqReporterEnabled() { + return getBoolean(DLQ_ENABLE); + } + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/RetryWithToleranceExecutor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/RetryWithToleranceExecutor.java new file mode 100644 index 00000000000..d4a4b36b95a --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/RetryWithToleranceExecutor.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.errors.impl; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.errors.OperationExecutor; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; +import org.apache.kafka.connect.runtime.errors.ToleranceExceededException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.ValidString.in; + +public class RetryWithToleranceExecutor extends OperationExecutor { // Retries failed operations with exponential backoff and tolerates a bounded number of failed records. + + private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceExecutor.class); + + public static final String RETRIES_LIMIT = "retries.limit"; + public static final String RETRIES_LIMIT_DOC = "The maximum number of retries before failing an operation"; + public static final long RETRIES_LIMIT_DEFAULT = 0; + + public static final String RETRIES_DELAY_MAX_MS = "retries.delay.max.ms"; + public static final String RETRIES_DELAY_MAX_MS_DOC = "The maximum duration between two consecutive retries (in milliseconds)."; + public static final long
RETRIES_DELAY_MAX_MS_DEFAULT = 60000; + + public static final long RETRIES_DELAY_MIN_MS = 1000; // initial backoff; doubled on each subsequent retry + + public static final String TOLERANCE_LIMIT = "tolerance.limit"; + public static final String TOLERANCE_LIMIT_DOC = "Fail the task if we exceed specified number of errors overall."; + public static final long TOLERANCE_LIMIT_DEFAULT = 0; + + public static final String TOLERANCE_RATE_LIMIT = "tolerance.rate.limit"; + public static final String TOLERANCE_RATE_LIMIT_DOC = "Fail the task if we exceed specified number of errors in the observed duration."; + public static final long TOLERANCE_RATE_LIMIT_DEFAULT = 0; + + public static final String TOLERANCE_RATE_DURATION = "tolerance.rate.duration"; + public static final String TOLERANCE_RATE_DURATION_DOC = "The duration of the window for which we will monitor errors."; + public static final String TOLERANCE_RATE_DURATION_DEFAULT = "minute"; + + private final Time time; + private RetryWithToleranceExecutorConfig config; + + private long totalFailures = 0; + private long totalFailuresInDuration = 0; + private long durationWindow = 0; + private long durationStart = 0; + + public RetryWithToleranceExecutor() { + this(new SystemTime()); + } + + public RetryWithToleranceExecutor(Time time) { + this.time = time; + } + + @Override + public void configure(Map<String, ?> configs) { + config = new RetryWithToleranceExecutorConfig(configs); + durationWindow = TimeUnit.MILLISECONDS.convert(1, config.toleranceRateDuration()); + durationStart = time.milliseconds() / durationWindow * durationWindow; // start of the current window, aligned the same way as in withinToleranceLimits + } + + static ConfigDef getConfigDef() { + return new ConfigDef() + .define(RETRIES_LIMIT, ConfigDef.Type.LONG, RETRIES_LIMIT_DEFAULT, ConfigDef.Importance.HIGH, RETRIES_LIMIT_DOC) + .define(RETRIES_DELAY_MAX_MS, ConfigDef.Type.LONG, RETRIES_DELAY_MAX_MS_DEFAULT, atLeast(1), ConfigDef.Importance.MEDIUM, RETRIES_DELAY_MAX_MS_DOC) + .define(TOLERANCE_LIMIT, ConfigDef.Type.LONG, TOLERANCE_LIMIT_DEFAULT, ConfigDef.Importance.HIGH, TOLERANCE_LIMIT_DOC) +
.define(TOLERANCE_RATE_LIMIT, ConfigDef.Type.LONG, TOLERANCE_RATE_LIMIT_DEFAULT, ConfigDef.Importance.MEDIUM, TOLERANCE_RATE_LIMIT_DOC) + .define(TOLERANCE_RATE_DURATION, ConfigDef.Type.STRING, TOLERANCE_RATE_DURATION_DEFAULT, in("minute", "hour", "day"), ConfigDef.Importance.MEDIUM, TOLERANCE_RATE_DURATION_DOC); + } + + @Override + public <V> V execute(Operation<V> operation, V value, ProcessingContext context) { + try { + switch (context.current().type()) { + case TRANSFORMATION: + case HEADER_CONVERTER: + case KEY_CONVERTER: + case VALUE_CONVERTER: + return execAndHandleError(operation, value, context, Exception.class); + case KAFKA_PRODUCE: + case KAFKA_CONSUME: + return execAndHandleError(operation, value, context, org.apache.kafka.common.errors.RetriableException.class); + default: + return execAndHandleError(operation, value, context, org.apache.kafka.connect.errors.RetriableException.class); + } + } finally { + if (context.exception() != null) { + context.report(); + } + } + } + + private <V> V execAndHandleError(Operation<V> operation, V value, ProcessingContext context, Class<? extends Exception> handled) { + Response<V> response = new Response<>(); + boolean retry; + do { + apply(response, operation, handled); + context.incrementAttempt(); + switch (response.status) { + case SUCCESS: // NOTE(review): an exception set by an earlier failed attempt appears to stay on the context, so execute() would still report after a successful retry — confirm + return response.result; + case RETRY: + context.setException(response.ex); + retry = checkRetry(context); + log.trace("Operation failed. For attempt={}, limit={}, retry={}", context.attempt(), config.retriesLimit(), retry); + break; + case UNHANDLED_EXCEPTION: + context.setException(response.ex); + throw new ConnectException(response.ex); + default: + throw new ConnectException("Undefined state: " + response.status); + } + if (retry) { + backoff(context); + if (Thread.currentThread().isInterrupted()) { + // thread was interrupted during sleep. kill the task.
+ throw new ConnectException("Thread was interrupted"); + } + } + } while (retry); + + // mark this record as failed. + totalFailures++; + context.setTimeOfError(time.milliseconds()); + + if (!withinToleranceLimits(context)) { + throw new ToleranceExceededException("Tolerance Limit Exceeded", response.ex); + } + + log.trace("Operation failed but within tolerance limits. Returning default value={}", value); + return value; + } + + // Visible for testing + protected boolean withinToleranceLimits(ProcessingContext context) { + final long timeOfError = context.timeOfError(); + long newDurationStart = timeOfError - timeOfError % durationWindow; + if (newDurationStart > durationStart) { + durationStart = newDurationStart; + totalFailuresInDuration = 0; + } + totalFailuresInDuration++; + + log.info("Marking the record as failed, totalFailures={}, totalFailuresInDuration={}", totalFailures, totalFailuresInDuration); + + if (totalFailures > config.toleranceLimit() || totalFailuresInDuration > config.toleranceRateLimit()) { + return false; + } + + return true; + } + + // Visible for tests + protected boolean checkRetry(ProcessingContext context) { + long limit = config.retriesLimit(); + if (limit == -1) { + return true; + } else if (limit == 0) { + return false; + } else if (limit > 0) { + // number of retries is one less than the number of attempts. + return (context.attempt() - 1) < config.retriesLimit(); + } else { + log.error("Unexpected value for retry limit={}. Will disable retry.", limit); + return false; + } + } + + protected void backoff(ProcessingContext context) { + int numRetry = context.attempt() - 1; + long delay = numRetry < 40 ? RETRIES_DELAY_MIN_MS << numRetry : Long.MAX_VALUE; // saturate: with unlimited retries, 1000L << 54 overflows to a negative delay (and the shift wraps at 64) + if (delay > config.retriesDelayMax()) { + delay = ThreadLocalRandom.current().nextLong(config.retriesDelayMax()); + } + + log.debug("Sleeping for {} millis", delay); + time.sleep(delay); + } + + private <V> void apply(Response<V> response, Operation<V> operation, Class<?
extends Exception> handled) { + try { + response.result = operation.apply(); + response.ex = null; + response.status = ResponseStatus.SUCCESS; + } catch (Exception e) { + response.ex = e; + response.result = null; + if (handled.isAssignableFrom(e.getClass())) { + response.status = ResponseStatus.RETRY; + } else { + response.status = ResponseStatus.UNHANDLED_EXCEPTION; + } + } + } + + static class Response<V> { + ResponseStatus status; + Exception ex; + V result; + } + + enum ResponseStatus { + SUCCESS, + RETRY, + UNHANDLED_EXCEPTION + } + + static class RetryWithToleranceExecutorConfig extends AbstractConfig { + + public RetryWithToleranceExecutorConfig(Map<?, ?> originals) { + super(getConfigDef(), originals); + } + + public long retriesLimit() { + return getLong(RETRIES_LIMIT); + } + + public long retriesDelayMax() { + return getLong(RETRIES_DELAY_MAX_MS); + } + + public long toleranceLimit() { + return getLong(TOLERANCE_LIMIT); + } + + public long toleranceRateLimit() { + return getLong(TOLERANCE_RATE_LIMIT); + } + + public TimeUnit toleranceRateDuration() { + final String duration = getString(TOLERANCE_RATE_DURATION); + switch (duration.toLowerCase(Locale.ROOT)) { + case "minute": + return TimeUnit.MINUTES; + case "hour": + return TimeUnit.HOURS; + case "day": + return TimeUnit.DAYS; + default: + throw new ConfigException("Could not recognize value " + duration + " for config " + TOLERANCE_RATE_DURATION); + } + } + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/StructUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/StructUtil.java new file mode 100644 index 00000000000..a4081d2e2c4 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/impl/StructUtil.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.errors.impl; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; +import org.apache.kafka.connect.runtime.errors.Stage; +import org.apache.kafka.connect.runtime.errors.StageType; +import org.apache.kafka.connect.runtime.errors.Structable; +import org.apache.kafka.connect.sink.SinkTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public class StructUtil { // Converts Structable objects to Connect Structs by reflecting over their zero-arg @Structable.Field methods. + + private static final Logger log = LoggerFactory.getLogger(StructUtil.class); + + public static Struct toStruct(Structable structable) { + Objects.requireNonNull(structable); + + Schema objectSchema = getSchemaFor(structable.getClass()); + return getStructFor(structable, objectSchema); + } + + @SuppressWarnings("unchecked") + private static Struct getStructFor(Structable structable, Schema schema) { +
Objects.requireNonNull(structable); + + Struct struct = new Struct(schema); + Method[] methods = structable.getClass().getMethods(); + for (Method method: methods) { + Structable.Field field = method.getAnnotation(Structable.Field.class); + if (field == null) { + continue; + } + if (method.getParameterTypes().length > 0) { + log.debug("Cannot invoke method with non-zero parameters."); + continue; + } + try { + Object object = method.invoke(structable); + if (object != null) { + if (object instanceof Exception) { + Exception ex = (Exception) object; + object = getMessage(ex) + "\n" + getStackTrace(ex); + } else if (object instanceof Enum) { // getClass().isEnum() is false for enum constants that declare a body + object = String.valueOf(object); + } else if (Class.class.isAssignableFrom(object.getClass())) { + object = ((Class<?>) object).getName(); + } else if (object instanceof Map) { + Map<String, String> m = new HashMap<>(); + Map<Object, Object> mm = (Map<Object, Object>) object; + for (Map.Entry<Object, Object> e: mm.entrySet()) { + String key = e.getKey() instanceof String ? (String) e.getKey() : String.valueOf(e.getKey()); + String val = e.getValue() instanceof String ? (String) e.getValue() : String.valueOf(e.getValue()); + m.put(key, val); + } + object = m; + } + struct.put(field.value(), object); + } + + } catch (Exception e) { + log.error("Could not invoke method " + method, e); + } + } + return struct; + } + + @SuppressWarnings("unchecked") + public static Schema getSchemaFor(Class<?
extends Structable> structable) { + SchemaBuilder schemaBuilder = SchemaBuilder.struct(); + Method[] methods = structable.getMethods(); + for (Method method: methods) { + Structable.Field field = method.getAnnotation(Structable.Field.class); + if (field == null) { + continue; + } + if (method.getParameterTypes().length > 0) { + log.debug("Cannot invoke method with non-zero parameters."); + continue; + } + + Class<?> type = method.getReturnType(); + if (method.getReturnType().equals(int.class)) { + schemaBuilder.field(field.value(), Schema.OPTIONAL_INT32_SCHEMA); + } else if (method.getReturnType().equals(String.class)) { + schemaBuilder.field(field.value(), Schema.OPTIONAL_STRING_SCHEMA); + } else if (method.getReturnType().equals(long.class)) { + schemaBuilder.field(field.value(), Schema.OPTIONAL_INT64_SCHEMA); + } else if (method.getReturnType().equals(Exception.class)) { + schemaBuilder.field(field.value(), Schema.OPTIONAL_STRING_SCHEMA); + } else if (method.getReturnType().equals(Structable.class)) { + schemaBuilder.field(field.value(), getSchemaFor((Class<?
extends Structable>) method.getReturnType())); + } else if (method.getReturnType().isEnum()) { + schemaBuilder.field(field.value(), Schema.OPTIONAL_STRING_SCHEMA); + } else if (Class.class.isAssignableFrom(method.getReturnType())) { + schemaBuilder.field(field.value(), Schema.OPTIONAL_STRING_SCHEMA); + } else if (Map.class.isAssignableFrom(method.getReturnType())) { + schemaBuilder.field(field.value(), SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA).build()); + } else { + throw new DataException("Could not translate type: " + type + " for method " + method); + } + } + return schemaBuilder.build(); + } + + public static String getMessage(final Throwable th) { + if (th == null) { + return ""; + } + final String clsName = th.getClass().getName(); + final String msg = th.getMessage(); + return clsName + ": " + msg; + } + + public static String getStackTrace(final Throwable throwable) { + final StringWriter sw = new StringWriter(); + final PrintWriter pw = new PrintWriter(sw, true); + throwable.printStackTrace(pw); + return sw.getBuffer().toString(); + } + + public static void main(String[] args) { // NOTE(review): ad-hoc debugging entry point; belongs in a unit test rather than production code + new StructUtil().test(); + } + + private Exception foo() { + return new ConnectException("connect"); + } + + private void test() { + System.out.println(RuntimeException.class.isAssignableFrom(Exception.class)); + System.out.println(Exception.class.isAssignableFrom(RuntimeException.class)); + System.out.println(RuntimeException.class.isAssignableFrom(RuntimeException.class)); + + ProcessingContext context = ProcessingContext.noop("my-task"); + context.setException(new RuntimeException("hello", foo())); + Struct contextStruct = toStruct(context); + + Map<String, Object> config = new HashMap<>(); + config.put("k1", "v1"); + config.put("k2", 10); + Stage stage = Stage.newBuilder(StageType.TASK_START).setExecutingClass(SinkTask.class).setConfig(config).build(); + Struct stageStruct = toStruct(stage); + + Schema combinedSchema =
SchemaBuilder.struct() + .field("context", contextStruct.schema()) + .field("stage", stageStruct.schema()) + .build(); + Struct struct = new Struct(combinedSchema); + struct.put("context", contextStruct); + struct.put("stage", stageStruct); + + System.out.println(struct.toString().replaceAll("\n", "\\\\n")); + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 9568e787cc5..791637c06c9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -31,6 +31,8 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.errors.OperationExecutor; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup; @@ -161,7 +163,8 @@ public void setUp() { private void createTask(TargetState initialState) { workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer"}, - taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter, valueConverter, headerConverter, transformationChain, pluginLoader, time); + taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter, valueConverter, headerConverter, transformationChain, pluginLoader, time, + ProcessingContext.noop(taskId.toString())); } @After @@ -1361,7 +1364,7 @@ private void expectConversionAndTransformation(final int numMessages, final Stri EasyMock.expect(valueConverter.toConnectData(TOPIC,
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages); final Capture<SinkRecord> recordCapture = EasyMock.newCapture(); - EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))) + EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture), EasyMock.<OperationExecutor>anyObject(), EasyMock.<ProcessingContext>anyObject())) .andAnswer(new IAnswer<SinkRecord>() { @Override public SinkRecord answer() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index b09f8477f45..f5e3cfd2abc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -28,6 +28,8 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.errors.OperationExecutor; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkConnector; @@ -137,7 +139,8 @@ public void setup() { workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer"}, taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter, - valueConverter, headerConverter, TransformationChain.noOp(), pluginLoader, time); + valueConverter, headerConverter, TransformationChain.<SinkRecord>noOp(), pluginLoader, time, + ProcessingContext.noop(taskId.toString())); recordsReturned = 0; } @@ -573,7 +576,7 @@ private void expectStopTask() throws Exception { EasyMock.expect(valueConverter.toConnectData(TOPIC, 
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes(); final Capture<SinkRecord> recordCapture = EasyMock.newCapture(); - EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer(new IAnswer<SinkRecord>() { + EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture), EasyMock.<OperationExecutor>anyObject(), EasyMock.<ProcessingContext>anyObject())).andAnswer(new IAnswer<SinkRecord>() { @Override public SinkRecord answer() { return recordCapture.getValue(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 25c2cb10352..709c59a5102 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -26,6 +26,8 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup; +import org.apache.kafka.connect.runtime.errors.OperationExecutor; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.source.SourceRecord; @@ -146,7 +148,8 @@ private void createWorkerTask() { private void createWorkerTask(TargetState initialState) { workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter, - transformationChain, producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM); + transformationChain, producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM, + 
ProcessingContext.noop(taskId.toString())); } @Test @@ -772,7 +775,7 @@ private void expectConvertKeyValue(boolean anyTimes) { private void expectApplyTransformationChain(boolean anyTimes) { final Capture<SourceRecord> recordCapture = EasyMock.newCapture(); - IExpectationSetters<SourceRecord> convertKeyExpect = EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))); + IExpectationSetters<SourceRecord> convertKeyExpect = EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture), EasyMock.<OperationExecutor>anyObject(), EasyMock.<ProcessingContext>anyObject())); if (anyTimes) convertKeyExpect.andStubAnswer(new IAnswer<SourceRecord>() { @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java index 78f2836f42b..dc3c8b0acc7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.common.utils.MockTime; @@ -77,9 +78,10 @@ public void standardStartup() { TaskStatus.Listener.class, TargetState.class, ClassLoader.class, - ConnectMetrics.class + ConnectMetrics.class, + ProcessingContext.class ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics) + .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, ProcessingContext.noop("test")) .addMockedMethod("initialize") .addMockedMethod("execute") .addMockedMethod("close") @@ -123,9 +125,10 @@ public void stopBeforeStarting() { TaskStatus.Listener.class, 
TargetState.class, ClassLoader.class, - ConnectMetrics.class + ConnectMetrics.class, + ProcessingContext.class ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics) + .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, ProcessingContext.noop("test")) .addMockedMethod("initialize") .addMockedMethod("execute") .addMockedMethod("close") @@ -162,9 +165,10 @@ public void cancelBeforeStopping() throws Exception { TaskStatus.Listener.class, TargetState.class, ClassLoader.class, - ConnectMetrics.class + ConnectMetrics.class, + ProcessingContext.class ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics) + .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, ProcessingContext.noop("test")) .addMockedMethod("initialize") .addMockedMethod("execute") .addMockedMethod("close") diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index f062436f0e0..b4828dcc998 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -33,6 +33,9 @@ import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter; +import org.apache.kafka.connect.runtime.errors.OperationExecutor; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; +import org.apache.kafka.connect.runtime.errors.impl.NoopExecutor; import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -475,6 +478,8 @@ public void testAddRemoveTask() throws Exception { expectStartStorage(); 
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID); + EasyMock.expect(workerTask.processingContext()).andStubReturn(ProcessingContext.noop(TASK_ID.toString())); + EasyMock.expect(workerTask.operationExecutor()).andStubReturn(NoopExecutor.INSTANCE); EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); PowerMock.expectNew( @@ -492,7 +497,9 @@ public void testAddRemoveTask() throws Exception { EasyMock.eq(config), anyObject(ConnectMetrics.class), anyObject(ClassLoader.class), - anyObject(Time.class)) + anyObject(Time.class), + anyObject(ProcessingContext.class), + anyObject(OperationExecutor.class)) .andReturn(workerTask); Map<String, String> origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -612,6 +619,8 @@ public void testCleanupTasksOnStop() throws Exception { expectStartStorage(); EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID); + EasyMock.expect(workerTask.processingContext()).andStubReturn(ProcessingContext.noop(TASK_ID.toString())); + EasyMock.expect(workerTask.operationExecutor()).andStubReturn(NoopExecutor.INSTANCE); EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); PowerMock.expectNew( @@ -629,7 +638,9 @@ public void testCleanupTasksOnStop() throws Exception { anyObject(WorkerConfig.class), anyObject(ConnectMetrics.class), EasyMock.eq(pluginLoader), - anyObject(Time.class)) + anyObject(Time.class), + anyObject(ProcessingContext.class), + anyObject(OperationExecutor.class)) .andReturn(workerTask); Map<String, String> origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -699,6 +710,8 @@ public void testConverterOverrides() throws Exception { expectStartStorage(); EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID); + EasyMock.expect(workerTask.processingContext()).andStubReturn(ProcessingContext.noop(TASK_ID.toString())); + 
EasyMock.expect(workerTask.operationExecutor()).andStubReturn(NoopExecutor.INSTANCE); Capture<TestConverter> keyConverter = EasyMock.newCapture(); Capture<TestConfigurableConverter> valueConverter = EasyMock.newCapture(); @@ -720,7 +733,9 @@ public void testConverterOverrides() throws Exception { anyObject(WorkerConfig.class), anyObject(ConnectMetrics.class), EasyMock.eq(pluginLoader), - anyObject(Time.class)) + anyObject(Time.class), + anyObject(ProcessingContext.class), + anyObject(OperationExecutor.class)) .andReturn(workerTask); Map<String, String> origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/impl/RetryWithToleranceExecutorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/impl/RetryWithToleranceExecutorTest.java new file mode 100644 index 00000000000..44249695b2e --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/impl/RetryWithToleranceExecutorTest.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.errors.impl; + +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.connect.runtime.errors.OperationExecutor; +import org.apache.kafka.connect.runtime.errors.ProcessingContext; +import org.apache.kafka.connect.runtime.errors.Stage; +import org.apache.kafka.connect.runtime.errors.StageType; +import org.easymock.EasyMock; +import org.easymock.Mock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.easymock.EasyMock.replay; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ProcessingContext.class}) +public class RetryWithToleranceExecutorTest { + + @Mock + private ProcessingContext processingContext; + + Object ref = new Object(); + + @Test + public void testHandleExceptionInTransformations() { + testHandleExceptionInStage(StageType.TRANSFORMATION, new Exception()); + } + + @Test + public void testHandleExceptionInHeaderConverter() { + testHandleExceptionInStage(StageType.HEADER_CONVERTER, new Exception()); + } + + @Test + public void testHandleExceptionInValueConverter() { + testHandleExceptionInStage(StageType.VALUE_CONVERTER, new Exception()); + } + + @Test + public void testHandleExceptionInKeyConverter() { + testHandleExceptionInStage(StageType.KEY_CONVERTER, new Exception()); + } + + @Test + public void testHandleExceptionInKafkaConsume() { + testHandleExceptionInStage(StageType.KAFKA_CONSUME, new org.apache.kafka.common.errors.RetriableException() {}); + } + + @Test + public void testHandleExceptionInKafkaProduce() { + testHandleExceptionInStage(StageType.KAFKA_PRODUCE, new 
org.apache.kafka.common.errors.RetriableException() {}); + } + + @Test + public void testHandleExceptionInTaskPut() { + testHandleExceptionInStage(StageType.TASK_PUT, new org.apache.kafka.connect.errors.RetriableException("Test")); + } + + @Test + public void testHandleExceptionInTaskPoll() { + testHandleExceptionInStage(StageType.TASK_POLL, new org.apache.kafka.connect.errors.RetriableException("Test")); + } + + @Test(expected = Exception.class) + public void testThrowExceptionInTaskPut() { + testHandleExceptionInStage(StageType.TASK_PUT, new Exception()); + } + + @Test(expected = Exception.class) + public void testThrowExceptionInTaskPoll() { + testHandleExceptionInStage(StageType.TASK_POLL, new Exception()); + } + + @Test(expected = Exception.class) + public void testThrowExceptionInKafkaConsume() { + testHandleExceptionInStage(StageType.KAFKA_CONSUME, new Exception()); + } + + @Test(expected = Exception.class) + public void testThrowExceptionInKafkaProduce() { + testHandleExceptionInStage(StageType.KAFKA_PRODUCE, new Exception()); + } + + private void testHandleExceptionInStage(StageType type, Exception ex) { + RetryWithToleranceExecutor executor = setupExecutor(); + setupProcessingContext(type, ex); + replay(processingContext); + assertEquals(ref, executor.execute(new ExceptionThrower(ex), ref, processingContext)); + PowerMock.verifyAll(); + } + + private RetryWithToleranceExecutor setupExecutor() { + RetryWithToleranceExecutor executor = new RetryWithToleranceExecutor(); + Map<String, Object> props = config(RetryWithToleranceExecutor.RETRIES_LIMIT, "0"); + props.put(RetryWithToleranceExecutor.TOLERANCE_LIMIT, "10"); + props.put(RetryWithToleranceExecutor.TOLERANCE_RATE_LIMIT, "10"); + executor.configure(props); + return executor; + } + + private void setupProcessingContext(StageType type, Exception ex) { + EasyMock.expect(processingContext.current()).andReturn(Stage.newBuilder(type).build()); + EasyMock.expect(processingContext.exception()).andReturn(ex); + 
EasyMock.expect(processingContext.attempt()).andReturn(1); + EasyMock.expect(processingContext.timeOfError()).andReturn(System.currentTimeMillis()); + processingContext.setTimeOfError(EasyMock.anyLong()); + + processingContext.incrementAttempt(); + EasyMock.expectLastCall(); + + processingContext.report(); + EasyMock.expectLastCall(); + + processingContext.setException(ex); + EasyMock.expectLastCall(); + } + + @Test + public void testCheckRetryLimit() { + RetryWithToleranceExecutor executor = new RetryWithToleranceExecutor(); + Map<String, Object> props = config(RetryWithToleranceExecutor.RETRIES_LIMIT, "5"); + props.put(RetryWithToleranceExecutor.RETRIES_DELAY_MAX_MS, "120000"); + executor.configure(props); + + EasyMock.expect(processingContext.attempt()).andReturn(1); + EasyMock.expect(processingContext.attempt()).andReturn(2); + EasyMock.expect(processingContext.attempt()).andReturn(3); + EasyMock.expect(processingContext.attempt()).andReturn(4); + EasyMock.expect(processingContext.attempt()).andReturn(5); + EasyMock.expect(processingContext.attempt()).andReturn(6); + + replay(processingContext); + + assertTrue(executor.checkRetry(processingContext)); + assertTrue(executor.checkRetry(processingContext)); + assertTrue(executor.checkRetry(processingContext)); + assertTrue(executor.checkRetry(processingContext)); + assertTrue(executor.checkRetry(processingContext)); + assertFalse(executor.checkRetry(processingContext)); + } + + @Test + public void testBackoffLimit() { + MockTime time = new MockTime(0, 0, 0); + RetryWithToleranceExecutor executor = new RetryWithToleranceExecutor(time); + + Map<String, Object> props = config(RetryWithToleranceExecutor.RETRIES_LIMIT, "5"); + props.put(RetryWithToleranceExecutor.RETRIES_DELAY_MAX_MS, "10000"); + executor.configure(props); + + EasyMock.expect(processingContext.attempt()).andReturn(1); + EasyMock.expect(processingContext.attempt()).andReturn(2); + EasyMock.expect(processingContext.attempt()).andReturn(3); + 
EasyMock.expect(processingContext.attempt()).andReturn(4); + EasyMock.expect(processingContext.attempt()).andReturn(5); + + replay(processingContext); + + long prevTs = time.hiResClockMs(); + executor.backoff(processingContext); + assertEquals(time.hiResClockMs() - prevTs, 1000); + + prevTs = time.hiResClockMs(); + executor.backoff(processingContext); + assertEquals(time.hiResClockMs() - prevTs, 2000); + + prevTs = time.hiResClockMs(); + executor.backoff(processingContext); + assertEquals(time.hiResClockMs() - prevTs, 4000); + + prevTs = time.hiResClockMs(); + executor.backoff(processingContext); + assertEquals(time.hiResClockMs() - prevTs, 8000); + + prevTs = time.hiResClockMs(); + executor.backoff(processingContext); + assertTrue(time.hiResClockMs() - prevTs < 10000); + + PowerMock.verifyAll(); + } + + @Test + public void testToleranceLimitWithinDuration() { + RetryWithToleranceExecutor executor = new RetryWithToleranceExecutor(new MockTime(0, 0, 0)); + Map<String, Object> props = config(RetryWithToleranceExecutor.TOLERANCE_RATE_LIMIT, "1"); + props.put(RetryWithToleranceExecutor.TOLERANCE_RATE_DURATION, "minute"); + executor.configure(props); + + long errorTimeMs = 1; + EasyMock.expect(processingContext.timeOfError()).andReturn(errorTimeMs); + EasyMock.expect(processingContext.timeOfError()).andReturn(errorTimeMs + 1); + + replay(processingContext); + + assertTrue(executor.withinToleranceLimits(processingContext)); + assertFalse(executor.withinToleranceLimits(processingContext)); + + PowerMock.verifyAll(); + } + + @Test + public void testToleranceLimitAcrossDuration() { + MockTime time = new MockTime(0, 0, 0); + RetryWithToleranceExecutor executor = new RetryWithToleranceExecutor(time); + Map<String, Object> props = config(RetryWithToleranceExecutor.TOLERANCE_RATE_LIMIT, "1"); + props.put(RetryWithToleranceExecutor.TOLERANCE_RATE_DURATION, "minute"); + executor.configure(props); + + // first error occurs at 59 seconds + long errorTimeMs = 59000; + 
EasyMock.expect(processingContext.timeOfError()).andReturn(errorTimeMs); + // second error occurs at 61 seconds + errorTimeMs = 61000; + EasyMock.expect(processingContext.timeOfError()).andReturn(errorTimeMs); + + replay(processingContext); + + assertTrue(executor.withinToleranceLimits(processingContext)); + + time.setCurrentTimeMs(60000); + + assertTrue(executor.withinToleranceLimits(processingContext)); + + PowerMock.verifyAll(); + } + + @Test + public void testDefaultConfigs() { + RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig configuration; + configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(new HashMap<>()); + assertEquals(configuration.retriesLimit(), 0); + assertEquals(configuration.retriesDelayMax(), 60000); + assertEquals(configuration.toleranceLimit(), 0); + assertEquals(configuration.toleranceRateLimit(), 0); + assertEquals(configuration.toleranceRateDuration(), TimeUnit.MINUTES); + + PowerMock.verifyAll(); + } + + @Test + public void testConfigs() { + RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig configuration; + configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("retries.limit", "100")); + assertEquals(configuration.retriesLimit(), 100); + + configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("retries.delay.max.ms", "100")); + assertEquals(configuration.retriesDelayMax(), 100); + + configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("tolerance.limit", "1024")); + assertEquals(configuration.toleranceLimit(), 1024); + + configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("tolerance.rate.limit", "125")); + assertEquals(configuration.toleranceRateLimit(), 125); + + configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("tolerance.rate.duration", "minute")); + assertEquals(configuration.toleranceRateDuration(), TimeUnit.MINUTES); + 
+ configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("tolerance.rate.duration", "day")); + assertEquals(configuration.toleranceRateDuration(), TimeUnit.DAYS); + + configuration = new RetryWithToleranceExecutor.RetryWithToleranceExecutorConfig(config("tolerance.rate.duration", "hour")); + assertEquals(configuration.toleranceRateDuration(), TimeUnit.HOURS); + + PowerMock.verifyAll(); + } + + Map<String, Object> config(String key, Object val) { + Map<String, Object> configs = new HashMap<>(); + configs.put(key, val); + return configs; + } + + private static class ExceptionThrower implements OperationExecutor.Operation { + private Exception e; + + public ExceptionThrower(Exception e) { + this.e = e; + } + + @Override + public Object apply() throws Exception { + throw e; + } + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka Connect handling of bad data > ---------------------------------- > > Key: KAFKA-6738 > URL: https://issues.apache.org/jira/browse/KAFKA-6738 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect > Affects Versions: 1.1.0 > Reporter: Randall Hauch > Assignee: Arjun Satish > Priority: Critical > Fix For: 2.0.0 > > > Kafka Connect connectors and tasks fail when they run into an unexpected > situation or error, but the framework should provide more general "bad data > handling" options, including (perhaps among others): > # fail fast, which is what we do today (assuming connector actually fails and > doesn't eat errors) > # retry (possibly with configs to limit) > # drop data and move on > # dead letter queue > This needs to be addressed in a way that handles errors from: > # The connector itself (e.g. 
connectivity issues to the other system) > # Converters/serializers (bad data, unexpected format, etc) > # SMTs > # Ideally the framework as well, though we obviously want to fix known bugs > anyway -- This message was sent by Atlassian JIRA (v7.6.3#76005)