[jira] [Commented] (KAFKA-6774) Improve default groupId behavior in consumer
[ https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16737912#comment-16737912 ]

Vahid Hashemian commented on KAFKA-6774:

[~allenxwang] As you indicated, the changes are on the client side, so this should work with broker version 1.1.

> Improve default groupId behavior in consumer
>
> Key: KAFKA-6774
> URL: https://issues.apache.org/jira/browse/KAFKA-6774
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Jason Gustafson
> Assignee: Vahid Hashemian
> Priority: Major
> Labels: needs-kip
> Fix For: 2.2.0
>
> At the moment, the default groupId in the consumer is "". If you try to use
> this to subscribe() to a topic, the broker will reject the group as invalid.
> On the other hand, if you use it with assign(), then the user will be able to
> fetch and commit offsets using the empty groupId. Probably 99% of the time,
> this is not what the user expects. Instead you would probably expect that if
> no groupId is provided, then no committed offsets will be fetched at all and
> we'll just use the auto reset behavior if we don't have a current position.
> Here are two potential solutions (both requiring a KIP):
> 1. Change the default to null. We will preserve the current behavior for
> subscribe(). When using assign(), we will not bother fetching committed
> offsets for the null groupId, and any attempt to commit offsets will raise an
> error. The user can still use the empty groupId, but they have to specify it
> explicitly.
> 2. Keep the current default, but change the consumer to treat this value as
> if it were null as described in option 1. The argument for this behavior is
> that using the empty groupId to commit offsets is inherently a dangerous
> practice and should not be permitted. We'd have to convince ourselves that
> we're fine not needing to allow the empty groupId for backwards compatibility
> though.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
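The two options in the description differ mainly in how the empty string is treated. As a minimal sketch, assuming a hypothetical `GroupIdPolicy` class that is not part of the Kafka client, the rule for whether an assign()-based consumer would fetch and commit offsets under each option might look like this:

```java
// Hypothetical model of the two proposals in KAFKA-6774; not Kafka client code.
final class GroupIdPolicy {
    enum Option { DEFAULT_TO_NULL, TREAT_EMPTY_AS_NULL }

    private GroupIdPolicy() {}

    // Option 1 changes the default to null; option 2 keeps "" as the default.
    static String defaultGroupId(Option option) {
        return option == Option.DEFAULT_TO_NULL ? null : "";
    }

    // True if an assign()-based consumer would fetch and commit offsets for this group id.
    static boolean usesCommittedOffsets(String groupId, Option option) {
        if (groupId == null) {
            return false; // both options: no committed-offset fetch, commits raise an error
        }
        if (groupId.isEmpty()) {
            // Option 1: "" is still usable when set explicitly.
            // Option 2: "" is treated as if it were null.
            return option == Option.DEFAULT_TO_NULL;
        }
        return true; // a real group id behaves as before
    }
}
```

In this model the default group id never touches committed offsets under either option; the options only disagree about an explicitly configured "". Pull request #5877 below follows option 1 and additionally deprecates the empty group id.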
[jira] [Commented] (KAFKA-6774) Improve default groupId behavior in consumer
[ https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16737714#comment-16737714 ]

Allen Wang commented on KAFKA-6774:
---

Question on this Jira: does it work with a Kafka 1.1 broker? From the PR it looks like it does, because there are only client-side changes.
[jira] [Commented] (KAFKA-6774) Improve default groupId behavior in consumer
[ https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689175#comment-16689175 ]

ASF GitHub Bot commented on KAFKA-6774:
---

hachikuji closed pull request #5877: KAFKA-6774: Improve the default group id behavior in KafkaConsumer (KIP-289)
URL: https://github.com/apache/kafka/pull/5877

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 795a762a494..9cd5766ea3e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -277,7 +277,7 @@
 ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
 Importance.MEDIUM,
 CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
-.define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
+.define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC)
 .define(SESSION_TIMEOUT_MS_CONFIG,
 Type.INT,
 1,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3a756721fd8..5c673a58c10 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -37,6 +37,8 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -557,6 +559,7 @@
 private final Logger log;
 private final String clientId;
+private String groupId;
 private final ConsumerCoordinator coordinator;
 private final Deserializer keyDeserializer;
 private final Deserializer valueDeserializer;
@@ -654,18 +657,23 @@ public KafkaConsumer(Properties properties,
 }

 @SuppressWarnings("unchecked")
-private KafkaConsumer(ConsumerConfig config,
-        Deserializer keyDeserializer,
-        Deserializer valueDeserializer) {
+private KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) {
 try {
 String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
 if (clientId.isEmpty())
 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
 this.clientId = clientId;
-String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
-
+this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
 LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
 this.log = logContext.logger(getClass());
+boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+if (groupId == null) { // overwrite in case of default group id where the config is not explicitly provided
+if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+enableAutoCommit = false;
+else if (enableAutoCommit)
+throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
+} else if (groupId.isEmpty())
+log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");

 log.debug("Initializing the Kafka consumer");
 this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -678,8 +686,7 @@ private KafkaConsumer(ConsumerConfig config,
 .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
 .tags(metricsTags);
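The `enable.auto.commit` handling added to the constructor in the diff above reduces to a three-way decision for a null group id. The sketch below lifts that logic into a hypothetical `AutoCommitResolver` helper (not a Kafka class); a plain `IllegalArgumentException` stands in for Kafka's `InvalidConfigurationException`, and the `originals` map stands in for `config.originals()`, so the example runs without the client library.

```java
import java.util.Map;

// Hypothetical helper mirroring the enable.auto.commit check from PR #5877; not Kafka code.
final class AutoCommitResolver {
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    private AutoCommitResolver() {}

    /**
     * @param groupId         the configured group.id (null when not set)
     * @param originals       the properties the user explicitly supplied
     * @param configuredValue the effective enable.auto.commit value (the consumer default is true)
     * @return whether auto-commit ends up enabled
     */
    static boolean resolve(String groupId, Map<String, ?> originals, boolean configuredValue) {
        if (groupId != null) {
            return configuredValue; // a real (or empty) group id: honor the setting as-is
        }
        if (!originals.containsKey(ENABLE_AUTO_COMMIT)) {
            return false; // null group id and no explicit setting: quietly turn auto-commit off
        }
        if (configuredValue) {
            // null group id but the user explicitly asked for auto-commit: reject the config
            throw new IllegalArgumentException(
                ENABLE_AUTO_COMMIT + " cannot be set to true when default group id (null) is used.");
        }
        return false; // explicitly set to false: nothing to override
    }
}
```

The `originals` check is what separates "left at the default of true" from "explicitly set to true": only the latter is treated as an error when no group id is configured.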
[jira] [Commented] (KAFKA-6774) Improve default groupId behavior in consumer
[ https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16674299#comment-16674299 ]

ASF GitHub Bot commented on KAFKA-6774:
---

vahidhashemian opened a new pull request #5877: KAFKA-6774: Improve the default group id behavior in KafkaConsumer (KIP-289)
URL: https://github.com/apache/kafka/pull/5877

Improve the default group id behavior by
* changing the default consumer group to null, where no offset commit or fetch, or group management operations are allowed
* deprecating the use of the empty (`""`) consumer group on the client

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
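The PR description says that with a null group id no offset commit or fetch and no group management operations are allowed, and the diff imports `InvalidGroupIdException` for that purpose. A minimal sketch of such a guard, assuming hypothetical names (`GroupGuard`, `requireGroupId`) and using `IllegalStateException` in place of Kafka's exception so it runs standalone:

```java
import java.util.Optional;

// Hypothetical guard modelled on the PR description; not the actual KafkaConsumer code.
final class GroupGuard {
    private final Optional<String> groupId;

    GroupGuard(String configuredGroupId) {
        this.groupId = Optional.ofNullable(configuredGroupId);
    }

    // In this model, subscribe(), commitSync(), committed() and similar group APIs call this first.
    void requireGroupId(String operation) {
        if (!groupId.isPresent()) {
            throw new IllegalStateException(operation
                + " requires a group.id: offset commit/fetch and group management"
                + " are not allowed with the default (null) group id");
        }
    }

    // The empty group id is still accepted, but its use is deprecated (the PR logs a warning).
    boolean isDeprecatedEmptyGroup() {
        return groupId.map(String::isEmpty).orElse(false);
    }
}
```

Keeping the check in one place means every group-dependent call fails the same way, while purely assign()-based fetching never consults it.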
[jira] [Commented] (KAFKA-6774) Improve default groupId behavior in consumer
[ https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670328#comment-16670328 ]

Vahid Hashemian commented on KAFKA-6774:

[~hachikuji], thanks for checking. I'll try to spend time on this later this week.
[jira] [Commented] (KAFKA-6774) Improve default groupId behavior in consumer
[ https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670312#comment-16670312 ]

Jason Gustafson commented on KAFKA-6774:

[~vahid] Do you have time to work on this, or can someone else pick it up?
[jira] [Commented] (KAFKA-6774) Improve default groupId behavior in consumer
[ https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487767#comment-16487767 ]

Vahid Hashemian commented on KAFKA-6774:

[~hachikuji] Yes, that makes sense. I believe this has been noted in the KIP as well.
[jira] [Commented] (KAFKA-6774) Improve default groupId behavior in consumer
[ https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459266#comment-16459266 ]

Jason Gustafson commented on KAFKA-6774:

[~vahid] The suggestion here would be to have the standalone consumer skip querying for committed offsets if no groupId is provided. So if the user hasn't done an explicit seek, then we would try to use the auto reset behavior. Does that seem reasonable?
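The suggestion above amounts to an ordering for where a standalone, assign()-based consumer starts reading a partition. A sketch of that ordering, under stated assumptions: `StartPosition`, the `committedLookup` function and the two-value `ResetPolicy` enum are hypothetical stand-ins for the coordinator offset fetch and the `auto.offset.reset` setting (its `none` value, which raises an error instead of resetting, is left out).

```java
import java.util.OptionalLong;
import java.util.function.Function;

// Hypothetical model of start-position resolution for one assigned partition; not Kafka code.
final class StartPosition {
    enum ResetPolicy { EARLIEST, LATEST }

    private StartPosition() {}

    static long resolve(OptionalLong explicitSeek,
                        String groupId,
                        Function<String, OptionalLong> committedLookup,
                        ResetPolicy reset,
                        long logStartOffset,
                        long logEndOffset) {
        // 1. A position set by an explicit seek() always wins.
        if (explicitSeek.isPresent()) {
            return explicitSeek.getAsLong();
        }
        // 2. Only a non-null group id queries committed offsets at all.
        if (groupId != null) {
            OptionalLong committed = committedLookup.apply(groupId);
            if (committed.isPresent()) {
                return committed.getAsLong();
            }
        }
        // 3. Otherwise fall back to the auto reset behavior.
        return reset == ResetPolicy.EARLIEST ? logStartOffset : logEndOffset;
    }
}
```

The key change is step 2: with a null group id the lookup is never invoked, so no committed offsets are fetched at all, as the issue description asks.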