guozhangwang commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1115047665


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent;
+import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a <i>wireframe</i> or
+ * <i>sketch</i>, showing the interaction between the foreground and background threads. Each of the main API methods
+ * will need to answer the following questions:
+ *
+ * <ol>
+ *     <li>Does this method block?</li>
+ *     <li>Does this method interact with the background thread?</li>
+ *     <li>If yes, what data is passed as input to the background thread?</li>
+ *     <li>If yes, what data is returned as output from the background thread?</li>
+ * </ol>
+ *
+ * @param <K> Key
+ * @param <V> Value
+ * @see ApplicationEventProcessor for the logic of the background event handler
+ */
+public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> {
+
+    /**
+     * These instance variables are intentionally left unassigned, to avoid clutter...
+     */
+    private Time time;
+
+    private EventHandler eventHandler;
+
+    private SubscriptionState subscriptions;
+
+    private Deserializer<K> keyDeserializer;
+
+    private Deserializer<V> valueDeserializer;
+
+    private long defaultApiTimeoutMs;
+
+    private List<ConsumerPartitionAssignor> assignors;
+
+    private Optional<String> groupId;
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li><i>n/a</i></li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   In the `assign()` -> `assignment()` example above, no rebalance is triggered, so no rebalance listener is invoked either. The rebalance listener is only relevant in the `subscribe()` scenarios.
   
   So, just to summarize all the scenarios here and avoid confusion between us :)
   
   1. `assign()`: a subsequent `assignment()` should always reflect the latest partition list from the preceding `assign()`, and a subsequent `poll()` should return data from the latest partition list only (i.e. the background thread, upon getting the new partition list, should clean up the buffered records accordingly, and the subsequent `poll()` should return empty data if no new data has been retrieved within the poll timeout).
   2. `subscribe()`: a subsequent `assignment()` could still return the current assignment, which is inconsistent with the `subscribe()` (this is the case today, but as I mentioned above I think it is open for discussion whether we want to change it), or it could return the new assignment after the rebalance, if enough time has elapsed for the rebalance to complete. A subsequent `poll()`, though, should still only return data from the latest partition list resulting from the new subscription.
   3. `seek()`/`seekToBeginning()`/`seekToEnd()`: a subsequent `position()` should reflect the newly set position, and a subsequent `poll()` should only return data starting from the new position.
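
   To make scenarios 1 and 3 concrete, here is a rough sketch of the expected semantics. It is a toy, in-memory model and not the actual consumer: `ToyConsumer`, `prefetch()`, the string partition names, and the in-memory "log" are all hypothetical, and `prefetch()` only stands in for whatever the background thread does. The `subscribe()` case is left out because it needs a rebalance, which this model does not have.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical toy model, NOT Kafka code. Partitions are plain strings and
 * the "log" is an in-memory list of records per partition.
 */
final class ToyConsumer {
    // Stands in for the broker-side log: partition -> all records.
    private final Map<String, List<String>> log;
    // Partition list set by the most recent assign().
    private final Set<String> assignment = new LinkedHashSet<>();
    // Next offset to consume for each assigned partition.
    private final Map<String, Integer> positions = new HashMap<>();
    // Records prefetched by the imaginary background thread.
    private final Map<String, List<String>> buffered = new HashMap<>();

    ToyConsumer(Map<String, List<String>> log) {
        this.log = log;
    }

    /** Replaces the assignment and drops state of partitions no longer assigned. */
    void assign(Set<String> partitions) {
        assignment.clear();
        assignment.addAll(partitions);
        buffered.keySet().retainAll(assignment);
        positions.keySet().retainAll(assignment);
        for (String p : assignment) {
            positions.putIfAbsent(p, 0);
        }
    }

    Set<String> assignment() {
        return new LinkedHashSet<>(assignment);
    }

    /** Moves the position and discards records buffered for the old position. */
    void seek(String partition, int offset) {
        if (!assignment.contains(partition)) {
            throw new IllegalStateException("Not assigned: " + partition);
        }
        positions.put(partition, offset);
        buffered.remove(partition);
    }

    int position(String partition) {
        if (!assignment.contains(partition)) {
            throw new IllegalStateException("Not assigned: " + partition);
        }
        return positions.get(partition);
    }

    /** Simulated background fetch: buffers everything from the current position on. */
    void prefetch() {
        for (String p : assignment) {
            List<String> records = log.getOrDefault(p, List.of());
            int from = Math.min(positions.get(p), records.size());
            buffered.put(p, new ArrayList<>(records.subList(from, records.size())));
        }
    }

    /** Hands out buffered records and advances positions; empty if nothing is buffered. */
    Map<String, List<String>> poll() {
        Map<String, List<String>> out = new HashMap<>();
        for (String p : assignment) {
            List<String> records = buffered.remove(p);
            if (records != null && !records.isEmpty()) {
                out.put(p, records);
                positions.merge(p, records.size(), Integer::sum);
            }
        }
        return out;
    }
}
```

   The point of the sketch is only that `assign()` and `seek()` invalidate whatever was buffered under the old partition list or position, so a `poll()` issued before the next fetch completes returns empty rather than stale data.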


