[ https://issues.apache.org/jira/browse/KAFKA-12759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-12759:
------------------------------
    Component/s: clients
                 consumer

> Kafka consumers with static group membership won't consume from newly subscribed topics
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12759
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12759
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 2.8.0
>            Reporter: Andrey Polyakov
>            Priority: Minor
>
> We recently started using static group membership and noticed that when a new
> topic is added to the subscription, the consumer never consumes from it, no
> matter how long it is left running. Our workaround is to shut down all
> consumers in the group for longer than session.timeout.ms and then start them
> back up. Is this expected behaviour or a bug?
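> A lighter-weight workaround may also exist (a sketch only, assuming Kafka
> 2.4+ brokers and the group/instance IDs used in the sample application
> below): removing the static member explicitly through the AdminClient should
> force a rebalance without waiting out session.timeout.ms.
> {code:java}
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.MemberToRemove;
> import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
> public class RemoveStaticMember {
>   public static void main(String[] args) throws Exception {
>     Properties props = new Properties();
>     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>     try (Admin admin = Admin.create(props)) {
>       // Ask the coordinator to drop the static member "instance1" from
>       // "myGroupID"; the coordinator then triggers a rebalance immediately.
>       admin
>           .removeMembersFromConsumerGroup(
>               "myGroupID",
>               new RemoveMembersFromConsumerGroupOptions(
>                   Collections.singleton(new MemberToRemove("instance1"))))
>           .all()
>           .get();
>     }
>   }
> }
> {code}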
> Sample application:
> {code:java}
> import java.time.Duration;
> import java.util.Arrays;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> public class Test {
>   static volatile boolean shutdown = false;
>   static final Object shutdownLock = new Object();
>   public static void main(String[] args) {
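>     // Shutdown hook: flag the poll loop to stop, then wait until main has
>     // closed the consumer.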
>     Runtime.getRuntime()
>         .addShutdownHook(
>             new Thread(
>                 () -> {
>                   shutdown = true;
>                   synchronized (shutdownLock) {
>                     try {
>                       shutdownLock.wait();
>                     } catch (InterruptedException e) {
>                       e.printStackTrace();
>                     }
>                   }
>                 }));
>     Properties props = new Properties();
>     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>     props.put(
>         ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>         ByteArrayDeserializer.class.getCanonicalName());
>     props.put(
>         ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>         ByteArrayDeserializer.class.getCanonicalName());
>     props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
>     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000"); // 5 min
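>     // group.instance.id enables static membership for this consumer.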
>     props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance1");
>     KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
>     consumer.subscribe(Arrays.asList("topic1")); // first run: one topic
>     // consumer.subscribe(Arrays.asList("topic1", "topic2")); // second run
>     while (!shutdown) {
>       ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
>       System.out.println("poll() returned " + records.count() + " records");
>     }
>     System.out.println("Closing consumer");
>     consumer.close();
>     synchronized (shutdownLock) {
>       shutdownLock.notifyAll();
>       System.out.println("Done closing consumer");
>     }
>   }
> }
> {code}
> Steps to reproduce:
>  0. Update the bootstrap.servers config in the example code.
>  1. Run the application above; it consumes from topic1.
>  2. Send SIGTERM to the process, cleanly closing the consumer.
>  3. Modify the code to subscribe to topic1 AND topic2.
>  4. Run the application again: both topics appear in the logs as part of the
> subscription, but topic2 is never assigned, regardless of how long the
> consumer runs (the AdminClient sketch below offers one way to inspect the
> assignment from outside).
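> To observe the stale assignment from outside the consumer, the group can be
> described via the AdminClient (a sketch, assuming access to the same
> cluster; with the bug present, topic2 partitions never show up in the
> member's assignment):
> {code:java}
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.ConsumerGroupDescription;
> public class DescribeGroup {
>   public static void main(String[] args) throws Exception {
>     Properties props = new Properties();
>     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>     try (Admin admin = Admin.create(props)) {
>       ConsumerGroupDescription group =
>           admin
>               .describeConsumerGroups(Collections.singleton("myGroupID"))
>               .describedGroups()
>               .get("myGroupID")
>               .get();
>       // Print each member's static instance id and assigned partitions.
>       group.members()
>           .forEach(
>               m ->
>                   System.out.println(
>                       m.groupInstanceId().orElse("<dynamic>")
>                           + " -> "
>                           + m.assignment().topicPartitions()));
>     }
>   }
> }
> {code}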
> Logs from first run (1 topic subscription):
> {code:java}
> ConsumerConfig values: 
>       allow.auto.create.topics = true
>       auto.commit.interval.ms = 5000
>       auto.offset.reset = latest
>       bootstrap.servers = [localhost:9092]
>       check.crcs = true
>       client.dns.lookup = use_all_dns_ips
>       client.id = consumer-myGroupID-instance1
>       client.rack = 
>       connections.max.idle.ms = 540000
>       default.api.timeout.ms = 60000
>       enable.auto.commit = true
>       exclude.internal.topics = true
>       fetch.max.bytes = 52428800
>       fetch.max.wait.ms = 500
>       fetch.min.bytes = 1
>       group.id = myGroupID
>       group.instance.id = instance1
>       heartbeat.interval.ms = 3000
>       interceptor.classes = []
>       internal.leave.group.on.close = true
>       internal.throw.on.fetch.stable.offset.unsupported = false
>       isolation.level = read_uncommitted
>       key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>       max.partition.fetch.bytes = 1048576
>       max.poll.interval.ms = 300000
>       max.poll.records = 500
>       metadata.max.age.ms = 300000
>       metric.reporters = []
>       metrics.num.samples = 2
>       metrics.recording.level = INFO
>       metrics.sample.window.ms = 30000
>       partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>       receive.buffer.bytes = 65536
>       reconnect.backoff.max.ms = 1000
>       reconnect.backoff.ms = 50
>       request.timeout.ms = 30000
>       retry.backoff.ms = 100
>       sasl.client.callback.handler.class = null
>       sasl.jaas.config = null
>       sasl.kerberos.kinit.cmd = /usr/bin/kinit
>       sasl.kerberos.min.time.before.relogin = 60000
>       sasl.kerberos.service.name = null
>       sasl.kerberos.ticket.renew.jitter = 0.05
>       sasl.kerberos.ticket.renew.window.factor = 0.8
>       sasl.login.callback.handler.class = null
>       sasl.login.class = null
>       sasl.login.refresh.buffer.seconds = 300
>       sasl.login.refresh.min.period.seconds = 60
>       sasl.login.refresh.window.factor = 0.8
>       sasl.login.refresh.window.jitter = 0.05
>       sasl.mechanism = GSSAPI
>       security.protocol = PLAINTEXT
>       security.providers = null
>       send.buffer.bytes = 131072
>       session.timeout.ms = 300000
>       socket.connection.setup.timeout.max.ms = 30000
>       socket.connection.setup.timeout.ms = 10000
>       ssl.cipher.suites = null
>       ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
>       ssl.endpoint.identification.algorithm = https
>       ssl.engine.factory.class = null
>       ssl.key.password = null
>       ssl.keymanager.algorithm = SunX509
>       ssl.keystore.certificate.chain = null
>       ssl.keystore.key = null
>       ssl.keystore.location = null
>       ssl.keystore.password = null
>       ssl.keystore.type = JKS
>       ssl.protocol = TLSv1.3
>       ssl.provider = null
>       ssl.secure.random.implementation = null
>       ssl.trustmanager.algorithm = PKIX
>       ssl.truststore.certificates = null
>       ssl.truststore.location = null
>       ssl.truststore.password = null
>       ssl.truststore.type = JKS
>       value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> Kafka version: 2.8.0
> Kafka commitId: ebb1d6e21cc92130
> Kafka startTimeMs: 1620342287841
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Subscribed to topic(s): topic1
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Cluster ID: AHajHZ_3QqyBJlGdr9Gwwg
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Discovered group coordinator 10.86.24.3:9092 (id: 2147483646 rack: null)
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] (Re-)joining group
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully joined group with generation Generation{generationId=1, memberId='instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03', protocol='range'}
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Finished assignment for group at generation 1: {instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03=Assignment(partitions=[topic1-0])}
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully synced group in generation Generation{generationId=1, memberId='instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03', protocol='range'}
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Notifying assignor about the new Assignment(partitions=[topic1-0])
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Adding newly assigned partitions: topic1-0
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Found no committed offset for partition topic1-0
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Resetting offset for partition topic1-0 to position FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.86.31.131:9092 (id: 2 rack: null)], epoch=0}}.
> poll() returned 0 records
> poll() returned 0 records
> poll() returned 0 records
> poll() returned 0 records
> poll() returned 0 records
> Closing consumer
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Revoke previously assigned partitions topic1-0
> Metrics scheduler closed
> Closing reporter org.apache.kafka.common.metrics.JmxReporter
> Metrics reporters closed
> App info kafka.consumer for consumer-myGroupID-instance1 unregistered
> Done closing consumer
> Process finished with exit code 130 (interrupted by signal 2: SIGINT)
> {code}
> Logs from second run (2 topic subscription):
> {code:java}
> ConsumerConfig values: (identical to the first run above)
> Kafka version: 2.8.0
> Kafka commitId: ebb1d6e21cc92130
> Kafka startTimeMs: 1620342351702
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Subscribed to topic(s): topic1, topic2
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Cluster ID: AHajHZ_3QqyBJlGdr9Gwwg
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Discovered group coordinator 10.86.24.3:9092 (id: 2147483646 rack: null)
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] (Re-)joining group
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully joined group with generation Generation{generationId=1, memberId='instance1-9fdf45a9-04bc-4056-91ec-07d5a64a2a86', protocol='range'}
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully synced group in generation Generation{generationId=1, memberId='instance1-9fdf45a9-04bc-4056-91ec-07d5a64a2a86', protocol='range'}
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Notifying assignor about the new Assignment(partitions=[topic1-0])
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Adding newly assigned partitions: topic1-0
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Setting offset for partition topic1-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.86.31.131:9092 (id: 2 rack: null)], epoch=0}}
> poll() returned 0 records
> poll() returned 0 records
> poll() returned 0 records
> poll() returned 0 records
> poll() returned 0 records
> Closing consumer
> [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Revoke previously assigned partitions topic1-0
> Metrics scheduler closed
> Closing reporter org.apache.kafka.common.metrics.JmxReporter
> Metrics reporters closed
> App info kafka.consumer for consumer-myGroupID-instance1 unregistered
> Done closing consumer
> Process finished with exit code 130 (interrupted by signal 2: SIGINT)
> {code}
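> Note that in the second run the member rejoins at the same generation
> (generationId=1) and is handed back the old Assignment(partitions=[topic1-0]),
> with no "Finished assignment for group" step in between, which suggests the
> coordinator returned the static member's cached assignment rather than
> triggering a fresh rebalance against the changed subscription.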



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
