This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b8058777057 KAFKA-18969 Rewrite ShareConsumerTest#setup and move to
clients-integration-tests module (#19202)
b8058777057 is described below
commit b80587770572328eb369e0aeaddc0a80f2487640
Author: Ken Huang <[email protected]>
AuthorDate: Tue Mar 18 14:47:38 2025 +0800
KAFKA-18969 Rewrite ShareConsumerTest#setup and move to
clients-integration-tests module (#19202)
Move ShareConsumerTest to the clients-integration-tests module and use
`@BeforeEach` to run the setup
Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
build.gradle | 2 +
.../import-control-clients-integration-tests.xml | 1 +
.../kafka/clients/consumer}/ShareConsumerTest.java | 111 ++++++++-------------
3 files changed, 44 insertions(+), 70 deletions(-)
diff --git a/build.gradle b/build.gradle
index ea99ae754bf..e3f0a6e58e5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1973,6 +1973,8 @@ project(':clients:clients-integration-tests') {
testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-runtime')
implementation project(':server-common')
+ testImplementation project(':metadata')
+ implementation project(':clients').sourceSets.test.output
implementation project(':group-coordinator')
implementation project(':transaction-coordinator')
diff --git a/checkstyle/import-control-clients-integration-tests.xml b/checkstyle/import-control-clients-integration-tests.xml
index 8294c43f922..880873676f6 100644
--- a/checkstyle/import-control-clients-integration-tests.xml
+++ b/checkstyle/import-control-clients-integration-tests.xml
@@ -24,6 +24,7 @@
<!-- These are tests, allow whatever -->
<allow pkg="org.apache.kafka"/>
+ <allow pkg="org.junit" />
<allow pkg="kafka"/>
</import-control>
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
similarity index 98%
rename from core/src/test/java/kafka/test/api/ShareConsumerTest.java
rename to clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 44509f7bb26..1d1395ce08a 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package kafka.test.api;
+package org.apache.kafka.clients.consumer;
-import kafka.api.BaseConsumerTest;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.Admin;
@@ -26,14 +25,6 @@ import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
-import org.apache.kafka.clients.consumer.AcknowledgeType;
-import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaShareConsumer;
-import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -49,6 +40,7 @@ import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.TimestampType;
@@ -68,6 +60,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
@@ -136,12 +129,15 @@ public class ShareConsumerTest {
private final TopicPartition tp2 = new TopicPartition("topic2", 0);
private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
private List<TopicPartition> sgsTopicPartitions;
-
+ private static final String KEY = "content-type";
+ private static final String VALUE = "application/octet-stream";
+
public ShareConsumerTest(ClusterInstance cluster) {
this.cluster = cluster;
}
- private void setup() {
+ @BeforeEach
+ public void setup() {
try {
this.cluster.waitForReadyBrokers();
createTopic("topic");
@@ -157,7 +153,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testPollNoSubscribeFails() {
- setup();
try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
assertEquals(Set.of(), shareConsumer.subscription());
// "Consumer is not subscribed to any topics."
@@ -167,7 +162,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testSubscribeAndPollNoRecords() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Set<String> subscription = Set.of(tp.topic());
@@ -181,7 +175,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testSubscribePollUnsubscribe() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Set<String> subscription = Set.of(tp.topic());
@@ -197,7 +190,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testSubscribePollSubscribe() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Set<String> subscription = Set.of(tp.topic());
@@ -215,7 +207,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testSubscribeUnsubscribePollFails() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Set<String> subscription = Set.of(tp.topic());
@@ -233,7 +224,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testSubscribeSubscribeEmptyPollFails() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
Set<String> subscription = Set.of(tp.topic());
@@ -251,7 +241,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testSubscriptionAndPoll() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -268,7 +257,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testSubscriptionAndPollMultiple() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -291,7 +279,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testPollRecordsGreaterThanMaxBytes() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
@@ -310,7 +297,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testAcknowledgementSentOnSubscriptionChange() throws
ExecutionException, InterruptedException {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -347,7 +333,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement()
throws Exception {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -378,7 +363,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testAcknowledgementCommitCallbackOnClose() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -409,7 +393,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testAcknowledgementCommitCallbackInvalidRecordStateException()
throws Exception {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -464,7 +447,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testHeaders() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -516,14 +498,12 @@ public class ShareConsumerTest {
@ClusterTest
public void testHeadersSerializerDeserializer() {
- setup();
- testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), new BaseConsumerTest.DeserializerImpl());
+ testHeadersSerializeDeserialize(new SerializerImpl(), new DeserializerImpl());
verifyShareGroupStateTopicRecordsProduced();
}
@ClusterTest
public void testMaxPollRecords() {
- setup();
int numRecords = 10000;
int maxPollRecords = 2;
@@ -557,7 +537,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testControlRecordsSkipped() throws Exception {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
Producer<byte[], byte[]> nonTransactionalProducer =
createProducer();
@@ -603,7 +582,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testExplicitAcknowledgeSuccess() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -625,7 +603,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testExplicitAcknowledgeCommitSuccess() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -649,7 +626,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testExplicitAcknowledgementCommitAsync() throws
InterruptedException {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1");
@@ -706,7 +682,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testImplicitModeNotTriggeredByPollWhenNoAcksToSend() throws
InterruptedException {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -744,7 +719,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1")) {
@@ -808,7 +782,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testExplicitAcknowledgeReleasePollAccept() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -832,7 +805,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testExplicitAcknowledgeReleaseAccept() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -853,7 +825,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testExplicitAcknowledgeReleaseClose() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -872,7 +843,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testExplicitAcknowledgeThrowsNotInBatch() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -895,7 +865,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testImplicitAcknowledgeFailsExplicit() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -917,7 +886,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testImplicitAcknowledgeCommitSync() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -941,7 +909,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testImplicitAcknowledgementCommitAsync() throws
InterruptedException {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -981,7 +948,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testConfiguredExplicitAcknowledgeCommitSuccess() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
@@ -1007,7 +973,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testConfiguredImplicitAcknowledgeExplicitAcknowledgeFails() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
@@ -1027,7 +992,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testFetchRecordLargerThanMaxPartitionFetchBytes() throws
Exception {
- setup();
int maxPartitionFetchBytes = 10000;
alterShareAutoOffsetReset("group1", "earliest");
@@ -1052,7 +1016,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testMultipleConsumersWithDifferentGroupIds() throws
InterruptedException {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
alterShareAutoOffsetReset("group2", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
@@ -1103,7 +1066,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testMultipleConsumersInGroupSequentialConsumption() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1");
@@ -1141,7 +1103,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testMultipleConsumersInGroupConcurrentConsumption()
throws InterruptedException, ExecutionException, TimeoutException {
- setup();
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
int consumerCount = 4;
@@ -1176,7 +1137,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testMultipleConsumersInMultipleGroupsConcurrentConsumption()
throws ExecutionException, InterruptedException, TimeoutException {
- setup();
AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0);
AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0);
AtomicInteger totalMessagesConsumedGroup3 = new AtomicInteger(0);
@@ -1239,7 +1199,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testConsumerCloseInGroupSequential() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1");
@@ -1288,7 +1247,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testMultipleConsumersInGroupFailureConcurrentConsumption()
throws InterruptedException, ExecutionException, TimeoutException {
- setup();
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
int consumerCount = 4;
@@ -1333,7 +1291,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testAcquisitionLockTimeoutOnConsumer() throws
InterruptedException {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -1398,7 +1355,6 @@ public class ShareConsumerTest {
*/
@ClusterTest
public void
testAcknowledgementCommitCallbackCallsShareConsumerDisallowed() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -1442,7 +1398,6 @@ public class ShareConsumerTest {
*/
@ClusterTest
public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup()
throws InterruptedException {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -1467,7 +1422,7 @@ public class ShareConsumerTest {
TestUtils.waitForCondition(() -> {
try {
shareConsumer.poll(Duration.ofMillis(500));
- } catch (org.apache.kafka.common.errors.WakeupException e) {
+ } catch (WakeupException e) {
exceptionThrown.set(true);
}
return exceptionThrown.get();
@@ -1495,7 +1450,6 @@ public class ShareConsumerTest {
*/
@ClusterTest
public void testAcknowledgementCommitCallbackThrowsException() throws
InterruptedException {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -1536,7 +1490,6 @@ public class ShareConsumerTest {
*/
@ClusterTest
public void testPollThrowsInterruptExceptionIfInterrupted() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -1561,7 +1514,6 @@ public class ShareConsumerTest {
*/
@ClusterTest
public void testSubscribeOnInvalidTopicThrowsInvalidTopicException() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -1579,7 +1531,6 @@ public class ShareConsumerTest {
*/
@ClusterTest
public void testWakeupWithFetchedRecordsAvailable() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -1601,7 +1552,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testSubscriptionFollowedByTopicCreation() throws
InterruptedException {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
@@ -1631,7 +1581,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testSubscriptionAndPollFollowedByTopicDeletion() throws
InterruptedException, ExecutionException {
- setup();
String topic1 = "bar";
String topic2 = "baz";
createTopic(topic1);
@@ -1674,7 +1623,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testLsoMovementByRecordsDeletion() {
- setup();
String groupId = "group1";
alterShareAutoOffsetReset(groupId, "earliest");
@@ -1720,7 +1668,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testShareAutoOffsetResetDefaultValue() {
- setup();
try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1");
Producer<byte[], byte[]> producer = createProducer()) {
@@ -1747,7 +1694,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testShareAutoOffsetResetEarliest() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1");
Producer<byte[], byte[]> producer = createProducer()) {
@@ -1773,7 +1719,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testShareAutoOffsetResetEarliestAfterLsoMovement() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
try (
ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1");
@@ -1800,7 +1745,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue() {
- setup();
alterShareAutoOffsetReset("group1", "earliest");
alterShareAutoOffsetReset("group2", "latest");
try (ShareConsumer<byte[], byte[]> shareConsumerEarliest =
createShareConsumer("group1");
@@ -1840,7 +1784,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testShareAutoOffsetResetByDuration() throws Exception {
- setup();
// Set auto offset reset to 1 hour before current time
alterShareAutoOffsetReset("group1", "by_duration:PT1H");
@@ -1892,7 +1835,6 @@ public class ShareConsumerTest {
@ClusterTest
public void testShareAutoOffsetResetByDurationInvalidFormat() throws
Exception {
- setup();
// Test invalid duration format
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.GROUP, "group1");
Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new
HashMap<>();
@@ -1931,7 +1873,6 @@ public class ShareConsumerTest {
)
@Timeout(90)
public void testShareConsumerAfterCoordinatorMovement() throws Exception {
- setup();
String topicName = "multipart";
String groupId = "multipartGrp";
Uuid topicId = createTopic(topicName, 3, 3);
@@ -2084,7 +2025,6 @@ public class ShareConsumerTest {
)
@Timeout(150)
public void testComplexShareConsumer() throws Exception {
- setup();
String topicName = "multipart";
String groupId = "multipartGrp";
createTopic(topicName, 3, 3);
@@ -2473,4 +2413,35 @@ public class ShareConsumerTest {
Thread.currentThread().interrupt();
}
}
+
+ public static class SerializerImpl implements Serializer<byte[]> {
+
+ @Override
+ public byte[] serialize(String topic, Headers headers, byte[] data) {
+ headers.add(KEY, VALUE.getBytes());
+ return data;
+ }
+
+ @Override
+ public byte[] serialize(String topic, byte[] data) {
+ fail("method should not be invoked");
+ return null;
+ }
+ }
+
+ public static class DeserializerImpl implements Deserializer<byte[]> {
+
+ @Override
+ public byte[] deserialize(String topic, Headers headers, byte[] data) {
+ Header header = headers.lastHeader(KEY);
+ assertEquals("application/octet-stream", header == null ? null : new String(header.value()));
+ return data;
+ }
+
+ @Override
+ public byte[] deserialize(String topic, byte[] data) {
+ fail("method should not be invoked");
+ return null;
+ }
+ }
}