This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b8058777057 KAFKA-18969 Rewrite ShareConsumerTest#setup and move to 
clients-integration-tests module (#19202)
b8058777057 is described below

commit b80587770572328eb369e0aeaddc0a80f2487640
Author: Ken Huang <[email protected]>
AuthorDate: Tue Mar 18 14:47:38 2025 +0800

    KAFKA-18969 Rewrite ShareConsumerTest#setup and move to 
clients-integration-tests module (#19202)
    
    Move share consumer to clients-integration-tests module and use 
`@BeforeEach` to setup
    
    Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 build.gradle                                       |   2 +
 .../import-control-clients-integration-tests.xml   |   1 +
 .../kafka/clients/consumer}/ShareConsumerTest.java | 111 ++++++++-------------
 3 files changed, 44 insertions(+), 70 deletions(-)

diff --git a/build.gradle b/build.gradle
index ea99ae754bf..e3f0a6e58e5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1973,6 +1973,8 @@ project(':clients:clients-integration-tests') {
     testImplementation project(':test-common:test-common-internal-api')
     testImplementation project(':test-common:test-common-runtime')
     implementation project(':server-common')
+    testImplementation project(':metadata')
+    implementation project(':clients').sourceSets.test.output
     implementation project(':group-coordinator')
     implementation project(':transaction-coordinator')
 
diff --git a/checkstyle/import-control-clients-integration-tests.xml 
b/checkstyle/import-control-clients-integration-tests.xml
index 8294c43f922..880873676f6 100644
--- a/checkstyle/import-control-clients-integration-tests.xml
+++ b/checkstyle/import-control-clients-integration-tests.xml
@@ -24,6 +24,7 @@
 
   <!-- These are tests, allow whatever -->
   <allow pkg="org.apache.kafka"/>
+  <allow pkg="org.junit" />
   <allow pkg="kafka"/>
 
 </import-control>
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
similarity index 98%
rename from core/src/test/java/kafka/test/api/ShareConsumerTest.java
rename to 
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 44509f7bb26..1d1395ce08a 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -14,9 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.test.api;
+package org.apache.kafka.clients.consumer;
 
-import kafka.api.BaseConsumerTest;
 import kafka.server.KafkaBroker;
 
 import org.apache.kafka.clients.admin.Admin;
@@ -26,14 +25,6 @@ import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.RecordsToDelete;
-import org.apache.kafka.clients.consumer.AcknowledgeType;
-import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaShareConsumer;
-import org.apache.kafka.clients.consumer.ShareConsumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -49,6 +40,7 @@ import 
org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.record.TimestampType;
@@ -68,6 +60,7 @@ import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.test.TestUtils;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
 
@@ -136,12 +129,15 @@ public class ShareConsumerTest {
     private final TopicPartition tp2 = new TopicPartition("topic2", 0);
     private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
     private List<TopicPartition> sgsTopicPartitions;
-
+    private static final String KEY = "content-type";
+    private static final String VALUE = "application/octet-stream";
+    
     public ShareConsumerTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }
 
-    private void setup() {
+    @BeforeEach
+    public void setup() {
         try {
             this.cluster.waitForReadyBrokers();
             createTopic("topic");
@@ -157,7 +153,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testPollNoSubscribeFails() {
-        setup();
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
             assertEquals(Set.of(), shareConsumer.subscription());
             // "Consumer is not subscribed to any topics."
@@ -167,7 +162,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testSubscribeAndPollNoRecords() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
             Set<String> subscription = Set.of(tp.topic());
@@ -181,7 +175,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testSubscribePollUnsubscribe() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
             Set<String> subscription = Set.of(tp.topic());
@@ -197,7 +190,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testSubscribePollSubscribe() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
             Set<String> subscription = Set.of(tp.topic());
@@ -215,7 +207,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testSubscribeUnsubscribePollFails() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
             Set<String> subscription = Set.of(tp.topic());
@@ -233,7 +224,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testSubscribeSubscribeEmptyPollFails() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
             Set<String> subscription = Set.of(tp.topic());
@@ -251,7 +241,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testSubscriptionAndPoll() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -268,7 +257,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testSubscriptionAndPollMultiple() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -291,7 +279,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testPollRecordsGreaterThanMaxBytes() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
@@ -310,7 +297,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testAcknowledgementSentOnSubscriptionChange() throws 
ExecutionException, InterruptedException {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -347,7 +333,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() 
throws Exception {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -378,7 +363,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testAcknowledgementCommitCallbackOnClose() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -409,7 +393,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testAcknowledgementCommitCallbackInvalidRecordStateException() 
throws Exception {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -464,7 +447,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testHeaders() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -516,14 +498,12 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testHeadersSerializerDeserializer() {
-        setup();
-        testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), 
new BaseConsumerTest.DeserializerImpl());
+        testHeadersSerializeDeserialize(new SerializerImpl(), new 
DeserializerImpl());
         verifyShareGroupStateTopicRecordsProduced();
     }
 
     @ClusterTest
     public void testMaxPollRecords() {
-        setup();
         int numRecords = 10000;
         int maxPollRecords = 2;
 
@@ -557,7 +537,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testControlRecordsSkipped() throws Exception {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> transactionalProducer = 
createProducer("T1");
              Producer<byte[], byte[]> nonTransactionalProducer = 
createProducer();
@@ -603,7 +582,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testExplicitAcknowledgeSuccess() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -625,7 +603,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testExplicitAcknowledgeCommitSuccess() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -649,7 +626,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testExplicitAcknowledgementCommitAsync() throws 
InterruptedException {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1");
@@ -706,7 +682,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testImplicitModeNotTriggeredByPollWhenNoAcksToSend() throws 
InterruptedException {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -744,7 +719,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1")) {
@@ -808,7 +782,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testExplicitAcknowledgeReleasePollAccept() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -832,7 +805,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testExplicitAcknowledgeReleaseAccept() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -853,7 +825,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testExplicitAcknowledgeReleaseClose() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -872,7 +843,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testExplicitAcknowledgeThrowsNotInBatch() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -895,7 +865,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testImplicitAcknowledgeFailsExplicit() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -917,7 +886,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testImplicitAcknowledgeCommitSync() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -941,7 +909,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testImplicitAcknowledgementCommitAsync() throws 
InterruptedException {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -981,7 +948,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testConfiguredExplicitAcknowledgeCommitSuccess() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
@@ -1007,7 +973,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testConfiguredImplicitAcknowledgeExplicitAcknowledgeFails() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
@@ -1027,7 +992,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testFetchRecordLargerThanMaxPartitionFetchBytes() throws 
Exception {
-        setup();
         int maxPartitionFetchBytes = 10000;
 
         alterShareAutoOffsetReset("group1", "earliest");
@@ -1052,7 +1016,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testMultipleConsumersWithDifferentGroupIds() throws 
InterruptedException {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         alterShareAutoOffsetReset("group2", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
@@ -1103,7 +1066,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testMultipleConsumersInGroupSequentialConsumption() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1");
@@ -1141,7 +1103,6 @@ public class ShareConsumerTest {
     @ClusterTest
     public void testMultipleConsumersInGroupConcurrentConsumption()
             throws InterruptedException, ExecutionException, TimeoutException {
-        setup();
         AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
 
         int consumerCount = 4;
@@ -1176,7 +1137,6 @@ public class ShareConsumerTest {
     @ClusterTest
     public void testMultipleConsumersInMultipleGroupsConcurrentConsumption()
             throws ExecutionException, InterruptedException, TimeoutException {
-        setup();
         AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0);
         AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0);
         AtomicInteger totalMessagesConsumedGroup3 = new AtomicInteger(0);
@@ -1239,7 +1199,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testConsumerCloseInGroupSequential() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1");
@@ -1288,7 +1247,6 @@ public class ShareConsumerTest {
     @ClusterTest
     public void testMultipleConsumersInGroupFailureConcurrentConsumption()
             throws InterruptedException, ExecutionException, TimeoutException {
-        setup();
         AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
 
         int consumerCount = 4;
@@ -1333,7 +1291,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testAcquisitionLockTimeoutOnConsumer() throws 
InterruptedException {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -1398,7 +1355,6 @@ public class ShareConsumerTest {
      */
     @ClusterTest
     public void 
testAcknowledgementCommitCallbackCallsShareConsumerDisallowed() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -1442,7 +1398,6 @@ public class ShareConsumerTest {
      */
     @ClusterTest
     public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup() 
throws InterruptedException {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -1467,7 +1422,7 @@ public class ShareConsumerTest {
             TestUtils.waitForCondition(() -> {
                 try {
                     shareConsumer.poll(Duration.ofMillis(500));
-                } catch (org.apache.kafka.common.errors.WakeupException e) {
+                } catch (WakeupException e) {
                     exceptionThrown.set(true);
                 }
                 return exceptionThrown.get();
@@ -1495,7 +1450,6 @@ public class ShareConsumerTest {
      */
     @ClusterTest
     public void testAcknowledgementCommitCallbackThrowsException() throws 
InterruptedException {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -1536,7 +1490,6 @@ public class ShareConsumerTest {
      */
     @ClusterTest
     public void testPollThrowsInterruptExceptionIfInterrupted() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
@@ -1561,7 +1514,6 @@ public class ShareConsumerTest {
      */
     @ClusterTest
     public void testSubscribeOnInvalidTopicThrowsInvalidTopicException() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
@@ -1579,7 +1531,6 @@ public class ShareConsumerTest {
      */
     @ClusterTest
     public void testWakeupWithFetchedRecordsAvailable() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -1601,7 +1552,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testSubscriptionFollowedByTopicCreation() throws 
InterruptedException {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
@@ -1631,7 +1581,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testSubscriptionAndPollFollowedByTopicDeletion() throws 
InterruptedException, ExecutionException {
-        setup();
         String topic1 = "bar";
         String topic2 = "baz";
         createTopic(topic1);
@@ -1674,7 +1623,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testLsoMovementByRecordsDeletion() {
-        setup();
         String groupId = "group1";
 
         alterShareAutoOffsetReset(groupId, "earliest");
@@ -1720,7 +1668,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testShareAutoOffsetResetDefaultValue() {
-        setup();
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1");
              Producer<byte[], byte[]> producer = createProducer()) {
 
@@ -1747,7 +1694,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testShareAutoOffsetResetEarliest() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1");
              Producer<byte[], byte[]> producer = createProducer()) {
@@ -1773,7 +1719,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testShareAutoOffsetResetEarliestAfterLsoMovement() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (
             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1");
@@ -1800,7 +1745,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue() {
-        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         alterShareAutoOffsetReset("group2", "latest");
         try (ShareConsumer<byte[], byte[]> shareConsumerEarliest = 
createShareConsumer("group1");
@@ -1840,7 +1784,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testShareAutoOffsetResetByDuration() throws Exception {
-        setup();
         // Set auto offset reset to 1 hour before current time
         alterShareAutoOffsetReset("group1", "by_duration:PT1H");
         
@@ -1892,7 +1835,6 @@ public class ShareConsumerTest {
 
     @ClusterTest
     public void testShareAutoOffsetResetByDurationInvalidFormat() throws 
Exception {
-        setup();
         // Test invalid duration format
         ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, "group1");
         Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();
@@ -1931,7 +1873,6 @@ public class ShareConsumerTest {
     )
     @Timeout(90)
     public void testShareConsumerAfterCoordinatorMovement() throws Exception {
-        setup();
         String topicName = "multipart";
         String groupId = "multipartGrp";
         Uuid topicId = createTopic(topicName, 3, 3);
@@ -2084,7 +2025,6 @@ public class ShareConsumerTest {
     )
     @Timeout(150)
     public void testComplexShareConsumer() throws Exception {
-        setup();
         String topicName = "multipart";
         String groupId = "multipartGrp";
         createTopic(topicName, 3, 3);
@@ -2473,4 +2413,35 @@ public class ShareConsumerTest {
             Thread.currentThread().interrupt();
         }
     }
+
+    public static class SerializerImpl implements Serializer<byte[]> {
+
+        @Override
+        public byte[] serialize(String topic, Headers headers, byte[] data) {
+            headers.add(KEY, VALUE.getBytes());
+            return data;
+        }
+
+        @Override
+        public byte[] serialize(String topic, byte[] data) {
+            fail("method should not be invoked");
+            return null;
+        }
+    }
+
+    public static class DeserializerImpl implements Deserializer<byte[]> {
+
+        @Override
+        public byte[] deserialize(String topic, Headers headers, byte[] data) {
+            Header header = headers.lastHeader(KEY);
+            assertEquals("application/octet-stream", header == null ? null : 
new String(header.value()));
+            return data;
+        }
+
+        @Override
+        public byte[] deserialize(String topic, byte[] data) {
+            fail("method should not be invoked");
+            return null;
+        }
+    }
 }

Reply via email to