C0urante commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1457585290
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java: ########## @@ -392,90 +397,29 @@ public void testSetConfigs() { } @Test - public void testThreadSafety() throws Throwable { - long runtimeMs = 5_000; - int numThreads = 10; - // Check that multiple threads using RetryWithToleranceOperator concurrently - // can't corrupt the state of the ProcessingContext - AtomicReference<Throwable> failed = new AtomicReference<>(null); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0, - ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics, new ProcessingContext() { - private final AtomicInteger count = new AtomicInteger(); - private final AtomicInteger attempt = new AtomicInteger(); - - @Override - public void error(Throwable error) { - if (count.getAndIncrement() > 0) { - failed.compareAndSet(null, new AssertionError("Concurrent call to error()")); - } - super.error(error); - } - - @Override - public Future<Void> report() { - if (count.getAndSet(0) > 1) { - failed.compareAndSet(null, new AssertionError("Concurrent call to error() in report()")); - } - - return super.report(); - } - - @Override - public void currentContext(Stage stage, Class<?> klass) { - this.attempt.set(0); - super.currentContext(stage, klass); - } - - @Override - public void attempt(int attempt) { - if (!this.attempt.compareAndSet(attempt - 1, attempt)) { - failed.compareAndSet(null, new AssertionError( - "Concurrent call to attempt(): Attempts should increase monotonically " + - "within the scope of a given currentContext()")); - } - super.attempt(attempt); - } - }, new CountDownLatch(1)); - - ExecutorService pool = Executors.newFixedThreadPool(numThreads); - List<? extends Future<?>> futures = IntStream.range(0, numThreads).boxed() - .map(id -> - pool.submit(() -> { - long t0 = System.currentTimeMillis(); - long i = 0; - while (true) { - if (++i % 10000 == 0 && System.currentTimeMillis() > t0 + runtimeMs) { - break; - } - if (failed.get() != null) { - break; - } - try { - if (id < numThreads / 2) { - retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, - SinkTask.class, consumerRecord, new Throwable()).get(); - } else { - retryWithToleranceOperator.execute(() -> null, Stage.TRANSFORMATION, - SinkTask.class); - } - } catch (Exception e) { - failed.compareAndSet(null, e); - } - } - })) - .collect(Collectors.toList()); - pool.shutdown(); - pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS); - futures.forEach(future -> { - try { - future.get(); - } catch (Exception e) { - failed.compareAndSet(null, e); - } Review Comment: Phew, okay--the 50% figure is reasonable. Thanks for humoring me 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org