This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c3a1bef4296 KAFKA-16715: Create KafkaShareConsumer interfaces (#16134) c3a1bef4296 is described below commit c3a1bef4296ef306646f9fa44b92bc6de08d628b Author: Andrew Schofield <aschofi...@confluent.io> AuthorDate: Tue Jun 4 04:35:59 2024 +0100 KAFKA-16715: Create KafkaShareConsumer interfaces (#16134) This PR introduces ShareConsumer and KafkaShareConsumer. It is focused entirely on the minimal additions required to introduce the external programming interfaces. Reviewers: Apoorv Mittal <amit...@confluent.io>, Manikumar Reddy <manikumar.re...@gmail.com> --- .../kafka/clients/consumer/AcknowledgeType.java | 58 ++ .../consumer/AcknowledgementCommitCallback.java | 53 ++ .../kafka/clients/consumer/KafkaShareConsumer.java | 700 +++++++++++++++++++++ .../kafka/clients/consumer/MockShareConsumer.java | 170 +++++ .../kafka/clients/consumer/ShareConsumer.java | 115 ++++ .../consumer/internals/ShareConsumerDelegate.java | 37 ++ .../internals/ShareConsumerDelegateCreator.java | 57 ++ .../clients/consumer/MockShareConsumerTest.java | 50 ++ 8 files changed, 1240 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java new file mode 100644 index 00000000000..14b5415c2a4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Locale; + +@InterfaceStability.Evolving +public enum AcknowledgeType { + /** The record was consumed successfully. */ + ACCEPT((byte) 1), + + /** The record was not consumed successfully. Release it for another delivery attempt. */ + RELEASE((byte) 2), + + /** The record was not consumed successfully. Reject it and do not release it for another delivery attempt. */ + REJECT((byte) 3); + + public final byte id; + + AcknowledgeType(byte id) { + this.id = id; + } + + @Override + public String toString() { + return super.toString().toLowerCase(Locale.ROOT); + } + + + public static AcknowledgeType forId(byte id) { + switch (id) { + case 1: + return ACCEPT; + case 2: + return RELEASE; + case 3: + return REJECT; + default: + throw new IllegalArgumentException("Unknown acknowledge type id: " + id); + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java new file mode 100644 index 00000000000..f37fbe05750 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidRecordStateException; +import org.apache.kafka.common.errors.WakeupException; + +import java.util.Map; +import java.util.Set; + +/** + * A callback interface that the user can implement to trigger custom actions when an acknowledgement completes. + * The callback may be executed in any thread calling {@link ShareConsumer#poll(java.time.Duration)}. + */ +@InterfaceStability.Evolving +public interface AcknowledgementCommitCallback { + + /** + * A callback method the user can implement to provide asynchronous handling of acknowledgement completion. + * This method will be called when the acknowledgement request sent to the server has been completed. + * + * @param offsets A map of the offsets that this callback applies to. + * + * @param exception The exception thrown during processing of the request, or null if the acknowledgement completed successfully. + * <p><ul> + * <li> {@link InvalidRecordStateException} if the record state is invalid + * <li> {@link AuthorizationException} if not authorized to the topic of group + * <li> {@link WakeupException} if {@link KafkaShareConsumer#wakeup()} is called before or while this function is called + * <li> {@link InterruptException} if the calling thread is interrupted before or while this function is called + * <li> {@link KafkaException} for any other unrecoverable errors + * </ul> + */ + void onComplete(Map<TopicIdPartition, Set<Long>> offsets, Exception exception); +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java new file mode 100644 index 00000000000..2fc721a84ff --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java @@ -0,0 +1,700 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; +import org.apache.kafka.clients.consumer.internals.ShareConsumerDelegate; +import org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; + +import java.time.Duration; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +import static org.apache.kafka.common.utils.Utils.propsToMap; + +/** + * A client that consumes records from a Kafka cluster using a share group. + * <p> + * <em>This is an early access feature introduced by KIP-932. It is not suitable for production use until it is + * fully implemented and released.</em> + * + * <h3>Cross-Version Compatibility</h3> + * This client can communicate with brokers that are version 4.0.0 or newer. You will receive an + * {@link org.apache.kafka.common.errors.UnsupportedVersionException} when invoking an API that is not + * available on the running broker version. + * + * <h3><a name="sharegroups">Share Groups and Topic Subscriptions</a></h3> + * Kafka uses the concept of <i>share groups</i> to allow a pool of consumers to cooperate on the work of + * consuming and processing records. All consumer instances sharing the same {@code group.id} will be part of + * the same share group. + * <p> + * Each consumer in a group can dynamically set the list of topics it wants to subscribe to using the + * {@link #subscribe(Collection)} method. Kafka will deliver each message in the subscribed topics to one + * consumer in the share group. Unlike consumer groups, share groups balance the partitions between all + * members of the share group permitting multiple consumers to consume from the same partitions. This gives + * more flexible sharing of records than a consumer group, at the expense of record ordering. + * <p> + * Membership in a share group is maintained dynamically: if a consumer fails, the partitions assigned to + * it will be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, + * the partition assignment is re-evaluated and partitions can be moved from existing consumers to the new one. + * This is known as <i>rebalancing</i> the group and is discussed in more detail <a href="#failures">below</a>. + * Group rebalancing is also used when new partitions are added to one of the subscribed topics. The group will + * automatically detect the new partitions through periodic metadata refreshes and assign them to the members of the group. + * <p> + * Conceptually, you can think of a share group as a single logical subscriber made up of multiple consumers. + * In fact, in other messaging systems, a share group is roughly equivalent to a <em>durable shared subscription</em>. + * You can have multiple share groups and consumer groups independently consuming from the same topics. + * + * <h3><a name="failures">Detecting Consumer Failures</a></h3> + * After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(Duration)} is + * invoked. This method is designed to ensure consumer liveness. As long as you continue to call poll, the consumer + * will stay in the group and continue to receive records from the partitions it was assigned. Under the covers, + * the consumer sends periodic heartbeats to the broker. If the consumer crashes or is unable to send heartbeats for + * the duration of the share group's session time-out, then the consumer will be considered dead and its partitions + * will be reassigned. + * <p> + * It is also possible that the consumer could encounter a "livelock" situation where it is continuing to send heartbeats + * in the background, but no progress is being made. To prevent the consumer from holding onto its partitions + * indefinitely in this case, we provide a liveness detection mechanism using the {@code max.poll.interval.ms} setting. + * If you don't call poll at least as frequently as this, the client will proactively leave the share group. + * So to stay in the group, you must continue to call poll. + * + * <h3>Record Delivery and Acknowledgement</h3> + * When a consumer in a share-group fetches records using {@link #poll(Duration)}, it receives available records from any + * of the topic-partitions that match its subscriptions. Records are acquired for delivery to this consumer with a + * time-limited acquisition lock. While a record is acquired, it is not available for another consumer. By default, + * the lock duration is 30 seconds, but it can also be controlled using the group {@code group.share.record.lock.duration.ms} + * configuration parameter. The idea is that the lock is automatically released once the lock duration has elapsed, and + * then the record is available to be given to another consumer. The consumer which holds the lock can deal with it in + * the following ways: + * <ul> + * <li>The consumer can acknowledge successful processing of the record</li> + * <li>The consumer can release the record, which makes the record available for another delivery attempt</li> + * <li>The consumer can reject the record, which indicates that the record is unprocessable and does not make + * the record available for another delivery attempt</li> + * <li>The consumer can do nothing, in which case the lock is automatically released when the lock duration has elapsed</li> + * </ul> + * The cluster limits the number of records acquired for consumers for each topic-partition in a share group. Once the limit + * is reached, fetching records will temporarily yield no further records until the number of acquired records reduces, + * as naturally happens when the locks time out. This limit is controlled by the broker configuration property + * {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically + * releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures. + * <p> + * The consumer can choose to use implicit or explicit acknowledgement of the records it processes. + * <p>If the application calls {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch, + * it is using <em>explicit acknowledgement</em>. In this case: + * <ul> + * <li>The application calls {@link #commitSync()} or {@link #commitAsync()} which commits the acknowledgements to Kafka. + * If any records in the batch were not acknowledged, they remain acquired and will be presented to the application + * in response to a future poll.</li> + * <li>The application calls {@link #poll(Duration)} without committing first, which commits the acknowledgements to + * Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. + * If any records in the batch were not acknowledged, they remain acquired and will be presented to the application + * in response to a future poll.</li> + * <li>The application calls {@link #close()} which attempts to commit any pending acknowledgements and + * releases any remaining acquired records.</li> + * </ul> + * If the application does not call {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch, + * it is using <em>implicit acknowledgement</em>. In this case: + * <ul> + * <li>The application calls {@link #commitSync()} or {@link #commitAsync()} which implicitly acknowledges all of + * the delivered records as processed successfully and commits the acknowledgements to Kafka.</li> + * <li>The application calls {@link #poll(Duration)} without committing, which also implicitly acknowledges all of + * the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is + * thrown by a failure to commit the acknowledgements.</li> + * <li>The application calls {@link #close()} which releases any acquired records without acknowledgement.</li> + * </ul> + * <p> + * The consumer guarantees that the records returned in the {@code ConsumerRecords} object for a specific topic-partition + * are in order of increasing offset. For each topic-partition, Kafka guarantees that acknowledgements for the records + * in a batch are performed atomically. This makes error handling significantly more straightforward because there can be + * one error code per partition. + * + * <h3>Usage Examples</h3> + * The share consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to + * demonstrate how to use them. + * + * <h4>Acknowledging a batch of records (implicit acknowledgement)</h4> + * This example demonstrates implicit acknowledgement using {@link #poll(Duration)} to acknowledge the records which + * were delivered in the previous poll. All the records delivered are implicitly marked as successfully consumed and + * acknowledged synchronously with Kafka as the consumer fetches more records. + * <pre> + * Properties props = new Properties(); + * props.setProperty("bootstrap.servers", "localhost:9092"); + * props.setProperty("group.id", "test"); + * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props); + * consumer.subscribe(Arrays.asList("foo")); + * while (true) { + * ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); + * for (ConsumerRecord<String, String> record : records) { + * System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); + * doProcessing(record); + * } + * } + * </pre> + * + * Alternatively, you can use {@link #commitSync()} or {@link #commitAsync()} to commit the acknowledgements, but this is + * slightly less efficient because there is an additional request sent to Kafka. + * <pre> + * Properties props = new Properties(); + * props.setProperty("bootstrap.servers", "localhost:9092"); + * props.setProperty("group.id", "test"); + * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props); + * consumer.subscribe(Arrays.asList("foo")); + * while (true) { + * ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); + * for (ConsumerRecord<String, String> record : records) { + * System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); + * doProcessing(record); + * } + * consumer.commitSync(); + * } + * </pre> + * + * <h4>Per-record acknowledgement (explicit acknowledgement)</h4> + * This example demonstrates using different acknowledgement types depending on the outcome of processing the records. + * <pre> + * Properties props = new Properties(); + * props.setProperty("bootstrap.servers", "localhost:9092"); + * props.setProperty("group.id", "test"); + * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props); + * consumer.subscribe(Arrays.asList("foo")); + * while (true) { + * ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); + * for (ConsumerRecord<String, String> record : records) { + * try { + * doProcessing(record); + * consumer.acknowledge(record, AcknowledgeType.ACCEPT); + * } catch (Exception e) { + * consumer.acknowledge(record, AcknowledgeType.REJECT); + * } + * } + * consumer.commitSync(); + * } + * </pre> + * + * Each record processed is separately acknowledged using a call to {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. + * The {@link AcknowledgeType} argument indicates whether the record was processed successfully or not. In this case, + * the bad records are rejected meaning that they’re not eligible for further delivery attempts. For a permanent error + * such as a semantic error, this is appropriate. For a transient error which might not affect a subsequent processing + * attempt, {@link AcknowledgeType#RELEASE} is more appropriate because the record remains eligible for further delivery attempts. + * <p> + * The calls to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} are simply updating local information in the consumer. + * It is only once {@link #commitSync()} is called that the acknowledgements are committed by sending the new state + * information to Kafka. + * + * <h4>Per-record acknowledgement, ending processing of the batch on an error (explicit acknowledgement)</h4> + * This example demonstrates ending processing of a batch of records on the first error. + * <pre> + * Properties props = new Properties(); + * props.setProperty("bootstrap.servers", "localhost:9092"); + * props.setProperty("group.id", "test"); + * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props); + * consumer.subscribe(Arrays.asList("foo")); + * while (true) { + * ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); + * for (ConsumerRecord<String, String> record : records) { + * try { + * doProcessing(record); + * consumer.acknowledge(record, AcknowledgeType.ACCEPT); + * } catch (Exception e) { + * consumer.acknowledge(record, AcknowledgeType.REJECT); + * break; + * } + * } + * consumer.commitSync(); + * } + * </pre> + * There are the following cases in this example: + * <ol> + * <li>The batch contains no records, in which case the application just polls again. The call to {@link #commitSync()} + * just does nothing because the batch was empty.</li> + * <li>All of the records in the batch are processed successfully. The calls to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} + * specifying {@code AcknowledgeType.ACCEPT} mark all records in the batch as successfully processed.</li> + * <li>One of the records encounters an exception. The call to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} specifying + * {@code AcknowledgeType.REJECT} rejects that record. Earlier records in the batch have already been marked as successfully + * processed. The call to {@link #commitSync()} commits the acknowledgements, but the records after the failed record + * remain acquired as part of the same delivery attempt and will be presented to the application in response to another poll.</li> + * </ol> + * + * <h3>Reading Transactional Records</h3> + * The way that share groups handle transactional records is controlled by the {@code group.share.isolation.level}</code> + * configuration property. In a share group, the isolation level applies to the entire share group, not just individual + * consumers. + * <p> + * In <code>read_uncommitted</code> isolation level, the share group consumes all non-transactional and transactional + * records. The consumption is bounded by the high-water mark. + * <p> + * In <code>read_committed</code> isolation level (not yet supported), the share group only consumes non-transactional + * records and committed transactional records. The set of records which are eligible to become in-flight records are + * non-transactional records and committed transactional records only. The consumption is bounded by the last stable + * offset, so an open transaction blocks the progress of the share group with read_committed isolation level. + * + * <h3><a name="multithreaded">Multithreaded Processing</a></h3> + * The consumer is NOT thread-safe. It is the responsibility of the user to ensure that multithreaded access + * is properly synchronized. Unsynchronized access will result in {@link java.util.ConcurrentModificationException}. + * <p> + * The only exception to this rule is {@link #wakeup()} which can safely be used from an external thread to + * interrupt an active operation. In this case, a {@link org.apache.kafka.common.errors.WakeupException} will be + * thrown from the thread blocking on the operation. This can be used to shut down the consumer from another thread. + * The following snippet shows the typical pattern: + * + * <pre> + * public class KafkaShareConsumerRunner implements Runnable { + * private final AtomicBoolean closed = new AtomicBoolean(false); + * private final KafkaShareConsumer consumer; + * + * public KafkaShareConsumerRunner(KafkaShareConsumer consumer) { + * this.consumer = consumer; + * } + * + * {@literal}@Override + * public void run() { + * try { + * consumer.subscribe(Arrays.asList("topic")); + * while (!closed.get()) { + * ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); + * // Handle new records + * } + * } catch (WakeupException e) { + * // Ignore exception if closing + * if (!closed.get()) throw e; + * } finally { + * consumer.close(); + * } + * } + * + * // Shutdown hook which can be called from a separate thread + * public void shutdown() { + * closed.set(true); + * consumer.wakeup(); + * } + * } + * </pre> + * + * Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer. + * <pre> + * closed.set(true); + * consumer.wakeup(); + * </pre> + * + * <p> + * Note that while it is possible to use thread interrupts instead of {@link #wakeup()} to abort a blocking operation + * (in which case, {@link InterruptException} will be raised), we discourage their use since they may cause a clean + * shutdown of the consumer to be aborted. Interrupts are mainly supported for those cases where using {@link #wakeup()} + * is impossible, such as when a consumer thread is managed by code that is unaware of the Kafka client. + * <p> + * We have intentionally avoided implementing a particular threading model for processing. Various options for + * multithreaded processing are possible, of which the most straightforward is to dedicate a thread to each consumer. + */ +@InterfaceStability.Evolving +public class KafkaShareConsumer<K, V> implements ShareConsumer<K, V> { + + private final static ShareConsumerDelegateCreator CREATOR = new ShareConsumerDelegateCreator(); + + private final ShareConsumerDelegate<K, V> delegate; + + /** + * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings + * are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs" >here</a>. Values can be + * either strings or objects of the appropriate type (for example a numeric configuration would accept either the + * string "42" or the integer 42). + * <p> + * Valid configuration strings are documented at {@link ConsumerConfig}. + * <p> + * Note: after creating a {@code KafkaShareConsumer} you must always {@link #close()} it to avoid resource leaks. + * + * @param configs The consumer configs + */ + public KafkaShareConsumer(Map<String, Object> configs) { + this(configs, null, null); + } + + /** + * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration. + * <p> + * Valid configuration strings are documented at {@link ConsumerConfig}. + * <p> + * Note: after creating a {@code KafkaShareConsumer} you must always {@link #close()} it to avoid resource leaks. + * + * @param properties The consumer configuration properties + */ + public KafkaShareConsumer(Properties properties) { + this(properties, null, null); + } + + /** + * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration, and a + * key and a value {@link Deserializer}. + * <p> + * Valid configuration strings are documented at {@link ConsumerConfig}. + * <p> + * Note: after creating a {@code KafkaShareConsumer} you must always {@link #close()} it to avoid resource leaks. + * + * @param properties The consumer configuration properties + * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method + * won't be called in the consumer when the deserializer is passed in directly. + * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method + * won't be called in the consumer when the deserializer is passed in directly. + */ + public KafkaShareConsumer(Properties properties, + Deserializer<K> keyDeserializer, + Deserializer<V> valueDeserializer) { + this(propsToMap(properties), keyDeserializer, valueDeserializer); + } + + /** + * A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value {@link Deserializer}. + * <p> + * Valid configuration strings are documented at {@link ConsumerConfig}. + * <p> + * Note: after creating a {@code KafkaShareConsumer} you must always {@link #close()} it to avoid resource leaks. + * + * @param configs The consumer configs + * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method + * won't be called in the consumer when the deserializer is passed in directly. + * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method + * won't be called in the consumer when the deserializer is passed in directly. + */ + public KafkaShareConsumer(Map<String, Object> configs, + Deserializer<K> keyDeserializer, + Deserializer<V> valueDeserializer) { + this(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), + keyDeserializer, valueDeserializer); + } + + public KafkaShareConsumer(ConsumerConfig config, + Deserializer<K> keyDeserializer, + Deserializer<V> valueDeserializer) { + delegate = CREATOR.create(config, keyDeserializer, valueDeserializer); + } + + KafkaShareConsumer(final LogContext logContext, + final String clientId, + final String groupId, + final ConsumerConfig config, + final Deserializer<K> keyDeserializer, + final Deserializer<V> valueDeserializer, + final Time time, + final KafkaClient client, + final SubscriptionState subscriptions, + final ConsumerMetadata metadata) { + delegate = CREATOR.create( + logContext, clientId, groupId, config, keyDeserializer, valueDeserializer, + time, client, subscriptions, metadata); + } + + /** + * Get the current subscription. Will return the same topics used in the most recent call to + * {@link #subscribe(Collection)}, or an empty set if no such call has been made. + * + * @return The set of topics currently subscribed to + */ + @Override + public Set<String> subscription() { + return delegate.subscription(); + } + + /** + * Subscribe to the given list of topics to get dynamically assigned partitions. + * <b>Topic subscriptions are not incremental. This list will replace the current + * assignment, if there is one.</b> If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}. + * + * <p> + * As part of group management, the coordinator will keep track of the list of consumers that belong to a particular + * group and will trigger a rebalance operation if any one of the following events are triggered: + * <ul> + * <li>A member joins or leaves the share group + * <li>An existing member of the share group is shut down or fails + * <li>The number of partitions changes for any of the subscribed topics + * <li>A subscribed topic is created or deleted + * </ul> + * + * @param topics The list of topics to subscribe to + * + * @throws IllegalArgumentException if topics is null or contains null or empty elements + * @throws KafkaException for any other unrecoverable errors + */ + @Override + public void subscribe(Collection<String> topics) { + delegate.subscribe(topics); + } + + /** + * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}. + * + * @throws KafkaException for any other unrecoverable errors + */ + @Override + public void unsubscribe() { + delegate.unsubscribe(); + } + + /** + * Fetch data for the topics specified using {@link #subscribe(Collection)}. It is an error to not have + * subscribed to any topics before polling for data. + * + * <p> + * This method returns immediately if there are records available. Otherwise, it will await the passed timeout. + * If the timeout expires, an empty record set will be returned. + * + * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds) + * + * @return map of topic to records since the last fetch for the subscribed list of topics + * + * @throws AuthenticationException if authentication fails. See the exception for more details + * @throws AuthorizationException if caller lacks Read access to any of the subscribed + * topics or to the share group. See the exception for more details + * @throws IllegalArgumentException if the timeout value is negative + * @throws IllegalStateException if the consumer is not subscribed to any topics + * @throws ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds. + * @throws InvalidTopicException if the current subscription contains any invalid + * topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)}) + * @throws WakeupException if {@link #wakeup()} is called before or while this method is called + * @throws InterruptException if the calling thread is interrupted before or while this method is called + * @throws KafkaException for any other unrecoverable errors + */ + @Override + public ConsumerRecords<K, V> poll(Duration timeout) { + return delegate.poll(timeout); + } + + /** + * Acknowledge successful delivery of a record returned on the last {@link #poll(Duration)} call. + * The acknowledgement is committed on the next {@link #commitSync()}, {@link #commitAsync()} or + * {@link #poll(Duration)} call. + * + * @param record The record to acknowledge + * + * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already + * used implicit acknowledgement + */ + @Override + public void acknowledge(ConsumerRecord<K, V> record) { + delegate.acknowledge(record); + } + + /** + * Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether + * it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()}, + * {@link #commitAsync()} or {@link #poll(Duration)} call. By using this method, the consumer is using + * <b>explicit acknowledgement</b>. + * + * @param record The record to acknowledge + * @param type The acknowledgement type which indicates whether it was processed successfully + * + * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already + * used implicit acknowledgement + */ + @Override + public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) { + delegate.acknowledge(record, type); + } + + /** + * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement, + * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or + * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement, + * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged. + * + * <p> + * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is + * encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} + * expires. + * + * @return A map of the results for each topic-partition for which delivery was acknowledged. + * If the acknowledgement failed for a topic-partition, an exception is present. + * + * @throws WakeupException if {@link #wakeup()} is called before or while this method is called + * @throws InterruptException if the thread is interrupted while blocked + * @throws KafkaException for any other unrecoverable errors + */ + @Override + public Map<TopicIdPartition, Optional<KafkaException>> commitSync() { + return delegate.commitSync(); + } + + /** + * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement, + * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or + * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement, + * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged. + + * <p> + * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is + * encountered (in which case it is thrown to the caller), or the timeout expires. + * + * @param timeout The maximum amount of time to await completion of the acknowledgement + * + * @return A map of the results for each topic-partition for which delivery was acknowledged. + * If the acknowledgement failed for a topic-partition, an exception is present. + * + * @throws IllegalArgumentException if the {@code timeout} is negative + * @throws WakeupException if {@link #wakeup()} is called before or while this method is called + * @throws InterruptException if the thread is interrupted while blocked + * @throws KafkaException for any other unrecoverable errors + */ + @Override + public Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration timeout) { + return delegate.commitSync(timeout); + } + + /** + * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement, + * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or + * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement, + * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged. + * + * @throws KafkaException for any other unrecoverable errors + */ + @Override + public void commitAsync() { + delegate.commitAsync(); + } + + /** + * Sets the acknowledgement commit callback which can be used to handle acknowledgement completion. + * + * @param callback The acknowledgement commit callback + */ + @Override + public void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback) { + delegate.setAcknowledgementCommitCallback(callback); + } + + /** + * Determines the client's unique client instance ID used for telemetry. This ID is unique to + * this specific client instance and will not change after it is initially generated. + * The ID is useful for correlating client operations with telemetry sent to the broker and + * to its eventual monitoring destinations. + * <p> + * If telemetry is enabled, this will first require a connection to the cluster to generate + * the unique client instance ID. This method waits up to {@code timeout} for the consumer + * client to complete the request. + * <p> + * Client telemetry is controlled by the {@link ConsumerConfig#ENABLE_METRICS_PUSH_CONFIG} + * configuration option. + * + * @param timeout The maximum time to wait for consumer client to determine its client instance ID. + * The value must be non-negative. Specifying a timeout of zero means do not + * wait for the initial request to complete if it hasn't already. + * + * @return The client's assigned instance id used for metrics collection. + * + * @throws IllegalArgumentException if the {@code timeout} is negative + * @throws IllegalStateException if telemetry is not enabled + * @throws WakeupException if {@link #wakeup()} is called before or while this method is called + * @throws InterruptException if the thread is interrupted while blocked + * @throws KafkaException if an unexpected error occurs while trying to determine the client + * instance ID, though this error does not necessarily imply the + * consumer client is otherwise unusable + */ + @Override + public Uuid clientInstanceId(Duration timeout) { + return delegate.clientInstanceId(timeout); + } + + /** + * Get the metrics kept by the consumer + */ + @Override + public Map<MetricName, ? extends Metric> metrics() { + return delegate.metrics(); + } + + /** + * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. + * This will commit acknowledgements if possible within the default timeout. + * See {@link #close(Duration)} for details. Note that {@link #wakeup()} cannot be used to interrupt close. + * + * @throws WakeupException if {@link #wakeup()} is called before or while this method is called + * @throws InterruptException if the thread is interrupted before or while this method is called + * @throws KafkaException for any other error during close + */ + @Override + public void close() { + delegate.close(); + } + + /** + * Tries to close the consumer cleanly within the specified timeout. This method waits up to + * {@code timeout} for the consumer to complete acknowledgements and leave the group. + * If the consumer is unable to complete acknowledgements and gracefully leave the group + * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be + * used to interrupt close. + * + * @param timeout The maximum time to wait for consumer to close gracefully. The value must be + * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. + * + * @throws IllegalArgumentException if the {@code timeout} is negative + * @throws WakeupException if {@link #wakeup()} is called before or while this method is called + * @throws InterruptException if the thread is interrupted before or while this method is called + * @throws KafkaException for any other error during close + */ + @Override + public void close(Duration timeout) { + delegate.close(timeout); + } + + /** + * Wake up the consumer. This method is thread-safe and is useful in particular to abort a long poll. + * The thread which is blocking in an operation will throw {@link WakeupException}. + * If no thread is blocking in a method which can throw {@link WakeupException}, + * the next call to such a method will raise it instead. + */ + @Override + public void wakeup() { + delegate.wakeup(); + } + + // Functions below are for testing only + String clientId() { + return delegate.clientId(); + } + + Metrics metricsRegistry() { + return delegate.metricsRegistry(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java new file mode 100644 index 00000000000..212e2ae390d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.LogContext; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS; + +/** + * A mock of the {@link ShareConsumer} interface you can use for testing code that uses Kafka. This class is <i> not + * thread-safe </i>. + */ +public class MockShareConsumer<K, V> implements ShareConsumer<K, V> { + + private final SubscriptionState subscriptions; + private final AtomicBoolean wakeup; + + private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records; + + private boolean closed; + private Uuid clientInstanceId; + + public MockShareConsumer() { + this.subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + this.records = new HashMap<>(); + this.closed = false; + this.wakeup = new AtomicBoolean(false); + } + + @Override + public synchronized Set<String> subscription() { + ensureNotClosed(); + return subscriptions.subscription(); + } + + @Override + public synchronized void subscribe(Collection<String> topics) { + ensureNotClosed(); + subscriptions.subscribe(new HashSet<>(topics), Optional.empty()); + } + + @Override + public synchronized void unsubscribe() { + ensureNotClosed(); + subscriptions.unsubscribe(); + } + + @Override + public synchronized ConsumerRecords<K, V> poll(Duration timeout) { + ensureNotClosed(); + + final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>(); + for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) { + final List<ConsumerRecord<K, V>> recs = entry.getValue(); + for (final ConsumerRecord<K, V> rec : recs) { + results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec); + } + } + + records.clear(); + return new ConsumerRecords<>(results); + } + + @Override + public synchronized void acknowledge(ConsumerRecord<K, V> record) { + } + + @Override + public synchronized void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) { + } + + @Override + public synchronized Map<TopicIdPartition, Optional<KafkaException>> commitSync() { + return new HashMap<>(); + } + + @Override + public synchronized Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration timeout) { + return new HashMap<>(); + } + + @Override + public synchronized void commitAsync() { + } + + @Override + public void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback) { + } + + public synchronized void setClientInstanceId(final Uuid clientInstanceId) { + this.clientInstanceId = clientInstanceId; + } + + @Override + public synchronized Uuid clientInstanceId(Duration timeout) { + if (clientInstanceId == null) { + throw new UnsupportedOperationException("clientInstanceId not set"); + } + + return clientInstanceId; + } + + @Override + public synchronized Map<MetricName, ? extends Metric> metrics() { + ensureNotClosed(); + return Collections.emptyMap(); + } + + @Override + public synchronized void close() { + close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)); + } + + @Override + public synchronized void close(Duration timeout) { + closed = true; + } + + @Override + public synchronized void wakeup() { + wakeup.set(true); + } + + public synchronized void addRecord(ConsumerRecord<K, V> record) { + ensureNotClosed(); + TopicPartition tp = new TopicPartition(record.topic(), record.partition()); + if (!subscriptions.subscription().contains(record.topic())) + throw new IllegalStateException("Cannot add records for a topics that is not subscribed by the consumer"); + List<ConsumerRecord<K, V>> recs = records.computeIfAbsent(tp, k -> new ArrayList<>()); + recs.add(record); + } + + private void ensureNotClosed() { + if (closed) + throw new IllegalStateException("This consumer has already been closed."); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java new file mode 100644 index 00000000000..8ac4198c70d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.io.Closeable; +import java.time.Duration; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * @see KafkaShareConsumer + * @see MockShareConsumer + */ +@InterfaceStability.Evolving +public interface ShareConsumer<K, V> extends Closeable { + + /** + * @see KafkaShareConsumer#subscription() + */ + Set<String> subscription(); + + /** + * @see KafkaShareConsumer#subscribe(Collection) + */ + void subscribe(Collection<String> topics); + + /** + * @see KafkaShareConsumer#unsubscribe() + */ + void unsubscribe(); + + /** + * @see KafkaShareConsumer#poll(Duration) + */ + ConsumerRecords<K, V> poll(Duration timeout); + + /** + * @see KafkaShareConsumer#acknowledge(ConsumerRecord) + */ + void acknowledge(ConsumerRecord<K, V> record); + + /** + * @see KafkaShareConsumer#acknowledge(ConsumerRecord, AcknowledgeType) + */ + void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type); + + /** + * @see KafkaShareConsumer#commitSync() + */ + Map<TopicIdPartition, Optional<KafkaException>> commitSync(); + + /** + * @see KafkaShareConsumer#commitSync(Duration) + */ + Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration timeout); + + /** + * @see KafkaShareConsumer#commitAsync() + */ + void commitAsync(); + + /** + * @see KafkaShareConsumer#setAcknowledgementCommitCallback(AcknowledgementCommitCallback) + */ + void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback); + + /** + * See {@link KafkaShareConsumer#clientInstanceId(Duration)}} + */ + Uuid clientInstanceId(Duration timeout); + + /** + * @see KafkaShareConsumer#metrics() + */ + Map<MetricName, ? extends Metric> metrics(); + + /** + * @see KafkaShareConsumer#close() + */ + void close(); + + /** + * @see KafkaShareConsumer#close(Duration) + */ + void close(Duration timeout); + + /** + * @see KafkaShareConsumer#wakeup() + */ + void wakeup(); + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegate.java new file mode 100644 index 00000000000..65dd9d2c30f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegate.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.common.metrics.Metrics; + +/** + * This extension interface provides a handful of methods to expose internals of the {@link ShareConsumer} for + * various tests. + * + * <p/> + * + * <em>Note</em>: this is for internal use only and is not intended for use by end users. Internal users should + * not attempt to determine the underlying implementation to avoid coding to an unstable interface. Rather, it is + * the {@link ShareConsumer} API contract that should serve as the caller's interface. + */ +public interface ShareConsumerDelegate<K, V> extends ShareConsumer<K, V> { + + String clientId(); + + Metrics metricsRegistry(); +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java new file mode 100644 index 00000000000..3c41a58e057 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; + +/** + * {@code ShareConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the + * underlying {@link ShareConsumer} implementation that is created. This provides the means by which + * {@link KafkaShareConsumer} can remain the top-level facade for implementations, but allow different implementations + * to co-exist under the covers. + * + * <p> + * <em>Note</em>: this is for internal use only and is not intended for use by end users. Internal users should + * not attempt to determine the underlying implementation to avoid coding to an unstable interface. Rather, it is + * the {@link ShareConsumer} API contract that should serve as the caller's interface. + */ +public class ShareConsumerDelegateCreator { + public <K, V> ShareConsumerDelegate<K, V> create(final ConsumerConfig config, + final Deserializer<K> keyDeserializer, + final Deserializer<V> valueDeserializer) { + throw new UnsupportedOperationException("Not implemented"); + } + + public <K, V> ShareConsumerDelegate<K, V> create(final LogContext logContext, + final String clientId, + final String groupId, + final ConsumerConfig config, + final Deserializer<K> keyDeserializer, + final Deserializer<V> valueDeserializer, + final Time time, + final KafkaClient client, + final SubscriptionState subscriptions, + final ConsumerMetadata metadata) { + throw new UnsupportedOperationException("Not implemented"); + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockShareConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockShareConsumerTest.java new file mode 100644 index 00000000000..79ba87d4597 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockShareConsumerTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.Iterator; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class MockShareConsumerTest { + private final MockShareConsumer<String, String> consumer = new MockShareConsumer<>(); + + @Test + public void testSimpleMock() { + consumer.subscribe(Collections.singleton("test")); + assertEquals(0, consumer.poll(Duration.ZERO).count()); + ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME, + 0, 0, "key1", "value1", new RecordHeaders(), Optional.empty()); + ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME, + 0, 0, "key2", "value2", new RecordHeaders(), Optional.empty()); + consumer.addRecord(rec1); + consumer.addRecord(rec2); + ConsumerRecords<String, String> recs = consumer.poll(Duration.ofMillis(1)); + Iterator<ConsumerRecord<String, String>> iter = recs.iterator(); + assertEquals(rec1, iter.next()); + assertEquals(rec2, iter.next()); + assertFalse(iter.hasNext()); + } +} \ No newline at end of file