C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450850940

##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java:
##########
@@ -44,6 +49,12 @@ public ErrantRecordSinkTask() {
         public void start(Map<String, String> props) {
             super.start(props);
             reporter = context.errantRecordReporter();
+            executorService = Executors.newSingleThreadExecutor();
+        }
+
+        @Override
+        public void stop() {
+            ThreadUtils.shutdownExecutorServiceQuietly(executorService, 4, TimeUnit.SECONDS);

Review Comment:
   Just curious--any rationale for four seconds specifically?

##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java:
##########
@@ -75,9 +77,9 @@ public class ErrorHandlingIntegrationTest {
     private static final String DLQ_TOPIC = "my-connector-errors";
     private static final String CONNECTOR_NAME = "error-conn";
     private static final String TASK_ID = "error-conn-0";
-    private static final int NUM_RECORDS_PRODUCED = 20;
-    private static final int EXPECTED_CORRECT_RECORDS = 19;
+    private static final int NUM_RECORDS_PRODUCED = 1000;
     private static final int EXPECTED_INCORRECT_RECORDS = 1;
+    private static final int EXPECTED_CORRECT_RECORDS = NUM_RECORDS_PRODUCED - EXPECTED_INCORRECT_RECORDS;

Review Comment:
   Again, I know this isn't your fault, but I'm a little confused at the use of this field in the `testErrantRecordReporter` case. I believe it can be replaced with `NUM_RECORDS_PRODUCED` [here](https://github.com/gharris1727/kafka/blob/f5845038014f3df29d505eeadd01403e9756728f/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java#L220), since this case doesn't cover any errors that should prevent records from reaching tasks.

##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java:
##########
@@ -54,7 +65,16 @@ public void put(Collection<SinkRecord> records) {
                     .computeIfAbsent(rec.topic(), v -> new HashMap<>())
                     .computeIfAbsent(rec.kafkaPartition(), v -> new TopicPartition(rec.topic(), rec.kafkaPartition()));
                 committedOffsets.put(tp, committedOffsets.getOrDefault(tp, 0) + 1);
-                reporter.report(rec, new Throwable());
+                Throwable error = new Throwable();
+                // Test synchronous and asynchronous reporting, allowing for re-ordering the errant reports

Review Comment:
   We don't have any corresponding verification that this behavior is handled correctly. If there's an easy, lightweight way to add that, it'd be nice, but it's not worth blocking on if it's too cumbersome.

##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java:
##########
@@ -245,7 +249,7 @@ public void testErrantRecordReporter() throws Exception {
         // consume failed records from dead letter queue topic
         log.info("Consuming records from test topic");
-        ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
+        ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);

Review Comment:
   Nit (not your fault): we don't use the `messages` field at all.
   ```suggestion
           connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
   ```

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -392,90 +397,29 @@ public void testSetConfigs() {
     }
 
     @Test
-    public void testThreadSafety() throws Throwable {
-        long runtimeMs = 5_000;
-        int numThreads = 10;
-        // Check that multiple threads using RetryWithToleranceOperator concurrently
-        // can't corrupt the state of the ProcessingContext
-        AtomicReference<Throwable> failed = new AtomicReference<>(null);
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
-                ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics, new ProcessingContext() {
-                    private final AtomicInteger count = new AtomicInteger();
-                    private final AtomicInteger attempt = new AtomicInteger();
-
-                    @Override
-                    public void error(Throwable error) {
-                        if (count.getAndIncrement() > 0) {
-                            failed.compareAndSet(null, new AssertionError("Concurrent call to error()"));
-                        }
-                        super.error(error);
-                    }
-
-                    @Override
-                    public Future<Void> report() {
-                        if (count.getAndSet(0) > 1) {
-                            failed.compareAndSet(null, new AssertionError("Concurrent call to error() in report()"));
-                        }
-
-                        return super.report();
-                    }
-
-                    @Override
-                    public void currentContext(Stage stage, Class<?> klass) {
-                        this.attempt.set(0);
-                        super.currentContext(stage, klass);
-                    }
-
-                    @Override
-                    public void attempt(int attempt) {
-                        if (!this.attempt.compareAndSet(attempt - 1, attempt)) {
-                            failed.compareAndSet(null, new AssertionError(
-                                    "Concurrent call to attempt(): Attempts should increase monotonically " +
-                                            "within the scope of a given currentContext()"));
-                        }
-                        super.attempt(attempt);
-                    }
-                }, new CountDownLatch(1));
-
-        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
-        List<? extends Future<?>> futures = IntStream.range(0, numThreads).boxed()
-                .map(id ->
-                        pool.submit(() -> {
-                            long t0 = System.currentTimeMillis();
-                            long i = 0;
-                            while (true) {
-                                if (++i % 10000 == 0 && System.currentTimeMillis() > t0 + runtimeMs) {
-                                    break;
-                                }
-                                if (failed.get() != null) {
-                                    break;
-                                }
-                                try {
-                                    if (id < numThreads / 2) {
-                                        retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-                                                SinkTask.class, consumerRecord, new Throwable()).get();
-                                    } else {
-                                        retryWithToleranceOperator.execute(() -> null, Stage.TRANSFORMATION,
-                                                SinkTask.class);
-                                    }
-                                } catch (Exception e) {
-                                    failed.compareAndSet(null, e);
-                                }
-                            }
-                        }))
-                .collect(Collectors.toList());
-        pool.shutdown();
-        pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
-        futures.forEach(future -> {
-            try {
-                future.get();
-            } catch (Exception e) {
-                failed.compareAndSet(null, e);
-            }

Review Comment:
   Hmmm... it's a little worrisome to see this test (which, as we now know, was insufficient) completely removed without a replacement. I guess it'd be difficult to test again and we can sort of hand-wave that the structural changes in this PR increase thread safety, but if possible, it'd be great to see at least some new testing coverage.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org