[ https://issues.apache.org/jira/browse/KAFKA-13339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-13339.
-------------------------------------
    Resolution: Invalid

> Kstream not fetch all the messages
> ----------------------------------
>
>                 Key: KAFKA-13339
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13339
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: karthik
>            Priority: Major
>
> i used the below Kstream code for fetch all the records from my topic and
> facing below error.
>
> code :
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> AppConfigs.bootstrapServers1);
> props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> Serdes.Integer().getClass());
> props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
> StreamsBuilder streamsBuilder = new StreamsBuilder();
> KStream<Integer, String> kStream =
> streamsBuilder.stream(AppConfigs.topicName);
> kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));
> //kStream.peek((k,v)-> System.out.println("Key= " + k + " Value= " + v));
> Topology topology = streamsBuilder.build();
> KafkaStreams streams = new KafkaStreams(topology, props);
> System.out.println("Starting stream.");
> streams.start();
> Runtime.getRuntime().addShutdownHook(new Thread(() ->
> { System.out.println("Shutting down stream"); streams.close(); }
> ));
>
> *Error :*
> DEBUG o.a.k.clients.FetchSessionHandler - [Consumer
> clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer,
> groupId=MESProducer] Built incremental fetch (sessionId=797093970,
> epoch=417) for node 2. Added 0 partition(s), altered 0 partition(s), removed
> 0 partition(s) out of 1 partition(s)
> 18:22:16.088 [Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1]
> DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer
> clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer,
> groupId=Producer] Sending READ_UNCOMMITTED
> IncrementalFetchRequest(toSend=(), toForget=(), implied=(test-topic-0)) to
> broker
> DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer
> clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer,
> groupId=MESProducer] Added READ_UNCOMMITTED fetch request for partition
> test-topic-1 at position FetchPosition{offset=1, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch
>


--
This message was sent by Atlassian Jira
(v8.3.4#803005)