dajac commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380062879
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -108,13 +106,19 @@ import static org.apache.kafka.common.utils.Utils.propsToMap; /** - * This prototype consumer uses an {@link ApplicationEventHandler event handler} to process - * {@link ApplicationEvent application events} so that the network IO can be processed in a dedicated + * This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process + * {@link ApplicationEvent application events} so that the network I/O can be processed in a dedicated * {@link ConsumerNetworkThread network thread}. Visit * <a href="https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor">this document</a> - * for detail implementation. + * for implementation detail. + * + * <p/> + * + * <em>Note:</em> this {@link Consumer} implementation is part of the revised consumer group protocol from KIP-848. + * This class should not be invoked directly; users should instead create a {@link KafkaConsumer} as before. + * This consumer implements the new consumer group protocol and is intended to be the default in coming releases. */ -public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> { +public class AsyncKafkaConsumer<K, V> implements Consumer<K, V> { Review Comment: I wonder if we should make it package private. Thoughts? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -148,30 +152,38 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> { private boolean cachedSubscriptionHasAllFetchPositions; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); - public PrototypeAsyncConsumer(final Properties properties, - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer) { + public AsyncKafkaConsumer(Map<String, Object> configs) { Review Comment: I really wonder if we need all those constructors now that we have the delegate creator. For instance, would it be possible to only keep the following ones and keep all the conversion to `ConsumerConfig` in the delegate creator? ``` public AsyncKafkaConsumer(final ConsumerConfig config, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) { this(Time.SYSTEM, config, keyDeserializer, valueDeserializer); } public AsyncKafkaConsumer(final Time time, final ConsumerConfig config, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) { ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Supplier; + +/** + * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the + * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer} + * can remain the top-level facade for implementations, but allow different implementations to co-exist under + * the covers. + * + * <p/> + * + * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if + * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol. + * This is based on the presence and value of the {@code group.protocol} configuration value. If the value is present + * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be returned. Otherwise, the + * {@link LegacyKafkaConsumer} will be returned. + * + * <p/> + * + * This is not to be called by end users and callers should not attempt to determine the underlying implementation + * as this will make such code very brittle. Users of this facility should honor the top-level {@link Consumer} API + * contract as-is. + */ +public class ConsumerDelegateCreator { + + /** + * This is it! This is the core logic. It's extremely rudimentary. + */ + private static boolean useNewConsumer(Map<?, ?> configs) { + Object groupProtocol = configs.get("group.protocol"); Review Comment: nit: Could we use `getString`? ########## tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java: ########## @@ -281,7 +281,7 @@ private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map<Object if (!options.hasDryRun()) { for (final TopicPartition p : partitions) { - client.position(p); + long pos = client.position(p); Review Comment: It is strange that we have to change this now. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Supplier; + +/** + * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the + * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer} + * can remain the top-level facade for implementations, but allow different implementations to co-exist under + * the covers. + * + * <p/> + * + * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if + * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol. + * This is based on the presence and value of the {@code group.protocol} configuration value. If the value is present + * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be returned. Otherwise, the + * {@link LegacyKafkaConsumer} will be returned. + * + * <p/> + * + * This is not to be called by end users and callers should not attempt to determine the underlying implementation + * as this will make such code very brittle. Users of this facility should honor the top-level {@link Consumer} API + * contract as-is. + */ +public class ConsumerDelegateCreator { + + /** + * This is it! This is the core logic. It's extremely rudimentary. + */ + private static boolean useNewConsumer(Map<?, ?> configs) { + Object groupProtocol = configs.get("group.protocol"); + + // Takes care of both the null and type checks. 
+ if (!(groupProtocol instanceof String)) + return false; + + return ((String) groupProtocol).equalsIgnoreCase("consumer"); + } + + public <K, V> Consumer<K, V> create(Map<String, Object> configs) { + return createInternal(() -> { + if (useNewConsumer(configs)) { + return new AsyncKafkaConsumer<>(configs); + } else { + return new LegacyKafkaConsumer<>(configs); + } + }); + } + + public <K, V> Consumer<K, V> create(Properties properties) { + return createInternal(() -> { + if (useNewConsumer(properties)) { + return new AsyncKafkaConsumer<>(properties); + } else { + return new LegacyKafkaConsumer<>(properties); + } + }); + } + + public <K, V> Consumer<K, V> create(Properties properties, + Deserializer<K> keyDeserializer, + Deserializer<V> valueDeserializer) { + return createInternal(() -> { + if (useNewConsumer(properties)) { + return new AsyncKafkaConsumer<>(properties, keyDeserializer, valueDeserializer); + } else { + return new LegacyKafkaConsumer<>(properties, keyDeserializer, valueDeserializer); + } + }); + } + + public <K, V> Consumer<K, V> create(Map<String, Object> configs, + Deserializer<K> keyDeserializer, + Deserializer<V> valueDeserializer) { + return createInternal(() -> { + if (useNewConsumer(configs)) { + return new AsyncKafkaConsumer<>(configs, keyDeserializer, valueDeserializer); + } else { + return new LegacyKafkaConsumer<>(configs, keyDeserializer, valueDeserializer); + } + }); + } + + public <K, V> Consumer<K, V> create(ConsumerConfig config, + Deserializer<K> keyDeserializer, + Deserializer<V> valueDeserializer) { + return createInternal(() -> { + if (useNewConsumer(config.values())) { Review Comment: I think that we should simplify the constructors in the delegates. It does not make sense to keep all of them. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java: ########## @@ -211,4 +226,17 @@ else if (t instanceof KafkaException) throw new TimeoutException(e); } } + + public static boolean maybeOverrideEnableAutoCommit(ConsumerConfig config) { Review Comment: nit: Not your fault, but I find the name of the method a little weird because it does not override anything. It just returns whether auto commit should be enabled or not. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumerTest.java: ########## @@ -2582,7 +2581,7 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, return fetchResponse(Collections.singletonMap(partition, fetchInfo)); } - private KafkaConsumer<String, String> newConsumer(Time time, + private LegacyKafkaConsumer<String, String> newConsumer(Time time, KafkaClient client, Review Comment: nit: The indentation of the arguments is not correct in a few methods. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Supplier; + +/** + * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the + * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer} + * can remain the top-level facade for implementations, but allow different implementations to co-exist under + * the covers. + * + * <p/> + * + * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if + * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol. + * This is based on the presence and value of the {@code group.protocol} configuration value. If the value is present + * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be returned. Otherwise, the + * {@link LegacyKafkaConsumer} will be returned. + * + * <p/> + * + * This is not to be called by end users and callers should not attempt to determine the underlying implementation + * as this will make such code very brittle. Users of this facility should honor the top-level {@link Consumer} API + * contract as-is. + */ +public class ConsumerDelegateCreator { + + /** + * This is it! This is the core logic. It's extremely rudimentary. + */ + private static boolean useNewConsumer(Map<?, ?> configs) { + Object groupProtocol = configs.get("group.protocol"); + + // Takes care of both the null and type checks. + if (!(groupProtocol instanceof String)) + return false; + + return ((String) groupProtocol).equalsIgnoreCase("consumer"); Review Comment: We have an enum for this. It would be better to reuse it here. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Supplier; + +/** + * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the + * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer} + * can remain the top-level facade for implementations, but allow different implementations to co-exist under + * the covers. + * + * <p/> + * + * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if + * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol. + * This is based on the presence and value of the {@code group.protocol} configuration value. If the value is present + * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be returned. Otherwise, the + * {@link LegacyKafkaConsumer} will be returned. + * + * <p/> + * + * This is not to be called by end users and callers should not attempt to determine the underlying implementation + * as this will make such code very brittle. Users of this facility should honor the top-level {@link Consumer} API + * contract as-is. + */ +public class ConsumerDelegateCreator { + + /** + * This is it! This is the core logic. It's extremely rudimentary. + */ + private static boolean useNewConsumer(Map<?, ?> configs) { + Object groupProtocol = configs.get("group.protocol"); + + // Takes care of both the null and type checks. + if (!(groupProtocol instanceof String)) + return false; Review Comment: Could it be `null` based on the config definition? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java: ########## @@ -0,0 +1,2623 @@ +/* Review Comment: For my understanding, this is basically a copy of the old KafkaConsumer without any changes, right? ########## tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java: ########## @@ -430,8 +430,9 @@ public void testConsume(final long prodTimeMs) throws Throwable { () -> log.info("offsetsForTime = {}", offsetsForTime.result)); // Whether or not offsetsForTimes works, beginningOffsets and endOffsets // should work. - consumer.beginningOffsets(timestampsToSearch.keySet()); - consumer.endOffsets(timestampsToSearch.keySet()); + Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(timestampsToSearch.keySet()); + Map<TopicPartition, Long> endingOffsets = consumer.endOffsets(timestampsToSearch.keySet()); + log.trace("beginningOffsets: {}, endingOffsets: {}", beginningOffsets, endingOffsets); Review Comment: This is weird. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumerTest.java: ########## @@ -164,7 +163,7 @@ * Note to future authors in this class. If you close the consumer, close with DURATION.ZERO to reduce the duration of * the test. */ -public class KafkaConsumerTest { +public class LegacyKafkaConsumerTest { Review Comment: For my understanding, do we have similar tests for the new consumer? Or do we need to run a subset of those for it? 
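On the `group.protocol` check discussed in the comments above (`getString`, the existing enum, and whether the value can be `null`), here is a minimal, self-contained sketch of what an enum-based variant of `useNewConsumer` could look like. The nested enum and the constant below are illustrative stand-ins, not the project's actual identifiers:

```
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

public final class GroupProtocolCheckSketch {

    // Illustrative stand-in for the existing enum the review refers to; the real enum's
    // name, package, and constants may differ in the code base.
    enum GroupProtocol {
        GENERIC,   // current group protocol
        CONSUMER   // new KIP-848 consumer protocol
    }

    // The raw key used in the PR, shown here as a constant for readability.
    static final String GROUP_PROTOCOL_CONFIG = "group.protocol";

    // A null-safe variant of useNewConsumer() that parses the configured value into the enum
    // instead of comparing raw strings.
    static boolean useNewConsumer(Map<?, ?> configs) {
        Object groupProtocol = configs.get(GROUP_PROTOCOL_CONFIG);

        // A missing or non-string value falls back to the legacy consumer.
        if (!(groupProtocol instanceof String))
            return false;

        try {
            return GroupProtocol.valueOf(((String) groupProtocol).toUpperCase(Locale.ROOT)) == GroupProtocol.CONSUMER;
        } catch (IllegalArgumentException e) {
            // Unknown values are left for ConsumerConfig validation to reject; default to legacy here.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(useNewConsumer(Collections.singletonMap("group.protocol", "Consumer"))); // true
        System.out.println(useNewConsumer(Collections.emptyMap()));                                 // false
    }
}
```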
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Supplier; + +/** + * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the + * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer} + * can remain the top-level facade for implementations, but allow different implementations to co-exist under + * the covers. + * + * <p/> + * + * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if + * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol. + * This is based on the presence and value of the {@code group.protocol} configuration value. If the value is present + * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be returned. Otherwise, the + * {@link LegacyKafkaConsumer} will be returned. + * + * <p/> + * + * This is not to be called by end users and callers should not attempt to determine the underlying implementation + * as this will make such code very brittle. Users of this facility should honor the top-level {@link Consumer} API + * contract as-is. + */ +public class ConsumerDelegateCreator { + + /** + * This is it! This is the core logic. It's extremely rudimentary. + */ + private static boolean useNewConsumer(Map<?, ?> configs) { Review Comment: I made a similar comment earlier. Would it make sense to create `ConsumerConfig`, validate it and determine whether we should use the old or the new consumer? 
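As a rough sketch of that suggestion (assuming `group.protocol` is a defined config key and that both delegates keep a `(ConsumerConfig, Deserializer, Deserializer)` constructor, as proposed in the constructor comment earlier), the delegate creator could convert everything to a validated `ConsumerConfig` first and only then pick the implementation:

```
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.Properties;

public class ConsumerDelegateCreatorSketch {

    public <K, V> Consumer<K, V> create(Properties properties,
                                        Deserializer<K> keyDeserializer,
                                        Deserializer<V> valueDeserializer) {
        // Properties are converted once and funneled into the Map overload.
        return create(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
    }

    public <K, V> Consumer<K, V> create(Map<String, Object> configs,
                                        Deserializer<K> keyDeserializer,
                                        Deserializer<V> valueDeserializer) {
        // Building ConsumerConfig here validates the configuration up front. A real implementation
        // would also fold programmatically supplied deserializers into the map first, as
        // KafkaConsumer does; that step is elided in this sketch.
        return create(new ConsumerConfig(configs), keyDeserializer, valueDeserializer);
    }

    public <K, V> Consumer<K, V> create(ConsumerConfig config,
                                        Deserializer<K> keyDeserializer,
                                        Deserializer<V> valueDeserializer) {
        // With a validated config in hand, the delegates only need ConsumerConfig-based constructors.
        if (useNewConsumer(config))
            return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
        else
            return new LegacyKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
    }

    private boolean useNewConsumer(ConsumerConfig config) {
        // getString() covers the type concern; any default comes from the config definition itself.
        return "consumer".equalsIgnoreCase(config.getString("group.protocol"));
    }
}
```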
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -278,22 +283,22 @@ public PrototypeAsyncConsumer(final Time time, } } - public PrototypeAsyncConsumer(LogContext logContext, - String clientId, - Deserializers<K, V> deserializers, - FetchBuffer fetchBuffer, - FetchCollector<K, V> fetchCollector, - ConsumerInterceptors<K, V> interceptors, - Time time, - ApplicationEventHandler applicationEventHandler, - BlockingQueue<BackgroundEvent> backgroundEventQueue, - Metrics metrics, - SubscriptionState subscriptions, - ConsumerMetadata metadata, - long retryBackoffMs, - int defaultApiTimeoutMs, - List<ConsumerPartitionAssignor> assignors, - String groupId) { + public AsyncKafkaConsumer(LogContext logContext, Review Comment: +1 for making it package private. ########## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ########## @@ -321,20 +320,6 @@ public class ConsumerConfig extends AbstractConfig { */ static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close"; - /** - * <code>internal.throw.on.fetch.stable.offset.unsupported</code> - * Whether or not the consumer should throw when the new stable offset feature is supported. - * If set to <code>true</code> then the client shall crash upon hitting it. - * The purpose of this flag is to prevent unexpected broker downgrade which makes - * the offset fetch protection against pending commit invalid. The safest approach - * is to fail fast to avoid introducing correctness issue. - * - * <p> - * Note: this is an internal configuration and could be changed in the future in a backward incompatible way - * - */ - static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported"; Review Comment: I am not fully convinced by moving this one to ConsumerUtils. It seems to me that we are putting it a bit "nowhere". I also noticed that we have `LEAVE_GROUP_ON_CLOSE_CONFIG`, which is also an internal one. Do you know why we don't have an issue with this one? In the end, I wonder if we should rather create an `InternalConsumerConfig` class for those internal configs (a rough sketch of that idea follows below). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
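A loose illustration of the `InternalConsumerConfig` idea from the final comment. Only the class name and the two config keys come from the discussion above; the placement, the method, and the lookup via `originals()` are assumptions:

```
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Gathers the internal, non-public consumer configs in one place instead of leaving them
// in ConsumerConfig or a general-purpose utils class. The keys are copied from the diff above.
final class InternalConsumerConfig {

    static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
    static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED =
        "internal.throw.on.fetch.stable.offset.unsupported";

    private InternalConsumerConfig() {
    }

    // Reads the internal flag from the user-supplied originals, defaulting to false; whether the
    // real consumer resolves it this way or through the ConfigDef is not shown in this PR excerpt.
    static boolean throwOnFetchStableOffsetsUnsupported(ConsumerConfig config) {
        Object value = config.originals().get(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
        return value != null && Boolean.parseBoolean(value.toString());
    }
}
```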