This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ae608c1cb29 KAFKA-19042 Move PlaintextConsumerCallbackTest to 
client-integration-tests module (#19298)
ae608c1cb29 is described below

commit ae608c1cb298a236fcb2773f17abb164958dbf70
Author: Ken Huang <[email protected]>
AuthorDate: Wed Apr 16 11:57:14 2025 +0800

    KAFKA-19042 Move PlaintextConsumerCallbackTest to client-integration-tests 
module (#19298)
    
    Use Java to rewrite `PlaintextConsumerCallbackTest` by new test infra
    and move it to client-integration-tests module.
    
    Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../consumer/PlaintextConsumerCallbackTest.java    | 352 +++++++++++++++++++++
 .../kafka/api/PlaintextConsumerCallbackTest.scala  | 175 ----------
 2 files changed, 352 insertions(+), 175 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
new file mode 100644
index 00000000000..800ca3b2c78
--- /dev/null
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = 3
+)
+public class PlaintextConsumerCallbackTest {
+
+    private final ClusterInstance cluster;
+    private final String topic = "topic";
+    private final TopicPartition tp = new TopicPartition(topic, 0);
+
+    public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) {
+        this.cluster = clusterInstance;
+    }
+
+    @ClusterTest
+    public void 
testClassicConsumerRebalanceListenerAssignOnPartitionsAssigned() throws 
InterruptedException {
+        testRebalanceListenerAssignOnPartitionsAssigned(CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() 
throws InterruptedException {
+        testRebalanceListenerAssignOnPartitionsAssigned(CONSUMER);
+    }
+
+    private void testRebalanceListenerAssignOnPartitionsAssigned(GroupProtocol 
groupProtocol) throws InterruptedException {
+        try (var consumer = createConsumer(groupProtocol)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var e = assertThrows(IllegalStateException.class, () -> 
executeConsumer.assign(List.of(tp)));
+                assertEquals("Subscription to topics, partitions and pattern 
are mutually exclusive", e.getMessage());
+            });
+        }
+    }
+
+    @ClusterTest
+    public void 
testClassicConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws 
InterruptedException {
+        testRebalanceListenerAssignmentOnPartitionsAssigned(CLASSIC);
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws 
InterruptedException {
+        testRebalanceListenerAssignmentOnPartitionsAssigned(CONSUMER);
+    }
+
+    private void 
testRebalanceListenerAssignmentOnPartitionsAssigned(GroupProtocol 
groupProtocol) throws InterruptedException {
+        try (var consumer = createConsumer(groupProtocol)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> 
assertTrue(executeConsumer.assignment().contains(tp))
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testClassicConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() 
throws InterruptedException {
+        testRebalanceListenerBeginningOffsetsOnPartitionsAssigned(CLASSIC);
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws 
InterruptedException {
+        testRebalanceListenerBeginningOffsetsOnPartitionsAssigned(CONSUMER);
+    }
+
+    private void 
testRebalanceListenerBeginningOffsetsOnPartitionsAssigned(GroupProtocol 
groupProtocol) throws InterruptedException {
+        try (var consumer = createConsumer(groupProtocol)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var map = executeConsumer.beginningOffsets(List.of(tp));
+                assertTrue(map.containsKey(tp));
+                assertEquals(0L, map.get(tp));
+            });
+        }
+    }
+
+    @ClusterTest
+    public void 
testClassicConsumerRebalanceListenerAssignOnPartitionsRevoked() throws 
InterruptedException {
+        testRebalanceListenerAssignOnPartitionsRevoked(CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerRebalanceListenerAssignOnPartitionsRevoked() 
throws InterruptedException {
+        testRebalanceListenerAssignOnPartitionsRevoked(CONSUMER);
+    }
+
+    private void testRebalanceListenerAssignOnPartitionsRevoked(GroupProtocol 
groupProtocol) throws InterruptedException {
+        triggerOnPartitionsRevoked(tp, groupProtocol, (consumer, partitions) 
-> {
+            var e = assertThrows(IllegalStateException.class, () -> 
consumer.assign(List.of(tp)));
+            assertEquals("Subscription to topics, partitions and pattern are 
mutually exclusive", e.getMessage());
+        });
+    }
+
+    @ClusterTest
+    public void 
testClassicConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws 
InterruptedException {
+        triggerOnPartitionsRevoked(tp, CLASSIC,
+            (consumer, partitions) -> 
assertTrue(consumer.assignment().contains(tp))
+        );
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws 
InterruptedException {
+        triggerOnPartitionsRevoked(tp, CONSUMER,
+            (consumer, partitions) -> 
assertTrue(consumer.assignment().contains(tp))
+        );
+    }
+
+    @ClusterTest
+    public void 
testClassicConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() 
throws InterruptedException {
+        testRebalanceListenerBeginningOffsetsOnPartitionsRevoked(CLASSIC);
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws 
InterruptedException {
+        testRebalanceListenerBeginningOffsetsOnPartitionsRevoked(CONSUMER);
+    }
+
+    private void 
testRebalanceListenerBeginningOffsetsOnPartitionsRevoked(GroupProtocol 
groupProtocol) throws InterruptedException {
+        triggerOnPartitionsRevoked(tp, groupProtocol, (consumer, partitions) 
-> {
+            var map = consumer.beginningOffsets(List.of(tp));
+            assertTrue(map.containsKey(tp));
+            assertEquals(0L, map.get(tp));
+        });
+    }
+
+    @ClusterTest
+    public void 
testClassicConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback()
 throws InterruptedException {
+        
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(CLASSIC);
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback()
 throws InterruptedException {
+        
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(CONSUMER);
+    }
+
+    private void 
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(GroupProtocol
 groupProtocol) throws InterruptedException {
+        try (var consumer = createConsumer(groupProtocol)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> assertDoesNotThrow(() -> 
executeConsumer.position(tp))
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testClassicConsumerSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback()
 throws InterruptedException {
+        
testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(CLASSIC);
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback()
 throws InterruptedException {
+        
testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(CONSUMER);
+    }
+
+    private void 
testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(GroupProtocol
 groupProtocol) throws InterruptedException {
+        try (var consumer = createConsumer(groupProtocol)) {
+            var startingOffset = 100L;
+            var totalRecords = 120;
+            var startingTimestamp = 0L;
+
+            sendRecords(totalRecords, startingTimestamp);
+
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                executeConsumer.seek(tp, startingOffset);
+                executeConsumer.pause(List.of(tp));
+            });
+
+            assertTrue(consumer.paused().contains(tp));
+            consumer.resume(List.of(tp));
+            consumeAndVerifyRecords(
+                consumer,
+                (int) (totalRecords - startingOffset),
+                (int) startingOffset,
+                (int) startingOffset,
+                startingOffset
+            );
+        }
+    }
+
+    private void triggerOnPartitionsAssigned(
+        TopicPartition tp,
+        Consumer<byte[], byte[]> consumer,
+        BiConsumer<Consumer<byte[], byte[]>, Collection<TopicPartition>> 
execute
+    ) throws InterruptedException {
+        var partitionsAssigned = new AtomicBoolean(false);
+        consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                // Make sure the partition used in the test is actually 
assigned before continuing.
+                if (partitions.contains(tp)) {
+                    execute.accept(consumer, partitions);
+                    partitionsAssigned.set(true);
+                }
+            }
+
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                // noop
+            }
+        });
+        TestUtils.waitForCondition(
+            () -> {
+                consumer.poll(Duration.ofMillis(100));
+                return partitionsAssigned.get();
+            },
+            "Timed out before expected rebalance completed"
+        );
+    }
+
+    private void triggerOnPartitionsRevoked(
+        TopicPartition tp,
+        GroupProtocol protocol,
+        BiConsumer<Consumer<byte[], byte[]>, Collection<TopicPartition>> 
execute
+    ) throws InterruptedException {
+        var partitionsAssigned = new AtomicBoolean(false);
+        var partitionsRevoked = new AtomicBoolean(false);
+        try (var consumer = createConsumer(protocol)) {
+            consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() 
{
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                    // Make sure the partition used in the test is actually 
assigned before continuing.
+                    if (partitions.contains(tp)) {
+                        partitionsAssigned.set(true);
+                    }
+                }
+
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                    // Make sure the partition used in the test is actually 
revoked before continuing.
+                    if (partitions.contains(tp)) {
+                        execute.accept(consumer, partitions);
+                        partitionsRevoked.set(true);
+                    }
+                }
+            });
+            TestUtils.waitForCondition(
+                () -> {
+                    consumer.poll(Duration.ofMillis(100));
+                    return partitionsAssigned.get();
+                },
+                "Timed out before expected rebalance completed"
+            );
+        }
+        assertTrue(partitionsRevoked.get());
+    }
+
+    private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol) {
+        return cluster.consumer(Map.of(
+            GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT),
+            ENABLE_AUTO_COMMIT_CONFIG, "false"
+        ));
+    }
+
+    private void sendRecords(int numRecords, long startingTimestamp) {
+        try (Producer<byte[], byte[]> producer = cluster.producer()) {
+            for (var i = 0; i < numRecords; i++) {
+                var timestamp = startingTimestamp + i;
+                var record = new ProducerRecord<>(
+                    tp.topic(),
+                    tp.partition(),
+                    timestamp,
+                    ("key " + i).getBytes(),
+                    ("value " + i).getBytes()
+                );
+                producer.send(record);
+            }
+            producer.flush();
+        }
+    }
+
+    protected void consumeAndVerifyRecords(
+        Consumer<byte[], byte[]> consumer,
+        int numRecords,
+        int startingOffset,
+        int startingKeyAndValueIndex,
+        long startingTimestamp
+    ) throws InterruptedException {
+        var records = consumeRecords(consumer, numRecords);
+        for (var i = 0; i < numRecords; i++) {
+            var record = records.get(i);
+            var offset = startingOffset + i;
+
+            assertEquals(tp.topic(), record.topic());
+            assertEquals(tp.partition(), record.partition());
+
+            assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+            var timestamp = startingTimestamp + i;
+            assertEquals(timestamp, record.timestamp());
+
+            assertEquals(offset, record.offset());
+            var keyAndValueIndex = startingKeyAndValueIndex + i;
+            assertEquals("key " + keyAndValueIndex, new String(record.key()));
+            assertEquals("value " + keyAndValueIndex, new 
String(record.value()));
+            // this is true only because K and V are byte arrays
+            assertEquals(("key " + keyAndValueIndex).length(), 
record.serializedKeySize());
+            assertEquals(("value " + keyAndValueIndex).length(), 
record.serializedValueSize());
+        }
+    }
+
+    protected <K, V> List<ConsumerRecord<K, V>> consumeRecords(
+        Consumer<K, V> consumer,
+        int numRecords
+    ) throws InterruptedException {
+        List<ConsumerRecord<K, V>> records = new ArrayList<>();
+        TestUtils.waitForCondition(() -> {
+            consumer.poll(Duration.ofMillis(100)).forEach(records::add);
+            return records.size() >= numRecords;
+        }, 60000, "Timed out before consuming expected " + numRecords + " 
records.");
+
+        return records;
+    }
+}
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala
deleted file mode 100644
index e9c0ba0cc22..00000000000
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package integration.kafka.api
-
-import kafka.api.AbstractConsumerTest
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener}
-import org.apache.kafka.common.TopicPartition
-import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, 
assertThrows, assertTrue}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import java.util
-import java.util.Arrays.asList
-import java.util.Collections
-import java.util.concurrent.atomic.AtomicBoolean
-
-/**
- * Integration tests for the consumer that cover interaction with the consumer 
from within callbacks
- * and listeners.
- */
-class PlaintextConsumerCallbackTest extends AbstractConsumerTest {
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testConsumerRebalanceListenerAssignOnPartitionsAssigned(groupProtocol: 
String): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    triggerOnPartitionsAssigned(tp, { (consumer, _) =>
-      val e: Exception = assertThrows(classOf[IllegalStateException], () => 
consumer.assign(Collections.singletonList(tp)))
-      assertEquals(e.getMessage, "Subscription to topics, partitions and 
pattern are mutually exclusive")
-    })
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def 
testConsumerRebalanceListenerAssignmentOnPartitionsAssigned(groupProtocol: 
String): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    triggerOnPartitionsAssigned(tp, { (consumer, _) =>
-      assertTrue(consumer.assignment().contains(tp));
-    })
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def 
testConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned(groupProtocol:
 String): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    triggerOnPartitionsAssigned(tp, { (consumer, _) =>
-      val map = consumer.beginningOffsets(Collections.singletonList(tp))
-      assertTrue(map.containsKey(tp))
-      assertEquals(0, map.get(tp))
-    })
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testConsumerRebalanceListenerAssignOnPartitionsRevoked(groupProtocol: 
String): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    triggerOnPartitionsRevoked(tp, { (consumer, _) =>
-      val e: Exception = assertThrows(classOf[IllegalStateException], () => 
consumer.assign(Collections.singletonList(tp)))
-      assertEquals(e.getMessage, "Subscription to topics, partitions and 
pattern are mutually exclusive")
-    })
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def 
testConsumerRebalanceListenerAssignmentOnPartitionsRevoked(groupProtocol: 
String): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    triggerOnPartitionsRevoked(tp, { (consumer, _) =>
-      assertTrue(consumer.assignment().contains(tp))
-    })
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def 
testConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked(groupProtocol: 
String): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    triggerOnPartitionsRevoked(tp, { (consumer, _) =>
-      val map = consumer.beginningOffsets(Collections.singletonList(tp))
-      assertTrue(map.containsKey(tp))
-      assertEquals(0, map.get(tp))
-    })
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def 
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(groupProtocol:
 String): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    triggerOnPartitionsAssigned(tp, { (consumer, _) => assertDoesNotThrow(() 
=> consumer.position(tp)) })
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def 
testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(groupProtocol:
 String): Unit = {
-    val consumer = createConsumer()
-    val startingOffset = 100L
-    val totalRecords = 120L
-
-    val producer = createProducer()
-    val startingTimestamp = 0
-    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp)
-
-    triggerOnPartitionsAssigned(tp, consumer, { (consumer, _) =>
-      consumer.seek(tp, startingOffset)
-      consumer.pause(asList(tp))
-    })
-
-    assertTrue(consumer.paused().contains(tp))
-    consumer.resume(asList(tp))
-    consumeAndVerifyRecords(consumer, numRecords = (totalRecords - 
startingOffset).toInt,
-      startingOffset = startingOffset.toInt, startingKeyAndValueIndex = 
startingOffset.toInt,
-      startingTimestamp = startingOffset)
-  }
-
-  private def triggerOnPartitionsAssigned(tp: TopicPartition,
-                                          execute: (Consumer[Array[Byte], 
Array[Byte]], util.Collection[TopicPartition]) => Unit): Unit = {
-    val consumer = createConsumer()
-    triggerOnPartitionsAssigned(tp, consumer, execute)
-  }
-
-  private def triggerOnPartitionsAssigned(tp: TopicPartition,
-                                          consumer: Consumer[Array[Byte], 
Array[Byte]],
-                                          execute: (Consumer[Array[Byte], 
Array[Byte]], util.Collection[TopicPartition]) => Unit): Unit = {
-    val partitionsAssigned = new AtomicBoolean(false)
-    consumer.subscribe(asList(topic), new ConsumerRebalanceListener {
-      override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]): Unit = {
-        // Make sure the partition used in the test is actually assigned 
before continuing.
-        if (partitions.contains(tp)) {
-          execute(consumer, partitions)
-          partitionsAssigned.set(true)
-        }
-      }
-
-      override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {
-        // noop
-      }
-    })
-    TestUtils.pollUntilTrue(consumer, () => partitionsAssigned.get(), "Timed 
out before expected rebalance completed")
-  }
-
-  private def triggerOnPartitionsRevoked(tp: TopicPartition,
-                                         execute: (Consumer[Array[Byte], 
Array[Byte]], util.Collection[TopicPartition]) => Unit): Unit = {
-    val consumer = createConsumer()
-    val partitionsAssigned = new AtomicBoolean(false)
-    val partitionsRevoked = new AtomicBoolean(false)
-    consumer.subscribe(asList(topic), new ConsumerRebalanceListener {
-      override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]): Unit = {
-        // Make sure the partition used in the test is actually assigned 
before continuing.
-        if (partitions.contains(tp)) {
-          partitionsAssigned.set(true)
-        }
-      }
-
-      override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {
-        // Make sure the partition used in the test is actually revoked before 
continuing.
-        if (partitions.contains(tp)) {
-          execute(consumer, partitions)
-          partitionsRevoked.set(true)
-        }
-      }
-    })
-    TestUtils.pollUntilTrue(consumer, () => partitionsAssigned.get(), "Timed 
out before expected rebalance completed")
-    consumer.close()
-    assertTrue(partitionsRevoked.get())
-  }
-}

Reply via email to