C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450850940


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java:
##########
@@ -44,6 +49,12 @@ public ErrantRecordSinkTask() {
         public void start(Map<String, String> props) {
             super.start(props);
             reporter = context.errantRecordReporter();
+            executorService = Executors.newSingleThreadExecutor();
+        }
+
+        @Override
+        public void stop() {
+            ThreadUtils.shutdownExecutorServiceQuietly(executorService, 4, TimeUnit.SECONDS);

Review Comment:
   Just curious--any rationale for four seconds specifically?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java:
##########
@@ -75,9 +77,9 @@ public class ErrorHandlingIntegrationTest {
     private static final String DLQ_TOPIC = "my-connector-errors";
     private static final String CONNECTOR_NAME = "error-conn";
     private static final String TASK_ID = "error-conn-0";
-    private static final int NUM_RECORDS_PRODUCED = 20;
-    private static final int EXPECTED_CORRECT_RECORDS = 19;
+    private static final int NUM_RECORDS_PRODUCED = 1000;
     private static final int EXPECTED_INCORRECT_RECORDS = 1;
+    private static final int EXPECTED_CORRECT_RECORDS = NUM_RECORDS_PRODUCED - EXPECTED_INCORRECT_RECORDS;

Review Comment:
   Again, I know this isn't your fault, but I'm a little confused by the use of this field (`EXPECTED_CORRECT_RECORDS`) in the `testErrantRecordReporter` case. I believe it can be replaced with `NUM_RECORDS_PRODUCED` [here](https://github.com/gharris1727/kafka/blob/f5845038014f3df29d505eeadd01403e9756728f/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java#L220), since this case doesn't cover any errors that should prevent records from reaching tasks.
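   
   For illustration, roughly what I have in mind (sketch only; I'm assuming the linked line is the task handle's `expectedRecords(...)` call and that a `connectorHandle` is already set up earlier in the test, so treat the exact names as placeholders):
   
   ```java
   // Hypothetical sketch: nothing in this case stops records from reaching the task,
   // so the task-side expectation can be stated directly in terms of the number of
   // produced records.
   connectorHandle.taskHandle(TASK_ID).expectedRecords(NUM_RECORDS_PRODUCED);
   ```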



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java:
##########
@@ -54,7 +65,16 @@ public void put(Collection<SinkRecord> records) {
                     .computeIfAbsent(rec.topic(), v -> new HashMap<>())
                    .computeIfAbsent(rec.kafkaPartition(), v -> new TopicPartition(rec.topic(), rec.kafkaPartition()));
                committedOffsets.put(tp, committedOffsets.getOrDefault(tp, 0) + 1);
-                reporter.report(rec, new Throwable());
+                Throwable error = new Throwable();
+                // Test synchronous and asynchronous reporting, allowing for re-ordering the errant reports

Review Comment:
   We don't have any corresponding verification that this behavior is handled correctly. If there's an easy, lightweight way to add that, it'd be nice, but it's not worth blocking on if it's too cumbersome.
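   
   One lightweight option, in case it helps (sketch only, names are illustrative rather than taken from this PR): have the task keep the futures produced in `put()`, either the `Future<Void>` returned by `reporter.report(...)` or the `Future<?>` returned by `executorService.submit(...)`, and drain them before shutdown so the test implicitly checks that re-ordered reports all complete.
   
   ```java
   // Hypothetical addition to ErrantRecordSinkTask: put() adds each report's future
   // to this list, and stop() calls awaitReports() before shutting down the executor.
   private final List<Future<?>> reportFutures = new ArrayList<>();
   
   private void awaitReports() throws Exception {
       for (Future<?> future : reportFutures) {
           // Fails the task (and therefore the test) if any report never completes
           future.get(30, TimeUnit.SECONDS);
       }
   }
   ```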



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java:
##########
@@ -245,7 +249,7 @@ public void testErrantRecordReporter() throws Exception {
 
         // consume failed records from dead letter queue topic
         log.info("Consuming records from test topic");
-        ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
+        ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);

Review Comment:
   Nit (not your fault): we don't use the `messages` variable at all.
   
   ```suggestion
        connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -392,90 +397,29 @@ public void testSetConfigs() {
     }
 
     @Test
-    public void testThreadSafety() throws Throwable {
-        long runtimeMs = 5_000;
-        int numThreads = 10;
-        // Check that multiple threads using RetryWithToleranceOperator concurrently
-        // can't corrupt the state of the ProcessingContext
-        AtomicReference<Throwable> failed = new AtomicReference<>(null);
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
-                ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics, new ProcessingContext() {
-                    private final AtomicInteger count = new AtomicInteger();
-                    private final AtomicInteger attempt = new AtomicInteger();
-
-                    @Override
-                    public void error(Throwable error) {
-                        if (count.getAndIncrement() > 0) {
-                            failed.compareAndSet(null, new AssertionError("Concurrent call to error()"));
-                        }
-                        super.error(error);
-                    }
-
-                    @Override
-                    public Future<Void> report() {
-                        if (count.getAndSet(0) > 1) {
-                            failed.compareAndSet(null, new AssertionError("Concurrent call to error() in report()"));
-                        }
-
-                        return super.report();
-                    }
-
-                    @Override
-                    public void currentContext(Stage stage, Class<?> klass) {
-                        this.attempt.set(0);
-                        super.currentContext(stage, klass);
-                    }
-
-                    @Override
-                    public void attempt(int attempt) {
-                        if (!this.attempt.compareAndSet(attempt - 1, attempt)) {
-                            failed.compareAndSet(null, new AssertionError(
-                                    "Concurrent call to attempt(): Attempts should increase monotonically " +
-                                            "within the scope of a given currentContext()"));
-                        }
-                        super.attempt(attempt);
-                    }
-                }, new CountDownLatch(1));
-
-        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
-        List<? extends Future<?>> futures = IntStream.range(0, numThreads).boxed()
-                .map(id ->
-                        pool.submit(() -> {
-                            long t0 = System.currentTimeMillis();
-                            long i = 0;
-                            while (true) {
-                                if (++i % 10000 == 0 && System.currentTimeMillis() > t0 + runtimeMs) {
-                                    break;
-                                }
-                                if (failed.get() != null) {
-                                    break;
-                                }
-                                try {
-                                    if (id < numThreads / 2) {
-                                        retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-                                                SinkTask.class, consumerRecord, new Throwable()).get();
-                                    } else {
-                                        retryWithToleranceOperator.execute(() -> null, Stage.TRANSFORMATION,
-                                                SinkTask.class);
-                                    }
-                                } catch (Exception e) {
-                                    failed.compareAndSet(null, e);
-                                }
-                            }
-                        }))
-                .collect(Collectors.toList());
-        pool.shutdown();
-        pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
-        futures.forEach(future -> {
-            try {
-                future.get();
-            } catch (Exception e) {
-                failed.compareAndSet(null, e);
-            }

Review Comment:
   Hmmm... it's a little worrisome to see this test (which, as we now know, was insufficient) completely removed without a replacement. I guess it'd be difficult to test again and we can sort of hand-wave that the structural changes in this PR increase thread safety, but if possible, it'd be great to see at least some new testing coverage.
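   
   For what it's worth, even a slimmed-down smoke test might be better than nothing. A rough sketch (untested, and only a sketch: it reuses the `execute()`/`executeFailed()` calls and the `consumerRecord` fixture from the removed test, so the signatures and the operator's construction would need adapting to this PR's API changes):
   
   ```java
   @Test
   public void testConcurrentUseDoesNotThrow() throws Exception {
       // Assumes retryWithToleranceOperator and consumerRecord are set up as in the
       // other tests in this class (construction elided here).
       // Hammer the operator from several threads and assert that no call throws.
       // Weaker than the removed test (it can't detect silent state corruption),
       // but it at least exercises the concurrent code paths.
       int numThreads = 10;
       AtomicReference<Throwable> failed = new AtomicReference<>(null);
       ExecutorService pool = Executors.newFixedThreadPool(numThreads);
       for (int id = 0; id < numThreads; id++) {
           final boolean reportFailures = id < numThreads / 2;
           pool.submit(() -> {
               try {
                   for (int i = 0; i < 10_000; i++) {
                       if (reportFailures) {
                           retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
                                   SinkTask.class, consumerRecord, new Throwable()).get();
                       } else {
                           retryWithToleranceOperator.execute(() -> null, Stage.TRANSFORMATION,
                                   SinkTask.class);
                       }
                   }
               } catch (Throwable t) {
                   failed.compareAndSet(null, t);
               }
           });
       }
       pool.shutdown();
       assertTrue(pool.awaitTermination(60, TimeUnit.SECONDS));
       assertNull(failed.get());
   }
   ```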


