guozhangwang commented on code in PR #13265: URL: https://github.com/apache/kafka/pull/13265#discussion_r1115047665
########## clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java: ########## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent; +import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent; +import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent; +import 
org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent; +import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a <i>wireframe</i> or + * <i>sketch</i>, showing the interaction between the foreground and background threads. 
Each of the main API methods + * will need to answer the following questions: + * + * <ol> + * <li>Does this method block?</li> + * <li>Does this method interact with the background thread?</li> + * <li>If yes, what data is passed as input to the background thread?</li> + * <li>If yes, what data is returned as output from the background thread?</li> + * </ol> + * + * @param <K> Key + * @param <V> Value + * @see ApplicationEventProcessor for the logic of the background event handler + */ +public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> { + + /** + * These instance variables are intentionally left unassigned, to avoid clutter... + */ + private Time time; + + private EventHandler eventHandler; + + private SubscriptionState subscriptions; + + private Deserializer<K> keyDeserializer; + + private Deserializer<V> valueDeserializer; + + private long defaultApiTimeoutMs; + + private List<ConsumerPartitionAssignor> assignors; + + private Optional<String> groupId; + + /** + * Answers to the above questions: + * + * <ol> + * <li>No</li> + * <li>No</li> + * <li><i>n/a</i></li> + * <li><i>n/a</i></li> + * </ol> + */ + @Override + public Set<TopicPartition> assignment() { Review Comment: In the `assign()` -> `assignment()` example above, no rebalance is triggered, and hence no rebalance listener is triggered either. The rebalance listener is only relevant in the `subscribe()` scenarios. So, just to summarize all the scenarios here and avoid confusion between us :) 1. `assign()`: the subsequent `assignment()` should always reflect the latest partition list from the previous `assign()`, and the subsequent `poll()` should also return data from the latest partition list only (i.e. the background thread, upon getting this new partition list, should clean up the buffered records accordingly, and the subsequent `poll()` call should return empty data if no new data has been retrieved within the poll timeout). 2. 
`subscribe()`: the subsequent `assignment()` could potentially still return the current assignment, which is inconsistent with the `subscribe()` --- this is the case today, but as I mentioned above, I think it is open for discussion whether we want to change it --- or it could return the new assignment after the rebalance, if enough time has elapsed for the rebalance to complete; the subsequent `poll()`, though, should only return data from the latest partition list resulting from the new subscription. 3. `seek()`/`seekToBeginning()`/`seekToEnd()`: the subsequent `position()` should reflect the newly set position, and the subsequent `poll()` should only return data starting from the new seek position. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org