Hi Suresh, and others,
I've attached the Java code snippets for both the producer and the consumer,
along with the output from the most recent run.
As shown in the producer log, the messages are being sent successfully—they are
assigned to a partition and given an offset as expected. However, the consumer
continues to receive only empty results when polling the topic.
The topic is configured with a single partition, since there is only one
consumer involved in this setup.
I would really appreciate any suggestions or insights into what might be going
wrong here.
Thank you again for your time and support!
Kind regards,
Jakob
________________________________
Fra: Suresh Chidambaram <chida.sur...@gmail.com>
Sendt: 21. juli 2025 13:55
Til: Jakob Molander <j...@geus.dk>
Emne: Re: Kafka Consumer Not Receiving Messages – Request for Help
Hi Jakob,
If it's possible for you to show a demo on the issue, I may get some ideas and
I may help you. Please let me know if you are okay with that.
Thanks
C Suresh
On Mon, Jul 21, 2025 at 4:49 PM Jakob Molander <j...@geus.dk.invalid> wrote:
Hi Suresh, and others,
Thank you for your reply.
To provide some additional context: there is only one producer and one consumer
running, and nothing else is currently interacting with this Kafka instance.
Each time I run my test scenario, I start from a clean slate:
1. Deploy the Kafka instance in Docker
2. Create the "jupiter-events" topic
3. Start the consumer instance
4. Start the producer instance
5. Send a few test messages from the producer
Despite this setup, the consumer still receives an empty list when polling. I’d
appreciate any suggestions or insights on what might be going wrong.
Thanks again for your time and support!
Kind regards,
Jakob
________________________________
Fra: Suresh Chidambaram <chida.sur...@gmail.com<mailto:chida.sur...@gmail.com>>
Sendt: 21. juli 2025 12:06
Til: users@kafka.apache.org<mailto:users@kafka.apache.org>
<users@kafka.apache.org<mailto:users@kafka.apache.org>>
Emne: Re: Kafka Consumer Not Receiving Messages – Request for Help
Hi Jakob,
The consumer is not consuming the messages because the messages might have
already been consumed using the consumer group, jupiter-workers. So, the
options below will help you read the messages.
1. Change the
group.id<https://url41.mailanyone.net/scanner?m=1udp63-000000007Sm-3Q5u&d=4%7Cmail%2F90%2F1753098600%2F1udp63-000000007Sm-3Q5u%7Cin41g%7C57e1b682%7C15209072%7C14343128%7C687E2A6BF4A95B9BA494CE56E4E9A2FA&o=gphto%2F%2Frt%3Adi.pu&s=JHEhn3KFBMCNF4K2oEY0X2mv058>
from jupiter-workers to something else like
jupiter-workers-latest
2. Reset the consumer offset of the consumer group "jupiter-workers" of
Topic "jupiter-events" to the beginning offset, and then start the consumer
application
Please reach out if any more details are needed.
Thanks
C Suresh
+91 9042189457
On Mon, Jul 21, 2025 at 2:24 PM Jakob Molander <j...@geus.dk.invalid> wrote:
> Hello everyone,
>
> I'm currently working with a single Kafka 3.9.1 instance running in
> Docker, and I've set up a single topic named "jupiter-events".
>
> My Kafka producer is successfully sending messages to the topic, and I've
> confirmed that these messages are being received by the Kafka instance.
>
> However, I'm facing an issue where my Kafka consumer does not receive any
> messages when polling the topic. According to the logs, the consumer
> appears to be functioning correctly, but it always receives an empty list
> during polling.
>
> I've attached the configuration files and logs for both the producer and
> the consumer for reference.
>
> Could anyone kindly take a look and let me know if there’s something I
> might be missing?
>
> Thank you in advance for your time and assistance!
>
> Kind regards,
> Jakob
>
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector -
initializing Kafka metrics collector
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.9.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId:
f745dfdcee2b9851
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs:
1753100367183
[main] INFO org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer -
[Consumer clientId=consumer-jupiter-workers-1, groupId=jupiter-workers]
Subscribed to topic(s): jupiter-events
[main] INFO org.apache.kafka.clients.Metadata - [Consumer
clientId=consumer-jupiter-workers-1, groupId=jupiter-workers] Cluster ID:
5L6g3nShT-eMCtK--X86sw
2025-07-21T14:20:48.728+02:00 INFO 1 --- [nio-8080-exec-1]
o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector
2025-07-21T14:20:48.755+02:00 INFO 1 --- [nio-8080-exec-1]
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1]
Instantiated an idempotent producer.
2025-07-21T14:20:48.821+02:00 INFO 1 --- [nio-8080-exec-1]
o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.9.1
2025-07-21T14:20:48.823+02:00 INFO 1 --- [nio-8080-exec-1]
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: f745dfdcee2b9851
2025-07-21T14:20:48.823+02:00 INFO 1 --- [nio-8080-exec-1]
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1753100448819
2025-07-21T14:20:49.368+02:00 INFO 1 --- [ad | producer-1]
org.apache.kafka.clients.Metadata : [Producer clientId=producer-1]
Cluster ID: 5L6g3nShT-eMCtK--X86sw
2025-07-21T14:20:49.475+02:00 INFO 1 --- [ad | producer-1]
o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1]
ProducerId set to 0 with epoch 0
21-Jul-2025 14:20:49.523 INFO [http-nio-8080-exec-1]
dk.geus.jupiter.api.model.MessageDAO.sendMessage Send Message:
e2873e4a-43ff-4991-8ce4-972c629709fd Topic: jupiter-events
21-Jul-2025 14:20:49.523 INFO [http-nio-8080-exec-1]
dk.geus.jupiter.api.model.MessageDAO.sendMessage Partition: 0 Offset: 0
2025-07-21T14:20:49.524+02:00 INFO 1 --- [nio-8080-exec-1]
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1]
Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2025-07-21T14:20:49.534+02:00 INFO 1 --- [nio-8080-exec-1]
o.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
2025-07-21T14:20:49.534+02:00 INFO 1 --- [nio-8080-exec-1]
o.apache.kafka.common.metrics.Metrics : Closing reporter
org.apache.kafka.common.metrics.JmxReporter
2025-07-21T14:20:49.534+02:00 INFO 1 --- [nio-8080-exec-1]
o.apache.kafka.common.metrics.Metrics : Closing reporter
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
2025-07-21T14:20:49.534+02:00 INFO 1 --- [nio-8080-exec-1]
o.apache.kafka.common.metrics.Metrics : Metrics reporters closed
2025-07-21T14:20:49.535+02:00 INFO 1 --- [nio-8080-exec-1]
o.a.kafka.common.utils.AppInfoParser : App info kafka.producer for
producer-1 unregistered
// create consumer
KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(properties);
// subscribe consumer to our topic(s)
consumer.subscribe(Arrays.asList(Config.KAFKATOPIC));
while (true) {
// poll for new messages
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
if (!records.isEmpty()) {
LOG.log(Level.INFO, "Received {0} elements", new
Object[]{records.count()});
}
// add messages to queue for processing
for (ConsumerRecord<String, String> record : records) {
try {
QUEUE.put(record);
} catch (Exception | Error ex) {
LOG.log(Level.WARNING, "Could not add to queue", ex);
}
}
}
try (KafkaProducer producer = new KafkaProducer(PROPERTIES)) {
// create a producer record
ProducerRecord<String, String> record = new
ProducerRecord<>(Config.KAFKATOPIC, traceid, GSON.toJson(message));
// send data - asynchronous
Future<RecordMetadata> future = producer.send(record);
// get metadata - synchronous
RecordMetadata metadata = future.get();
LOG.log(Level.INFO, "Send Message: {0} Topic: {1}", new
Object[]{traceid, metadata.topic()});
LOG.log(Level.INFO, "Partition: {0} Offset: {1}", new
Object[]{metadata.partition(), metadata.offset()});
// flush data - synchronous
producer.flush();
result = true;
} catch (Exception | Error ex) {
LOG.log(Level.WARNING, "Something went wrong", ex);
}
ProducerConfig values:
acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
bootstrap.servers = [kafka-service.jupiter:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.gzip.level = -1
compression.lz4.level = 9
compression.type = none
compression.zstd.level = 3
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
enable.metrics.push = true
interceptor.classes = []
key.serializer = class
org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.adaptive.partitioning.enable = true
partitioner.availability.timeout.ms = 0
partitioner.class = null
partitioner.ignore.keys = false
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.max.ms = 1000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.header.urlencode = false
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class
org.apache.kafka.common.serialization.StringSerializer
ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = earliest
bootstrap.servers = [kafka-service.jupiter:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-jupiter-workers-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
enable.metrics.push = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = jupiter-workers
group.instance.id = null
group.protocol = classic
group.remote.assignor = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor, class
org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.max.ms = 1000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.header.urlencode = false
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer