rhauch commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r429975231



##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {
+
+
+  /**
+   * Report a problematic record and the corresponding error to be written to the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
+   * record has been written or throw any exception that occurred while sending the record.
+   * If you want to simulate a simple blocking call you can call the <code>get()</code> method
+   * immediately.
+   *
+   * @param record the problematic record; may not be null
+   * @param error  the error capturing the problem with the record; may not be null
+   * @return a future that can be used to block until the record and error are reported
+   *         to the DLQ

Review comment:
       Also, what exceptions can this method throw? Should we add something like:
   
   ```suggestion
      *         to the DLQ
      * @throws ConnectException if the error reporter and DLQ fail to write a 
      *         reported record and are configured with {@code errors.tolerance=none}
   ```
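
From a task author's side, the proposed `@throws` clause means that a call to `report(...)` can itself fail. Below is a minimal sketch of how calling code might cope with that, assuming the exception is unchecked. `Reporter`, `ReportingFailure` and `handOff` are hypothetical stand-ins made up for illustration, not Connect types:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-ins; none of these are Kafka Connect classes.
final class ReportThrowsSketch {

    /** Stand-in for an unchecked exception such as ConnectException. */
    static final class ReportingFailure extends RuntimeException {
        ReportingFailure(String message) {
            super(message);
        }
    }

    /** Stand-in for the reporter interface under review. */
    interface Reporter<R> {
        Future<Void> report(R record, Throwable error);
    }

    /** A reporter that always fails, as one configured not to tolerate errors might. */
    static <R> Reporter<R> failing() {
        return (record, error) -> {
            throw new ReportingFailure("could not report " + record);
        };
    }

    /** A reporter that accepts every record and completes immediately. */
    static <R> Reporter<R> accepting() {
        return (record, error) -> CompletableFuture.completedFuture(null);
    }

    /**
     * Hands a bad record to the reporter. Returns true if the reporter accepted it,
     * false if report(...) itself threw and the caller has to fall back
     * (for example by failing the task).
     */
    static <R> boolean handOff(Reporter<R> reporter, R record, Throwable error) {
        try {
            reporter.report(record, error);
            return true;
        } catch (ReportingFailure e) {
            return false;
        }
    }
}
```

Whether a real task should swallow the failure like `handOff` does, or let it propagate so the task fails, is a design choice the Javadoc would need to guide.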

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {
+
+
+  /**
+   * Report a problematic record and the corresponding error to be written to the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
+   * record has been written or throw any exception that occurred while sending the record.
+   * If you want to simulate a simple blocking call you can call the <code>get()</code> method
+   * immediately.
+   *

Review comment:
       ```suggestion
      * 
      * Connect guarantees that sink records reported through this reporter will be written to the error topic
      * before the framework calls the {@link SinkTask#preCommit(Map)} method and therefore before
      * committing the consumer offsets. SinkTask implementations can use the Future when stronger guarantees
      * are required.
      * 
   ```
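
To make the "stronger guarantees" case concrete: a task that must know its reports have landed before it moves on could keep the returned futures and block on them itself. This is only a sketch under that assumption. `awaitAll` is a hypothetical helper, and `Future<Void>` stands in for whatever `report(...)` returns:

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper; not part of the Connect API.
final class AwaitReportsSketch {

    /** Blocks until every pending report has completed; returns how many failed. */
    static int awaitAll(List<Future<Void>> pending) {
        int failed = 0;
        for (Future<Void> future : pending) {
            try {
                future.get(); // blocks until this report is written or has failed
            } catch (ExecutionException e) {
                failed++; // the write itself failed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                failed++;
            }
        }
        return failed;
    }
}
```

A task that is content with the framework's guarantee would simply ignore the returned futures.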

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+public interface ErrantRecordReporter {

Review comment:
       Please add JavaDoc for the interface, with `@since 2.6.0`.
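
For illustration, the requested interface-level JavaDoc could look roughly like this. The wording is only a placeholder, and the generic `Reporter<R>` is a hypothetical stand-in so that the snippet compiles without Connect on the classpath:

```java
import java.util.concurrent.Future;

// Hypothetical stand-in used only to show where the class-level JavaDoc and @since tag go.
final class InterfaceJavadocSketch {

    /**
     * Component that the framework hands to a sink task so that the task can report
     * individual records it could not process. (Placeholder wording.)
     *
     * @param <R> type of the reported record; a stand-in for SinkRecord
     * @since 2.6.0
     */
    interface Reporter<R> {

        /**
         * Report a problematic record and the corresponding error.
         *
         * @param record the problematic record; may not be null
         * @param error  the error capturing the problem with the record; may not be null
         * @return a future that completes once the record and error have been reported
         */
        Future<Void> report(R record, Throwable error);
    }
}
```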

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(

Review comment:
       Nit: static methods should appear before the non-static fields.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + "at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? record.timestamp() : null,
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {
+        for (ErrantRecordFuture future : errantRecordFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new ConnectException(e);
+            }
+        }
+    }
+
+    // Visible for testing
+    public class ErrantRecordFuture implements Future<Void> {
+
+        Future<RecordMetadata> future;

Review comment:
       Couldn't this be final?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -680,6 +689,7 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
                                                                 connectorClientConfigOverridePolicy);
             Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
             DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics);
+

Review comment:
       Nit: let's avoid adding new lines in code otherwise unaffected in the PR.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -531,13 +531,22 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter =
+                createWorkerErrantRecordReporter(
+                    id,
+                    sinkConfig,
+                    connectorClass,
+                    keyConverter,
+                    valueConverter,
+                    headerConverter
+                );

Review comment:
       Nit formatting:
   ```suggestion
               WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(
                       id, sinkConfig, connectorClass, keyConverter, valueConverter, headerConverter);
   ```
   

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -695,6 +705,32 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         return reporters;
     }
 
+    private WorkerErrantRecordReporter createWorkerErrantRecordReporter(
+        ConnectorTaskId id,
+        SinkConnectorConfig connConfig,
+        Class<? extends Connector> connectorClass,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        // check if errant record reporter topic is configured
+        String topic = connConfig.dlqTopicName();
+        if ((topic != null && !topic.isEmpty()) || connConfig.enableErrorLog()) {
+            Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
+                connectorClientConfigOverridePolicy);
+            Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+            return WorkerErrantRecordReporter.createAndSetup(
+                adminProps,
+                producerProps,
+                connConfig,
+                keyConverter,
+                valueConverter,
+                headerConverter
+            );

Review comment:
       Nit formatting:
   ```suggestion
               return WorkerErrantRecordReporter.createAndSetup(adminProps, producerProps,
                   connConfig, keyConverter, valueConverter, headerConverter);
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;

Review comment:
       All fields that can be `final` should be marked as such. This provides 
semantic intent to future developers and helps prevent unintentionally changing 
the fields in the future.
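
As a rough illustration of the point, here is a cut-down class whose fields are all assigned once in the constructor and can therefore be `final`. `FinalFieldsSketch` and its members are hypothetical; only the `dlqTopic`/`useDlq` naming is borrowed from the diff:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified class; not the PR's WorkerErrantRecordReporter.
final class FinalFieldsSketch {

    private final String dlqTopic;      // assigned exactly once, in the constructor
    private final boolean useDlq;       // derived from dlqTopic, never recomputed
    private final List<String> pending; // the reference is fixed; the contents may change

    FinalFieldsSketch(String dlqTopic) {
        this.dlqTopic = dlqTopic;
        this.useDlq = dlqTopic != null && !dlqTopic.isEmpty();
        this.pending = new ArrayList<>();
    }

    String dlqTopic() {
        return dlqTopic;
    }

    boolean useDlq() {
        return useDlq;
    }

    /** Adding to the list is still allowed: final protects the field, not the object. */
    int track(String reportId) {
        pending.add(reportId);
        return pending.size();
    }
}
```

Any later attempt to reassign one of these fields becomes a compile error, which is the protection the comment is asking for.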

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -531,13 +531,22 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter =
+                createWorkerErrantRecordReporter(
+                    id,
+                    sinkConfig,
+                    connectorClass,
+                    keyConverter,
+                    valueConverter,
+                    headerConverter
+                );

Review comment:
       At a higher level, why are we not reusing the 
`RetryWithToleranceOperator` here? I thought that was kind of the intent of the 
KIP: that this new `report(...)` method is just one more way to capture 
problematic records using the existing DLQ functionality. I understand that 
might require other refactoring of that class (like returning a Future from the 
produce-like methods), but it seems like it would simplify things substantially: 
we would avoid creating our own producer and could reuse a lot more of the 
existing functionality, such as metrics, retry count, logging, and the same 
producers.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + "at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? record.timestamp() : null,
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {
+        for (ErrantRecordFuture future : errantRecordFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new ConnectException(e);

Review comment:
       Do we really want to pass the ExecutionException to the 
ConnectException, or would it be better to pass that exception's *cause* to the 
ConnectException?
   
   How about log messages here?
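
One possible shape for this, sketched with hypothetical names: unwrap the `ExecutionException` so the wrapper carries the real cause, log before rethrowing, and restore the interrupt flag on interruption. `ReportingFailure` stands in for `ConnectException`, and `java.util.logging` stands in for the SLF4J logger used in the PR so that the snippet has no external dependencies:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; not the PR's waitForAllFutures() implementation.
final class UnwrapCauseSketch {

    private static final Logger LOG = Logger.getLogger(UnwrapCauseSketch.class.getName());

    /** Stand-in for ConnectException. */
    static final class ReportingFailure extends RuntimeException {
        ReportingFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Waits for one report and rethrows any failure with the underlying cause attached. */
    static void await(Future<?> future) {
        try {
            future.get();
        } catch (ExecutionException e) {
            // ExecutionException is only a wrapper; the cause is the actual send failure.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            LOG.log(Level.SEVERE, "Failed to report errant record", cause);
            throw new ReportingFailure("Failed to report errant record", cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            LOG.log(Level.SEVERE, "Interrupted while waiting for errant record report", e);
            throw new ReportingFailure("Interrupted while waiting for errant record report", e);
        }
    }
}
```

With this shape, a stack trace from the wrapper points directly at the producer error instead of at the `Future` plumbing.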

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + "at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? record.timestamp() : null,
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);

Review comment:
       Earlier I suggested reusing the `RetryWithToleranceOperator`, which might require adding a `produce`-like method to that class that simply reports a new error. If that method accepted a `Callback` here and passed it to its `producer.send(...)` call, we could supply a callback that removes the (completed) future from our list, keeping the list as small as possible by holding only the incomplete futures.
   
   If we did that, we'd want to use a `LinkedList` rather than an `ArrayList`, since we'd no longer be removing futures only from the ends of the list.
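   
   To make the suggestion concrete, here is a minimal sketch of the bookkeeping only. It is not code from this PR: `PendingReportTracker`, `track`, and `pendingCount` are hypothetical names, and a `CompletableFuture` completion hook stands in for the producer `Callback` so that the snippet runs without Kafka on the classpath. The list is synchronized on the assumption that completion happens on a different thread than the one calling `report`.

```java
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, for illustration only.
class PendingReportTracker {

    // A LinkedList, because completed futures can be removed from any position.
    // Wrapped in synchronizedList since completion may run on another thread.
    private final List<CompletableFuture<Void>> pending =
        Collections.synchronizedList(new LinkedList<>());

    // Remember the future, and drop it again as soon as it completes,
    // whether it completed normally or with an exception.
    CompletableFuture<Void> track(CompletableFuture<Void> sendResult) {
        pending.add(sendResult);
        sendResult.whenComplete((ignored, error) -> pending.remove(sendResult));
        return sendResult;
    }

    // Number of reports that have not completed yet.
    int pendingCount() {
        return pending.size();
    }
}
```

   With this shape, a method like `waitForAllFutures()` would only ever iterate over reports that are still in flight. It would need to copy or lock the list while iterating, since callbacks can remove entries concurrently.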
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private KafkaProducer<byte[], byte[]> producer;
+    private String dlqTopic;
+    private boolean useDlq;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private List<ErrantRecordFuture> errantRecordFutures;
+    private SinkConnectorConfig sinkConfig;
+    private HeaderConverter headerConverter;
+
+
+    public static WorkerErrantRecordReporter createAndSetup(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+
+        KafkaProducer<byte[], byte[]> kafkaProducer = DeadLetterQueueReporter.setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConnectorConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );
+
+        return new WorkerErrantRecordReporter(
+            kafkaProducer,
+            sinkConnectorConfig,
+            workerKeyConverter,
+            workerValueConverter,
+            workerHeaderConverter
+        );
+    }
+
+    // Visible for testing purposes
+    public WorkerErrantRecordReporter(
+        KafkaProducer<byte[], byte[]> kafkaProducer,
+        SinkConnectorConfig sinkConnectorConfig,
+        Converter workerKeyConverter,
+        Converter workerValueConverter,
+        HeaderConverter workerHeaderConverter
+    ) {
+        producer = kafkaProducer;
+        dlqTopic = sinkConnectorConfig.dlqTopicName();
+        useDlq = dlqTopic != null && !dlqTopic.isEmpty();
+        keyConverter = workerKeyConverter;
+        valueConverter = workerValueConverter;
+        errantRecordFutures = new ArrayList<>();
+        sinkConfig = sinkConnectorConfig;
+        headerConverter = workerHeaderConverter;
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+
+        if (sinkConfig.enableErrorLog()) {
+            if (sinkConfig.includeRecordDetailsInErrorLog()) {
+                log.error("Error processing record: " + record.toString(), error);
+            } else {
+                log.error(
+                    "Error processing record in topic "
+                        + record.topic()
+                        + "at offset "
+                        + record.kafkaOffset(),
+                    error
+                );
+            }
+        }
+
+        Future<RecordMetadata> producerFuture = null;
+
+        if (useDlq) {
+
+            Headers headers = record.headers();
+            RecordHeaders result = new RecordHeaders();
+            if (headers != null) {
+                String topic = record.topic();
+                for (Header header : headers) {
+                    String key = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                    result.add(key, rawHeader);
+                }
+            }
+
+            ProducerRecord<byte[], byte[]> errantRecord = new ProducerRecord<>(
+                dlqTopic,
+                null,
+                record.timestamp() == RecordBatch.NO_TIMESTAMP ? record.timestamp() : null,
+                keyConverter.fromConnectData(dlqTopic, record.keySchema(), record.key()),
+                valueConverter.fromConnectData(dlqTopic, record.valueSchema(), record.value()),
+                result
+            );
+
+            producerFuture = producer.send(errantRecord);
+        }
+
+        ErrantRecordFuture errantRecordFuture = new ErrantRecordFuture(producerFuture);
+        errantRecordFutures.add(errantRecordFuture);
+        return errantRecordFuture;
+    }
+
+    public void waitForAllFutures() {
+        for (ErrantRecordFuture future : errantRecordFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new ConnectException(e);
+            }
+        }
+    }
+
+    // Visible for testing
+    public class ErrantRecordFuture implements Future<Void> {
+
+        Future<RecordMetadata> future;
+
+        public ErrantRecordFuture(Future<RecordMetadata> producerFuture) {
+            future = producerFuture;

Review comment:
       If we ensure that the producer future is never null, we can remove the `if (future == null)` checks from this class's methods.
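
   One possible way to get there, sketched under assumptions: when no DLQ is configured, hand the wrapper an already-completed future instead of `null`. `NonNullReportFuture` and `alreadyDone()` below are hypothetical names rather than code from this PR, and the delegate is typed `Future<?>` so the snippet compiles without Kafka's `RecordMetadata`.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical simplification of ErrantRecordFuture, for illustration only.
class NonNullReportFuture implements Future<Void> {

    private final Future<?> delegate;

    // Rejects null up front, so no method below needs a null check.
    NonNullReportFuture(Future<?> delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
    }

    // For the case where nothing is sent (e.g. no DLQ configured).
    static NonNullReportFuture alreadyDone() {
        return new NonNullReportFuture(CompletableFuture.completedFuture(null));
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return delegate.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return delegate.isCancelled();
    }

    @Override
    public boolean isDone() {
        return delegate.isDone();
    }

    @Override
    public Void get() throws InterruptedException, ExecutionException {
        delegate.get(); // wait for the underlying send; its result is discarded
        return null;
    }

    @Override
    public Void get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        delegate.get(timeout, unit);
        return null;
    }
}
```

   Every method then delegates unconditionally, and the "nothing to wait for" case behaves like a send that has already finished.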




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

