gharris1727 commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1449403859
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -303,48 +303,14 @@ public String toString() {
      * @param reporters the error reporters (should not be null).
      */
     public synchronized void reporters(List<ErrorReporter> reporters) {
-        this.context.reporters(reporters);
-    }
-
-    /**
-     * Set the source record being processed in the connect pipeline.
-     *
-     * @param preTransformRecord the source record
-     */
-    public synchronized void sourceRecord(SourceRecord preTransformRecord) {
-        this.context.sourceRecord(preTransformRecord);
-    }
-
-    /**
-     * Set the record consumed from Kafka in a sink connector.
-     *
-     * @param consumedMessage the record
-     */
-    public synchronized void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
-        this.context.consumerRecord(consumedMessage);
-    }
-
-    /**
-     * @return true, if the last operation encountered an error; false otherwise
-     */
-    public synchronized boolean failed() {
-        return this.context.failed();
-    }
-
-    /**
-     * Returns the error encountered when processing the current stage.
-     *
-     * @return the error encountered when processing the current stage
-     */
-    public synchronized Throwable error() {
-        return this.context.error();
+        this.reporters = Objects.requireNonNull(reporters, "reporters");

Review Comment:
While I like the CachedSupplier(Supplier<T> supplier) signature, I think it is probably not a good idea to use it with AutoCloseables, as the ownership of the object is ambiguous from the signature. Currently I know that Supplier<List<ErrorReporter>> puts the responsibility on the caller of get() to close the returned error reporters, but if CachedSupplier exists, some call-sites of get() will close the error reporters, and some won't.

I guess the SharedTopicAdmin is the most similar thing I can think of. When it's a SharedTopicAdmin object, it needs to be closed. When it is cast to a Supplier<AdminClient>, it doesn't need to be closed.
Perhaps we could do something similar here, for example the following (a little lighter weight than SharedTopicAdmin)?

```
public class AutoCloseableSupplier<T extends AutoCloseable> implements AutoCloseable, Supplier<T> {
    public AutoCloseableSupplier(Supplier<T> s, String closeMessage) {
        ...
    }
}
```
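To make the ownership idea concrete, here is a rough sketch of how such a wrapper could behave. It is not code from the PR. The caching in get(), the idempotent close(), and the reading of closeMessage as a description used when closing fails are all assumptions made for illustration; the class is package-private only to keep the snippet self-contained.

```java
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Hypothetical sketch: lazily creates one instance from the delegate supplier,
 * returns that same instance from every get() call, and closes it only when
 * the wrapper itself is closed. Whoever holds the wrapper owns the resource;
 * code that only sees a Supplier<T> does not.
 */
class AutoCloseableSupplier<T extends AutoCloseable> implements AutoCloseable, Supplier<T> {
    private final Supplier<T> delegate;
    // Assumption: a human-readable description used when reporting a failed close.
    private final String closeMessage;
    private T cached;
    private boolean closed;

    AutoCloseableSupplier(Supplier<T> delegate, String closeMessage) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
        this.closeMessage = Objects.requireNonNull(closeMessage, "closeMessage");
    }

    @Override
    public synchronized T get() {
        if (closed) {
            throw new IllegalStateException("Supplier is already closed");
        }
        if (cached == null) {
            // Create the instance on first use and reuse it afterwards.
            cached = Objects.requireNonNull(delegate.get(), "delegate returned null");
        }
        return cached;
    }

    @Override
    public synchronized void close() {
        if (closed) {
            return; // closing twice is a no-op in this sketch
        }
        closed = true;
        if (cached != null) {
            try {
                cached.close();
            } catch (Exception e) {
                throw new RuntimeException("Failed to close " + closeMessage, e);
            } finally {
                cached = null;
            }
        }
    }
}
```

With this shape, the component that constructs the AutoCloseableSupplier is responsible for closing it, while call-sites that receive it as a plain Supplier<T> just call get() and never close the returned object, which mirrors the SharedTopicAdmin arrangement described above.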