This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0b78d8459cc MINOR: Remove ConsumerTestBuilder.java (#16691)
0b78d8459cc is described below

commit 0b78d8459cc8963ad9c324639f304277a76560ff
Author: brenden20 <[email protected]>
AuthorDate: Sun Aug 4 22:05:53 2024 -0500

    MINOR: Remove ConsumerTestBuilder.java (#16691)
    
    The purpose of this PR is to remove ConsumerTestBuilder.java since it is
    no longer needed. The following PRs have eliminated the use of
    ConsumerTestBuilder:
    #14930
    #16140
    #16200
    #16312
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 checkstyle/suppressions.xml                        |   3 -
 .../internals/CommitRequestManagerTest.java        |   4 +-
 .../consumer/internals/ConsumerTestBuilder.java    | 316 ---------------------
 .../ShareHeartbeatRequestManagerTest.java          |  14 +-
 4 files changed, 9 insertions(+), 328 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index def3e2bc0d0..df79a17ccbe 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -135,9 +135,6 @@
     <suppress checks="NPathComplexity"
               
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
 
-    <suppress checks="MethodLength"
-              files="ConsumerTestBuilder.java"/>
-
     <suppress 
checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index dc16d31c634..33d2d6aae1b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -73,8 +73,6 @@ import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMI
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -99,6 +97,8 @@ public class CommitRequestManagerTest {
     private final long retryBackoffMs = 100;
     private final long retryBackoffMaxMs = 1000;
     private static final String CONSUMER_COORDINATOR_METRICS = 
"consumer-coordinator-metrics";
+    private static final String DEFAULT_GROUP_ID = "group-id";
+    private static final String DEFAULT_GROUP_INSTANCE_ID = 
"group-instance-id";
     private final Node mockedNode = new Node(1, "host1", 9092);
     private SubscriptionState subscriptionState;
     private LogContext logContext;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
deleted file mode 100644
index 0e6ff3ebd9e..00000000000
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals;
-
-import org.apache.kafka.clients.ApiVersions;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.GroupRebalanceConfig;
-import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
-import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.requests.RequestTestUtils;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Timer;
-
-import java.io.Closeable;
-import java.util.HashMap;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
-import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG;
-import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
-import static org.apache.kafka.common.utils.Utils.closeQuietly;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-@SuppressWarnings("ClassDataAbstractionCoupling")
-public class ConsumerTestBuilder implements Closeable {
-
-    static final long DEFAULT_RETRY_BACKOFF_MS = 80;
-    static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000;
-    static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
-    static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
-    static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id";
-    static final String DEFAULT_GROUP_ID = "group-id";
-    static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
-    static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0;
-    static final String DEFAULT_REMOTE_ASSIGNOR = "uniform";
-
-    final LogContext logContext = new LogContext();
-    final Time time;
-    public final BlockingQueue<ApplicationEvent> applicationEventQueue;
-    public final BlockingQueue<BackgroundEvent> backgroundEventQueue;
-    final ConsumerConfig config;
-    final long retryBackoffMs;
-    final SubscriptionState subscriptions;
-    final ConsumerMetadata metadata;
-    final FetchConfig fetchConfig;
-    final FetchBuffer fetchBuffer;
-    final Metrics metrics;
-    final Timer pollTimer;
-    final FetchMetricsManager metricsManager;
-    final NetworkClientDelegate networkClientDelegate;
-    final OffsetsRequestManager offsetsRequestManager;
-    final Optional<CoordinatorRequestManager> coordinatorRequestManager;
-    final Optional<CommitRequestManager> commitRequestManager;
-    final Optional<HeartbeatRequestManager> heartbeatRequestManager;
-    final Optional<MembershipManager> membershipManager;
-    final Optional<HeartbeatRequestManager.HeartbeatState> heartbeatState;
-    final Optional<HeartbeatRequestManager.HeartbeatRequestState> 
heartbeatRequestState;
-    final TopicMetadataRequestManager topicMetadataRequestManager;
-    final FetchRequestManager fetchRequestManager;
-    final RequestManagers requestManagers;
-    public final ApplicationEventProcessor applicationEventProcessor;
-    public final BackgroundEventHandler backgroundEventHandler;
-    public final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
-    final MockClient client;
-    final Optional<GroupInformation> groupInfo;
-    final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
-
-    public ConsumerTestBuilder(Optional<GroupInformation> groupInfo) {
-        this(groupInfo, true, true);
-    }
-
-    public ConsumerTestBuilder(Optional<GroupInformation> groupInfo, boolean 
enableAutoCommit, boolean enableAutoTick) {
-        this.groupInfo = groupInfo;
-        this.time = enableAutoTick ? new MockTime(1) : new MockTime();
-        this.applicationEventQueue = new LinkedBlockingQueue<>();
-        this.backgroundEventQueue = new LinkedBlockingQueue<>();
-        this.backgroundEventHandler = spy(new 
BackgroundEventHandler(backgroundEventQueue));
-        this.offsetCommitCallbackInvoker = 
mock(OffsetCommitCallbackInvoker.class);
-        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
-            100,
-            DEFAULT_MAX_POLL_INTERVAL_MS,
-            DEFAULT_HEARTBEAT_INTERVAL_MS,
-            groupInfo.map(gi -> gi.groupId).orElse(null),
-            groupInfo.flatMap(gi -> gi.groupInstanceId),
-            DEFAULT_RETRY_BACKOFF_MS,
-            DEFAULT_RETRY_BACKOFF_MAX_MS,
-            true);
-        ApiVersions apiVersions = new ApiVersions();
-
-        Properties properties = new Properties();
-        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        properties.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 
DEFAULT_RETRY_BACKOFF_MS);
-        properties.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 
DEFAULT_REQUEST_TIMEOUT_MS);
-        properties.put(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG, 
DEFAULT_MAX_POLL_INTERVAL_MS);
-
-        if (!enableAutoCommit)
-            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-        groupInfo.ifPresent(gi -> {
-            properties.put(GROUP_ID_CONFIG, gi.groupId);
-            gi.groupInstanceId.ifPresent(groupInstanceId -> 
properties.put(GROUP_INSTANCE_ID_CONFIG, groupInstanceId));
-        });
-
-        this.config = new ConsumerConfig(properties);
-
-        this.fetchConfig = new FetchConfig(config);
-        this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
-        final long requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-        this.metrics = createMetrics(config, time);
-
-        this.subscriptions = spy(createSubscriptionState(config, logContext));
-        this.metadata = spy(new ConsumerMetadata(config, subscriptions, 
logContext, new ClusterResourceListeners()));
-        this.metricsManager = createFetchMetricsManager(metrics);
-        this.pollTimer = time.timer(groupRebalanceConfig.rebalanceTimeoutMs);
-
-        this.client = new MockClient(time, metadata);
-        MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
-            {
-                String topic1 = "test1";
-                put(topic1, 1);
-                String topic2 = "test2";
-                put(topic2, 1);
-            }
-        });
-        this.client.updateMetadata(metadataResponse);
-
-        this.networkClientDelegate = spy(new NetworkClientDelegate(time,
-                config,
-                logContext,
-                client,
-                metadata,
-                backgroundEventHandler));
-        this.offsetsRequestManager = spy(new 
OffsetsRequestManager(subscriptions,
-                metadata,
-                fetchConfig.isolationLevel,
-                time,
-                retryBackoffMs,
-                requestTimeoutMs,
-                apiVersions,
-                networkClientDelegate,
-                backgroundEventHandler,
-                logContext));
-
-        this.topicMetadataRequestManager = spy(new 
TopicMetadataRequestManager(logContext, time, config));
-
-        if (groupInfo.isPresent()) {
-            GroupInformation gi = groupInfo.get();
-            CoordinatorRequestManager coordinator = spy(new 
CoordinatorRequestManager(
-                    logContext,
-                    DEFAULT_RETRY_BACKOFF_MS,
-                    DEFAULT_RETRY_BACKOFF_MAX_MS,
-                    backgroundEventHandler,
-                    gi.groupId
-            ));
-            CommitRequestManager commit = spy(new CommitRequestManager(time,
-                    logContext,
-                    subscriptions,
-                    config,
-                    coordinator,
-                    offsetCommitCallbackInvoker,
-                    gi.groupId,
-                    gi.groupInstanceId,
-                    metrics));
-            MembershipManager mm = spy(
-                new MembershipManagerImpl(
-                    gi.groupId,
-                    gi.groupInstanceId,
-                    groupRebalanceConfig.rebalanceTimeoutMs,
-                    gi.serverAssignor,
-                    subscriptions,
-                    commit,
-                    metadata,
-                    logContext,
-                    Optional.empty(),
-                    backgroundEventHandler,
-                    time,
-                    mock(RebalanceMetricsManager.class)
-                )
-            );
-            HeartbeatRequestManager.HeartbeatState heartbeatState = spy(new 
HeartbeatRequestManager.HeartbeatState(
-                    subscriptions,
-                    mm,
-                    DEFAULT_MAX_POLL_INTERVAL_MS));
-            HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState = spy(new HeartbeatRequestManager.HeartbeatRequestState(
-                    logContext,
-                    time,
-                    gi.heartbeatIntervalMs,
-                    retryBackoffMs,
-                    DEFAULT_RETRY_BACKOFF_MAX_MS,
-                    gi.heartbeatJitterMs));
-            HeartbeatRequestManager heartbeat = spy(new 
HeartbeatRequestManager(
-                    logContext,
-                    pollTimer,
-                    config,
-                    coordinator,
-                    mm,
-                    heartbeatState,
-                    heartbeatRequestState,
-                    backgroundEventHandler,
-                    metrics));
-
-            this.coordinatorRequestManager = Optional.of(coordinator);
-            this.commitRequestManager = Optional.of(commit);
-            this.heartbeatRequestManager = Optional.of(heartbeat);
-            this.heartbeatState = Optional.of(heartbeatState);
-            this.heartbeatRequestState = Optional.of(heartbeatRequestState);
-            this.membershipManager = Optional.of(mm);
-        } else {
-            this.coordinatorRequestManager = Optional.empty();
-            this.commitRequestManager = Optional.empty();
-            this.heartbeatRequestManager = Optional.empty();
-            this.heartbeatState = Optional.empty();
-            this.heartbeatRequestState = Optional.empty();
-            this.membershipManager = Optional.empty();
-        }
-
-        this.fetchBuffer = new FetchBuffer(logContext);
-        this.fetchRequestManager = spy(new FetchRequestManager(logContext,
-                time,
-                metadata,
-                subscriptions,
-                fetchConfig,
-                fetchBuffer,
-                metricsManager,
-                networkClientDelegate,
-                apiVersions));
-        this.requestManagers = new RequestManagers(logContext,
-                offsetsRequestManager,
-                topicMetadataRequestManager,
-                fetchRequestManager,
-                coordinatorRequestManager,
-                commitRequestManager,
-                heartbeatRequestManager,
-                membershipManager
-            );
-        this.applicationEventProcessor = spy(new ApplicationEventProcessor(
-                logContext,
-                requestManagers,
-                metadata,
-                subscriptions
-            )
-        );
-
-        this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
-                logContext,
-                subscriptions,
-                time,
-                new RebalanceCallbackMetricsManager(metrics)
-        );
-    }
-
-    @Override
-    public void close() {
-        closeQuietly(requestManagers, RequestManagers.class.getSimpleName());
-    }
-
-    public static class GroupInformation {
-        final String groupId;
-        final Optional<String> groupInstanceId;
-        final int heartbeatIntervalMs;
-        final double heartbeatJitterMs;
-        final Optional<String> serverAssignor;
-
-        public GroupInformation(String groupId, Optional<String> 
groupInstanceId) {
-            this(groupId, groupInstanceId, DEFAULT_HEARTBEAT_INTERVAL_MS, 
DEFAULT_HEARTBEAT_JITTER_MS,
-                Optional.of(DEFAULT_REMOTE_ASSIGNOR));
-        }
-
-        public GroupInformation(String groupId, Optional<String> 
groupInstanceId, int heartbeatIntervalMs, double heartbeatJitterMs, 
Optional<String> serverAssignor) {
-            this.heartbeatIntervalMs = heartbeatIntervalMs;
-            this.heartbeatJitterMs = heartbeatJitterMs;
-            this.serverAssignor = serverAssignor;
-            this.groupId = groupId;
-            this.groupInstanceId = groupInstanceId;
-        }
-    }
-
-    static Optional<GroupInformation> createDefaultGroupInformation() {
-        return Optional.of(new GroupInformation(DEFAULT_GROUP_ID, 
Optional.empty()));
-    }
-}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
index 5fdbc9f6e0d..5ea93dae40f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
@@ -59,11 +59,11 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MAX_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static 
org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS;
+import static 
org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MAX_MS;
+import static 
org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -530,7 +530,7 @@ public class ShareHeartbeatRequestManagerTest {
     public void testHeartbeatState() {
         // The initial ShareGroupHeartbeatRequest sets most fields to their 
initial empty values
         ShareGroupHeartbeatRequestData data = 
heartbeatState.buildRequestData();
-        assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+        assertEquals(ShareConsumerTestBuilder.DEFAULT_GROUP_ID, 
data.groupId());
         assertEquals("", data.memberId());
         assertEquals(0, data.memberEpoch());
         assertEquals(Collections.emptyList(), data.subscribedTopicNames());
@@ -540,7 +540,7 @@ public class ShareHeartbeatRequestManagerTest {
         // Mock a response from the group coordinator, that supplies the 
member ID and a new epoch
         mockStableMember();
         data = heartbeatState.buildRequestData();
-        assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+        assertEquals(ShareConsumerTestBuilder.DEFAULT_GROUP_ID, 
data.groupId());
         assertEquals(memberId, data.memberId());
         assertEquals(1, data.memberEpoch());
         assertNull(data.subscribedTopicNames());
@@ -553,7 +553,7 @@ public class ShareHeartbeatRequestManagerTest {
         membershipManager.onSubscriptionUpdated();
         membershipManager.transitionToFenced(); // And indirect way of moving 
to JOINING state
         data = heartbeatState.buildRequestData();
-        assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+        assertEquals(ShareConsumerTestBuilder.DEFAULT_GROUP_ID, 
data.groupId());
         assertEquals(memberId, data.memberId());
         assertEquals(0, data.memberEpoch());
         assertEquals(Collections.singletonList(topic), 
data.subscribedTopicNames());

Reply via email to