Hi,
I'm trying to write a very basic Kafka Streams consumer in Java.
Once I add a KTable, I see a message in the server log that I have been
unsubscribed from all topics.
Doing the same with a KStream instead of KTable works fine for me.
I'm using Kafka 3.2.0 (kafka_2.13-3.2.0), running on Raspbian OS.
I tried modifying group.initial.rebalance.delay.ms in the broker's
server.properties, but this did not help.
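For reference, a minimal sketch of where that broker setting would go,
assuming the stock config/server.properties file from the Kafka
distribution. The value shown is only illustrative and is not necessarily
the one tried here:

```properties
# config/server.properties (broker side)
# Delay, in milliseconds, before the group coordinator performs the first
# rebalance of a new consumer group. The broker default is 3000.
# Illustrative value only (assumption, not taken from the original setup):
group.initial.rebalance.delay.ms=0
```

The broker has to be restarted for a change in this file to take effect.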
The message I get in the server log is:
[2022-05-28 00:29:43,989] INFO [GroupCoordinator 0]: Dynamic member with
unknown member id joins group streams-wiki-created-table in Empty state.
Created a new member id
streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
and request the member to rejoin with this id.
(kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,055] INFO [GroupCoordinator 0]: Preparing to rebalance
group streams-wiki-created-table in state PreparingRebalance with old
generation 2 (__consumer_offsets-16) (reason: Adding new member
streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
with group instance id None; client reason: rebalance failed due to 'The group
member needs to have a valid member id before actually entering a consumer
group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,089] INFO [GroupCoordinator 0]: Stabilized group
streams-wiki-created-table generation 3 (__consumer_offsets-16) with 1 members
(kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,458] INFO [GroupCoordinator 0]: Assignment received from
leader
streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
for group streams-wiki-created-table for generation 3. The group has 1
members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,955] INFO [GroupCoordinator 0]: Preparing to rebalance
group streams-wiki-created-table in state PreparingRebalance with old
generation 3 (__consumer_offsets-16) (reason: Removing member
streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
on LeaveGroup; client reason: the consumer unsubscribed from all topics)
(kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,960] INFO [GroupCoordinator 0]: Group
streams-wiki-created-table with generation 4 is now empty
(__consumer_offsets-16) (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,998] INFO [GroupCoordinator 0]: Member
MemberMetadata(memberId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e,
groupInstanceId=None,
clientId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer,
clientHost=/127.0.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000,
supportedProtocols=List(stream)) has left group streams-wiki-created-table
through explicit `LeaveGroup`; client reason: the consumer unsubscribed from
all topics (kafka.coordinator.group.GroupCoordinator)
My code is as follows:
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wiki-created-table");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

    TimeWindows monthWindow = TimeWindows.of(Duration.ofDays(30));
    TimeWindows weekWindow = TimeWindows.of(Duration.ofDays(7));
    TimeWindows dayWindow = TimeWindows.of(Duration.ofDays(1));
    TimeWindows hourWindow = TimeWindows.of(Duration.ofHours(1));

    StreamsBuilder builder = new StreamsBuilder();
    // re-key each event by user type, then count events per key
    KTable<String, Long> createdPagesUserTypeTable =
        builder.stream("temp-create-stream",
                       Consumed.with(Serdes.String(), WikiEventSerdes.WikiEvent()))
               .selectKey((ignored, value) -> value.getUserType())
               .groupByKey()
               .count();

    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, props);
    CountDownLatch latch = new CountDownLatch(1);

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });

    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
}
Can someone please help me figure out what's wrong here?
Thanks,
Meir