TheKnowles commented on a change in pull request #11382: URL: https://github.com/apache/kafka/pull/11382#discussion_r753523689
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -366,7 +367,11 @@ private boolean sendRecords() {
                         if (e != null) {
                             log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);

Review comment:
Now that this could be a tolerated error, it makes sense to have it respect the errors.log.enable configuration. The log line would be duplicated, though: written unconditionally when we do not tolerate the error, and behind a config check when we do. Are you envisioning something like this?

```
if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
    if (errorLogEnabled) { // get this value from the config in some manner
        log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
        log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
    }
    commitTaskRecord(preTransformRecord, null);
} else {
    log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
    log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
    producerSendException.compareAndSet(null, e);
}
```

I would need to look more closely at the other layers of objects on top of the SourceTask. enableErrorLog() is available in the ConnectorConfig, but only the SinkConnectorConfig makes use of it, so I would need to spin up some additional infrastructure. I am not sure whether I would want to add WorkerErrantRecordReporter to WorkerSourceTask or have the configuration passed down in some other manner.
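One possible way to avoid the duplicated log line is to fold both cases into a single condition: log when the failure is fatal, or when it is tolerated and error logging is enabled. The following is only a standalone sketch of that branching, not a patch against WorkerSourceTask. The class SendFailurePolicy, the Tolerance enum, and the logLines and committed lists are hypothetical stand-ins for the real logger, ToleranceType, and commitTaskRecord, and the errorLogEnabled flag is assumed to have been read from the connector config by some means not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Standalone sketch; none of these types are the real Kafka Connect classes. */
final class SendFailurePolicy {

    /** Hypothetical stand-in for the errors.tolerance setting (none or all). */
    enum Tolerance { NONE, ALL }

    private final Tolerance tolerance;
    private final boolean errorLogEnabled; // assumed to mirror errors.log.enable

    // In-memory stand-ins for the logger and for commitTaskRecord(...).
    final List<String> logLines = new ArrayList<>();
    final List<String> committed = new ArrayList<>();
    final AtomicReference<Exception> producerSendException = new AtomicReference<>();

    SendFailurePolicy(Tolerance tolerance, boolean errorLogEnabled) {
        this.tolerance = tolerance;
        this.errorLogEnabled = errorLogEnabled;
    }

    /** Mirrors the body of the producer callback when the send failed (e != null). */
    void onSendFailure(String topic, String record, Exception e) {
        boolean tolerated = tolerance == Tolerance.ALL;

        // Single log site: fatal failures are always logged,
        // tolerated failures only when error logging is enabled.
        if (!tolerated || errorLogEnabled) {
            logLines.add("failed to send record to " + topic + ": " + e.getMessage());
        }

        if (tolerated) {
            // Tolerated: treat the record as handled so the task keeps going.
            committed.add(record);
        } else {
            // Not tolerated: remember the first failure so the task can fail later.
            producerSendException.compareAndSet(null, e);
        }
    }
}
```

With this shape the log statements appear once, and the question of how errorLogEnabled reaches the source task stays separate from the branching itself.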