Hi Suresh, and others,

I've attached the Java code snippets for both the producer and the consumer, 
along with the output from the most recent run.

As shown in the producer log, the messages are being sent successfully—they are 
assigned to a partition and given an offset as expected. However, the consumer 
continues to receive only empty results when polling the topic.

The topic is configured with a single partition, since there is only one 
consumer involved in this setup.

I would really appreciate any suggestions or insights into what might be going 
wrong here.

Thank you again for your time and support!

Kind regards,
Jakob


________________________________
From: Suresh Chidambaram <chida.sur...@gmail.com>
Sent: 21 July 2025 13:55
To: Jakob Molander <j...@geus.dk>
Subject: Re: Kafka Consumer Not Receiving Messages – Request for Help

Hi Jakob,

If you can show a demo of the issue, I may get some ideas and be able to 
help you. Please let me know if you are okay with that.

Thanks
C Suresh


On Mon, Jul 21, 2025 at 4:49 PM Jakob Molander <j...@geus.dk.invalid> wrote:
Hi Suresh, and others,

Thank you for your reply.

To provide some additional context: there is only one producer and one consumer 
running, and nothing else is currently interacting with this Kafka instance.

Each time I run my test scenario, I start from a clean slate:

1. Deploy the Kafka instance in Docker
2. Create the "jupiter-events" topic (see the Admin-client sketch after this list)
3. Start the consumer instance
4. Start the producer instance
5. Send a few test messages from the producer
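
For reference, a minimal sketch of what step 2 can look like with the Kafka 
Admin client (the bootstrap address matches the config dumps further down; 
replication factor 1 since this is a single broker; the class name is only 
for illustration):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-service.jupiter:9092");

        try (Admin admin = Admin.create(props)) {
            // one partition, replication factor 1 (single-broker setup)
            NewTopic topic = new NewTopic("jupiter-events", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get(); // block until created
        }
    }
}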

Despite this setup, the consumer still receives an empty list when polling. I’d 
appreciate any suggestions or insights on what might be going wrong.

Thanks again for your time and support!

Kind regards,
Jakob
________________________________
From: Suresh Chidambaram <chida.sur...@gmail.com>
Sent: 21 July 2025 12:06
To: users@kafka.apache.org
Subject: Re: Kafka Consumer Not Receiving Messages – Request for Help

Hi Jakob,

The consumer is probably not receiving anything because the consumer group
jupiter-workers has already committed offsets for these messages, and
auto.offset.reset=earliest only applies when the group has no committed
offset. Either of the options below should let you read the messages again.

1. Change the group.id from jupiter-workers to something else, such as
jupiter-workers-latest.
2. Reset the committed offsets of the consumer group "jupiter-workers" on the
topic "jupiter-events" to the beginning, and then start the consumer
application (see the sketch after this list).
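
A minimal sketch of option 2, assuming the group has no other active member 
while you run it (group, topic, and bootstrap address are taken from your 
logs and config dump; the class name is only for illustration). The same 
reset can also be done with the kafka-consumer-groups.sh tool using 
--reset-offsets --to-earliest --execute while the consumer application is 
stopped:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RewindConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-service.jupiter:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "jupiter-workers");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("jupiter-events"));

            // poll until the coordinator has actually assigned us partitions
            while (consumer.assignment().isEmpty()) {
                consumer.poll(Duration.ofMillis(100));
            }

            // rewind every assigned partition to the start of the log;
            // anything fetched while waiting above is simply re-read
            consumer.seekToBeginning(consumer.assignment());

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            System.out.println("Fetched " + records.count() + " records from the beginning");
        }
    }
}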

Please reach out if any more details are needed.

Thanks
C Suresh
+91 9042189457





On Mon, Jul 21, 2025 at 2:24 PM Jakob Molander <j...@geus.dk.invalid> wrote:

> Hello everyone,
>
> I'm currently working with a single Kafka 3.9.1 instance running in
> Docker, and I've set up a single topic named "jupiter-events".
>
> My Kafka producer is successfully sending messages to the topic, and I've
> confirmed that these messages are being received by the Kafka instance.
>
> However, I'm facing an issue where my Kafka consumer does not receive any
> messages when polling the topic. According to the logs, the consumer
> appears to be functioning correctly, but it always receives an empty list
> during polling.
>
> I've attached the configuration files and logs for both the producer and
> the consumer for reference.
>
> Could anyone kindly take a look and let me know if there’s something I
> might be missing?
>
> Thank you in advance for your time and assistance!
>
> Kind regards,
> Jakob
>
Consumer log:

[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.9.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: f745dfdcee2b9851
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1753100367183
[main] INFO org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer - [Consumer clientId=consumer-jupiter-workers-1, groupId=jupiter-workers] Subscribed to topic(s): jupiter-events
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-jupiter-workers-1, groupId=jupiter-workers] Cluster ID: 5L6g3nShT-eMCtK--X86sw
Producer log:

2025-07-21T14:20:48.728+02:00  INFO 1 --- [nio-8080-exec-1] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
2025-07-21T14:20:48.755+02:00  INFO 1 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
2025-07-21T14:20:48.821+02:00  INFO 1 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.9.1
2025-07-21T14:20:48.823+02:00  INFO 1 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: f745dfdcee2b9851
2025-07-21T14:20:48.823+02:00  INFO 1 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1753100448819
2025-07-21T14:20:49.368+02:00  INFO 1 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: 5L6g3nShT-eMCtK--X86sw
2025-07-21T14:20:49.475+02:00  INFO 1 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
21-Jul-2025 14:20:49.523 INFO [http-nio-8080-exec-1] dk.geus.jupiter.api.model.MessageDAO.sendMessage Send Message: e2873e4a-43ff-4991-8ce4-972c629709fd Topic: jupiter-events
21-Jul-2025 14:20:49.523 INFO [http-nio-8080-exec-1] dk.geus.jupiter.api.model.MessageDAO.sendMessage Partition: 0 Offset: 0
2025-07-21T14:20:49.524+02:00  INFO 1 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2025-07-21T14:20:49.534+02:00  INFO 1 --- [nio-8080-exec-1] o.apache.kafka.common.metrics.Metrics    : Metrics scheduler closed
2025-07-21T14:20:49.534+02:00  INFO 1 --- [nio-8080-exec-1] o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2025-07-21T14:20:49.534+02:00  INFO 1 --- [nio-8080-exec-1] o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
2025-07-21T14:20:49.534+02:00  INFO 1 --- [nio-8080-exec-1] o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed
2025-07-21T14:20:49.535+02:00  INFO 1 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : App info kafka.producer for producer-1 unregistered
Consumer snippet:

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // subscribe consumer to our topic(s)
        consumer.subscribe(Arrays.asList(Config.KAFKATOPIC));

        while (true) {
            // poll for new messages
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            if (!records.isEmpty()) {
                LOG.log(Level.INFO, "Received {0} elements", new Object[]{records.count()});
            }

            // add messages to queue for processing
            for (ConsumerRecord<String, String> record : records) {
                try {
                    QUEUE.put(record);
                } catch (Exception | Error ex) {
                    LOG.log(Level.WARNING, "Could not add to queue", ex);
                }
            }
        }
Producer snippet:

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(PROPERTIES)) {
            // create a producer record
            ProducerRecord<String, String> record = new ProducerRecord<>(Config.KAFKATOPIC, traceid, GSON.toJson(message));

            // send data - asynchronous
            Future<RecordMetadata> future = producer.send(record);

            // get metadata - synchronous; blocks until the broker has acknowledged the record
            RecordMetadata metadata = future.get();

            LOG.log(Level.INFO, "Send Message: {0} Topic: {1}", new Object[]{traceid, metadata.topic()});
            LOG.log(Level.INFO, "Partition: {0} Offset: {1}", new Object[]{metadata.partition(), metadata.offset()});

            // flush data - synchronous
            producer.flush();

            result = true;
        } catch (Exception | Error ex) {
            LOG.log(Level.WARNING, "Something went wrong", ex);
        }
ProducerConfig values:
        acks = -1
        auto.include.jmx.reporter = true
        batch.size = 16384
        bootstrap.servers = [kafka-service.jupiter:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-1
        compression.gzip.level = -1
        compression.lz4.level = 9
        compression.type = none
        compression.zstd.level = 3
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = true
        enable.metrics.push = true
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metadata.recovery.strategy = none
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.adaptive.partitioning.enable = true
        partitioner.availability.timeout.ms = 0
        partitioner.class = null
        partitioner.ignore.keys = false
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.max.ms = 1000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.header.urlencode = false
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.include.jmx.reporter = true
        auto.offset.reset = earliest
        bootstrap.servers = [kafka-service.jupiter:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-jupiter-workers-1
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        enable.metrics.push = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = jupiter-workers
        group.instance.id = null
        group.protocol = classic
        group.remote.assignor = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metadata.recovery.strategy = none
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.max.ms = 1000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.header.urlencode = false
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 45000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
