Fwd: Consumer slowness issue
-- Forwarded message --
From: "pravin kumar"
Date: 27-Mar-2018 6:11 PM
Subject: Consumer slowness issue
To:
Cc:

I have two topics with 5 partitions each. The KafkaProducer produces 10 elements to wikifeedInputT10, and wikifeedInputT10 has received these elements:

[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic wikifeedInputT10 --time -1
wikifeedInputT10:2:2
wikifeedInputT10:4:2
wikifeedInputT10:1:2
wikifeedInputT10:3:2
wikifeedInputT10:0:2

But after processing, my output topic wikifeedOutputT15 shows:

[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic wikifeedOutputT15 --time -1
wikifeedOutputT15:2:1
wikifeedOutputT15:4:1
wikifeedOutputT15:1:3
wikifeedOutputT15:3:3
wikifeedOutputT15:0:3

I received the following output in my console:

[2018-03-27 17:55:32,359] INFO Kafka version : 1.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2018-03-27 17:55:32,359] INFO Kafka commitId : c0518aa65f25317e (org.apache.kafka.common.utils.AppInfoParser)
[2018-03-27 17:55:32,600] INFO [Consumer clientId=C2, groupId=ConsumerWikiFeedLambda4] Discovered group coordinator nms-181.nmsworks.co.in:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-27 17:55:32,602] INFO [Consumer clientId=C2, groupId=ConsumerWikiFeedLambda4] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-03-27 17:55:32,602] INFO [Consumer clientId=C2, groupId=ConsumerWikiFeedLambda4] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-27 17:55:32,610] INFO [Consumer clientId=C2, groupId=ConsumerWikiFeedLambda4] Successfully joined group with generation 5 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-27 17:55:32,611] INFO [Consumer clientId=C2, groupId=ConsumerWikiFeedLambda4] Setting newly assigned partitions [wikifeedOutputT15-2, wikifeedOutputT15-1, wikifeedOutputT15-0, wikifeedOutputT15-4, wikifeedOutputT15-3] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Topic :::wikifeedOutputT15 Partition:0 Keyjoe = Value:: 18263
Topic :::wikifeedOutputT15 Partition:0 Keyphil = Value:: 18230
Topic :::wikifeedOutputT15 Partition:0 Keytania = Value:: 18344
Topic :::wikifeedOutputT15 Partition:1 Keypravin = Value:: 18140
Topic :::wikifeedOutputT15 Partition:1 Keykumar = Value:: 18248
Topic :::wikifeedOutputT15 Partition:1 Keyjoseph = Value:: 18116
Topic :::wikifeedOutputT15 Partition:2 Keylauren = Value:: 18150
Topic :::wikifeedOutputT15 Partition:3 Keybob = Value:: 18131
Topic :::wikifeedOutputT15 Partition:3 Keyerica = Value:: 18084
Topic :::wikifeedOutputT15 Partition:3 Keydamian = Value:: 18126
Topic :::wikifeedOutputT15 Partition:4 Keysam = Value:: 18168

It stops here and I'm not getting any more messages. I have attached my code below.

package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class WikifeedLambdaexample {

    final static String WIKIFEED_INPUT="wikifeedInputT10";
    final static String WIKIFEED_OUTPUT="wikifeedOutputT15";
    final static String WIKIFEED_LAMBDA="WikiFeedLambdaT10";
    //final static String SUM_LAMBDA="sumlambda10";
    final static String BOOTSTRAP_SERVER="localhost:9092";
    final static String COUNT_STORE="countstoreT10";
    final static String STAT_DIR="/home/admin/Document/kafka_2.11.1.0.1/kafka-streams";
    //final static String SUM_OUTPUT_EVEN_TOPIC = "sumoutputeventopicT10";
    // final static String EVEN_TABLE = "sumDemo10";

    public static void main(String[] args) {
        ProducerInput();
        KafkaStreams WikifeedKStreams= getWikifeed();
        WikifeedKStreams.cleanUp();
        WikifeedKStreams.start();
Consumer slowness issue
ms.cleanUp(); sumStreams.start(); Runtime.getRuntime().addShutdownHook(new Thread(sumStreams::close));*/ } public static void ProducerInput(){ String[] users={"pravin","kumar","erica", "bob", "joe", "damian", "tania", "phil", "sam", "lauren", "joseph"}; Properties properties=new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,WikifeedSerde.getInstance().serializer().getClass()); KafkaProducer producer=new KafkaProducer(properties); Random random=new Random(); IntStream.range(0,10) .mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)],true,"content")) .forEach(record -> producer.send(new ProducerRecord(WikifeedLambdaexample.WIKIFEED_INPUT,null,record))); producer.flush(); } public static KafkaStreams getWikifeed(){ Properties properties=new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG,WIKIFEED_LAMBDA); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,WikifeedSerde.class); properties.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR); //properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); StreamsBuilder builder= new StreamsBuilder(); KStream inputStream=builder.stream(WIKIFEED_INPUT); KTable kTable=inputStream .filter((key, value) -> value.isNew()) .map(((key, value) -> KeyValue.pair(value.getName(),value))) .groupByKey() .count(Materialized.as(COUNT_STORE)); kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long())); KafkaStreams streams= new KafkaStreams(builder.build(),properties); return streams; } /*public static KafkaStreams getEvenNumSum(){ Properties props=new 
Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, SUM_LAMBDA); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER); props.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Long().getClass().getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); StreamsBuilder builder= new StreamsBuilder(); KStream suminput=builder.stream(WIKIFEED_OUTPUT); getKTableForEvenNums(suminput).toStream().to(SUM_OUTPUT_EVEN_TOPIC); KafkaStreams kafkaStreams=new KafkaStreams(builder.build(),props); return kafkaStreams; }*/ /* private static KTable getKTableForEvenNums(KStream sumeveninput){ KTable evenKTable=sumeveninput .filter((key,value)-> value%2 ==0) .groupByKey() .reduce((v1, v2)-> v1 + v2,Materialized.as(EVEN_TABLE)); return evenKTable; }*/ } package kafka.examples.wikifeed; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; /** * Created by PravinKumar on 29/7/17. 
*/ public class wikifeedDriverExample { final static String CONSUMER_WIKIFEED_LAMBDA="ConsumerWikiFeedLambda4"; public static void main(String[] args) { ConsumerOutput(); } public static void ConsumerOutput() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, WikifeedLambdaexample.BOOTSTRAP_SERVER); properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_WIKIFEED_LAMBDA); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C1"); properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C2"); //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C3"); KafkaConsumer consumer = new KafkaConsumer(properties, new StringDeserializer(), new LongDeseri
Producing more number of Records than expected
I have run the wikifeed example. I have three topics:

wikifeedInputtopicDemo2 - 10 partitions
wikifeedOutputtopicDemo2 - 10 partitions
sumoutputeventopicDemo2 - 5 partitions

I have produced 10 records, but the input topic (wikifeedInputtopicDemo2) has received more than 10 records. Can someone explain how this happens?

[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic wikifeedInputtopicDemo2 --time -1
wikifeedInputtopicDemo2:8:13400
wikifeedInputtopicDemo2:2:13401
wikifeedInputtopicDemo2:5:13400
wikifeedInputtopicDemo2:4:13400
wikifeedInputtopicDemo2:7:13399
wikifeedInputtopicDemo2:1:13399
wikifeedInputtopicDemo2:9:13400
wikifeedInputtopicDemo2:3:13400
wikifeedInputtopicDemo2:6:13400
wikifeedInputtopicDemo2:0:13400

Here is my processor topology code:

//
public static KafkaStreams getWikifeed(){
    Properties properties=new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG,WIKIFEED_LAMBDA);
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,WikifeedSerde.class);
    properties.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
    //properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    StreamsBuilder builder= new StreamsBuilder();
    KStream inputStream=builder.stream(WIKIFEED_INPUT);
    KTable kTable=inputStream
        .filter((key, value) -> value.isNew())
        .map(((key, value) -> KeyValue.pair(value.getName(),value)))
        .groupByKey()
        .count(Materialized.as(COUNT_STORE));
    kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));
    KafkaStreams streams= new KafkaStreams(builder.build(),properties);
    return streams;
}

-> My driver code is in the attachment file.

package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class wikifeedDriverExample {

    final static String BOOTSTRAP_SERVERS="localhost:9092";
    final static String CONSUMER_WIKIFEED_LAMBDA="ConsumerWikiFeedLambda1";

    public static void main(String[] args) {
        ProducerInput();
        ConsumerOutput();
    }

    public static void ProducerInput(){
        String[] users={"pravin","kumar","erica", "bob", "joe", "damian", "tania", "phil", "sam", "lauren", "joseph"};

        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,WikifeedSerde.getInstance().serializer().getClass());

        KafkaProducer producer=new KafkaProducer(properties);
        Random random=new Random();
        IntStream.range(0,random.nextInt(10))
            .mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)],true,"content"))
            .forEach(record -> producer.send(new ProducerRecord(WikifeedLambdaexample.WIKIFEED_INPUT,null,record)));
        producer.flush();
    }

    public static void ConsumerOutput() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_WIKIFEED_LAMBDA);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C1");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C2");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C3");

        KafkaConsumer consumer = new KafkaConsumer(properties, new StringDeserializer(), new LongDeserializer());
        consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_O
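One way to check how many records the topic currently holds is to add up the per-partition end offsets that GetOffsetShell prints with --time -1. The following is a minimal plain-Java sketch, not part of the original post: the names OffsetSum and totalRecords are made up for illustration, and it assumes every partition still starts at offset 0 (nothing deleted by retention). End offsets generally reflect everything ever appended to the topic, so records from earlier runs of the producer are included in the total. Note also that the posted driver calls IntStream.range(0, random.nextInt(10)), which sends between 0 and 9 records per run rather than exactly 10.

```java
import java.util.Arrays;
import java.util.List;

final class OffsetSum {

    // Each line has the form topic:partition:offset, as printed by GetOffsetShell.
    // The last field is the end offset of that partition.
    static long totalRecords(List<String> lines) {
        long total = 0;
        for (String line : lines) {
            String[] parts = line.trim().split(":");
            total += Long.parseLong(parts[parts.length - 1]);
        }
        return total;
    }

    public static void main(String[] args) {
        // Offsets copied from the GetOffsetShell output in the post above.
        List<String> lines = Arrays.asList(
            "wikifeedInputtopicDemo2:8:13400",
            "wikifeedInputtopicDemo2:2:13401",
            "wikifeedInputtopicDemo2:5:13400",
            "wikifeedInputtopicDemo2:4:13400",
            "wikifeedInputtopicDemo2:7:13399",
            "wikifeedInputtopicDemo2:1:13399",
            "wikifeedInputtopicDemo2:9:13400",
            "wikifeedInputtopicDemo2:3:13400",
            "wikifeedInputtopicDemo2:6:13400",
            "wikifeedInputtopicDemo2:0:13400");
        System.out.println(totalRecords(lines)); // prints 133999
    }
}
```

A total this far above 10 suggests the topic has accumulated records from many earlier runs, although the post does not contain enough information to confirm that.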
Tasks across MultipleJVM
{ final static String BOOTSTRAP_SERVERS="localhost:9092"; final static String CONSUMER_WIKIFEED_LAMBDA="ConsumerWikiFeedLambda1"; public static void main(String[] args) { ProducerInput(); ConsumerOutput(); } public static void ProducerInput(){ String[] users={"pravin","kumar","erica", "bob", "joe", "damian", "tania", "phil", "sam", "lauren", "joseph"}; Properties properties=new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,WikifeedSerde.getInstance().serializer().getClass()); KafkaProducer producer=new KafkaProducer(properties); Random random=new Random(); IntStream.range(0,random.nextInt(10)) .mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)],true,"content")) .forEach(record -> producer.send(new ProducerRecord(WikifeedLambdaexample.WIKIFEED_INPUT,null,record))); producer.flush(); } public static void ConsumerOutput() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_WIKIFEED_LAMBDA); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C1"); //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C2"); properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C3"); KafkaConsumer consumer = new KafkaConsumer(properties, new StringDeserializer(), new LongDeserializer()); consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT)); while (true) { consumer.poll(100) .forEach((ConsumerRecord consumerRecord) -> System.out.println("Topic :::" +consumerRecord.topic() + " " + "Partition:" + consumerRecord.partition()+ " " + "Key" +consumerRecord.key()+ " " + " = " + " Value:: " +consumerRecord.value())); } } } package kafka.examples.wikifeed; import 
org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import java.util.Properties; /** * Created by PravinKumar on 29/7/17. */ public class WikifeedLambdaexample { final static String WIKIFEED_INPUT="wikifeedInputtopic1"; final static String WIKIFEED_OUTPUT="wikifeedOutputtopic1"; final static String WIKIFEED_LAMBDA="WikiFeedLambdaexampleC2"; final static String BOOTSTRAP_SERVER="localhost:9092"; final static String COUNT_STORE="countstore1"; final static String STAT_DIR="/home/admin/Desktop/kafka_2.12.1.0.0/kafka-streams"; final static String SUM_OUTPUT_EVEN_TOPIC = "sumoutputeventopicC1"; final static String EVEN_TABLE = "sumy1"; public static void main(String[] args) { KafkaStreams WikifeedKStreams= getWikifeed(); WikifeedKStreams.cleanUp(); WikifeedKStreams.start(); Runtime.getRuntime().addShutdownHook(new Thread(WikifeedKStreams::close)); KafkaStreams sumStreams= getEvenNumSum(); sumStreams.cleanUp(); sumStreams.start(); Runtime.getRuntime().addShutdownHook(new Thread(sumStreams::close)); } public static KafkaStreams getWikifeed(){ Properties properties=new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG,WIKIFEED_LAMBDA); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,WikifeedSerde.class); properties.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500); 
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); StreamsBuilder builder= new StreamsBuilder(); KStream inputStream=builder.stream(WIKIFEED_INPUT); KTable kTa
ProcessorTopology
Can we give the output of one processor topology as the input to another processor topology? If it is possible, how can we do it? Can anyone provide an example?
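In Kafka Streams the usual way to connect two topologies is through a topic: the first topology writes its result with to(...) and the second one reads that same topic with builder.stream(...), which is what the commented-out getEvenNumSum() method in the "Consumer slowness issue" post does with WIKIFEED_OUTPUT. The sketch below is only a plain-Java model of that data flow, with an in-memory list standing in for the intermediate topic. It uses no Kafka classes, and the names ChainedStages, countByUser and sumEvenCounts are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ChainedStages {

    // Stage 1: count records per user and emit every updated count.
    // This is an idealization: real Kafka Streams caching may collapse
    // several updates for the same key into one.
    static List<Map.Entry<String, Long>> countByUser(List<String> users) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<Map.Entry<String, Long>> topic = new ArrayList<>();
        for (String user : users) {
            long updated = counts.merge(user, 1L, Long::sum);
            topic.add(new AbstractMap.SimpleEntry<>(user, updated));
        }
        return topic;
    }

    // Stage 2: read the intermediate "topic", keep even counts, sum them per key.
    static Map<String, Long> sumEvenCounts(List<Map.Entry<String, Long>> topic) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (Map.Entry<String, Long> record : topic) {
            if (record.getValue() % 2 == 0) {
                sums.merge(record.getKey(), record.getValue(), Long::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // The list returned by stage 1 plays the role of the output topic.
        List<Map.Entry<String, Long>> intermediate =
            countByUser(Arrays.asList("joe", "bob", "joe", "joe", "joe"));
        System.out.println(sumEvenCounts(intermediate)); // prints {joe=6}
    }
}
```

The point of the model is that the second stage knows nothing about the first one except the name and record format of the topic between them.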
Usage of deprecated method in kafka 2_12.1.0.0
I have tried the wikifeed example with Kafka 2_12.1.0.0. The count method is now deprecated. Previously, in kafka_2.11-0.10.2.1, I used count(localStateStoreName). How do I give the state store name in Kafka 2_12.1.0.0? I have attached the code below.

package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class WikifeedLambdaexample {

    final static String WIKIFEED_INPUT="wikifeedInput";
    final static String WIKIFEED_OUTPUT="wikifeedOutput";
    final static String WIKIFEED_LAMBDA="WikiFeedLambda";
    final static String BOOTSTRAP_SERVERS="localhost:9092";
    final static String COUNT_STORE="countstore";
    final static String STAT_DIR="/home/admin/Documents/kafka_2.12.1.0.0/kafka-streams";

    public static void main(String[] args) {
        KafkaStreams kafkaStreams=getWikifeedStreams();
        kafkaStreams.cleanUp();
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }

    public static KafkaStreams getWikifeedStreams(){
        Properties properties=new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG,WIKIFEED_LAMBDA);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,WikifeedSerde.class);
        properties.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder= new StreamsBuilder();
        KStream inputStream=builder.stream(WIKIFEED_INPUT);
        KTable kTable=inputStream
            .filter((key, value) -> value.isNew())
            .map(((key, value) -> KeyValue.pair(value.getName(),value)))
            .groupByKey()
            .count(COUNT_STORE);
        kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));
        KafkaStreams streams= new KafkaStreams(builder.build(),properties);
        return streams;
    }
}
Doubts about multiple instance in kafka
I have the Confluent Kafka documentation, but I can't understand the following lines:

"It is important to understand that Kafka Streams is not a resource manager, but a library that “runs” anywhere its stream processing application runs. Multiple instances of the application are executed either on the same machine, or spread across multiple machines and tasks can be distributed automatically by the library <https://docs.confluent.io/current/streams/architecture.html#streams-architecture-threads> to those running application instances"

I have tried running multiple JVMs with multiple consumers on the same machine. Is that the correct way to run on the same machine, or is there another way? I have attached the code below.

package kafka.examples.MultiConsumerMultipartition.taskConfig;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 23/10/17.
 */
public class MultiPartitionMultiConsumerDriver {

    public static final String CONSUMER_GROUP_ID = "multipartitionmulticonsumerdriver2";
    private static final int MAX_RECORDS=1;

    public static void main(String[] args) throws InterruptedException {
        produceInput();
        consumerOutput();
    }

    public static Properties getConsumerProps() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MultiPartitionMultiConsumerUsingStream.BOOTSTRAP_SERVER);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass().getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.Long().deserializer().getClass().getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10);
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C1");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C2");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C3");
        return properties;
    }

    public static void produceInput(){
        Random random=new Random();
        String[] msg={"hi","my","name","is","pravin","kumar","studied","in","madras","institute","of","technology"
            ,"hi","my","name","is","pravin","kumar","studied","in","good","shepherd","school","properties","put"
            ,"ConsumerConfig","BOOTSTRAP","SERVERS","CONFIG","Single","Partition","MultiConsumer","UsingStream"
            , "BOOTSTRAP","SERVER","properties","put","StreamsConfig","DEFAULT","KEY","SERDE","CLASS","CONFIG"
            ,"Serdes","String","getClass","getName"};

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MultiPartitionMultiConsumerUsingStream.BOOTSTRAP_SERVER);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,Serdes.String().serializer().getClass().getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Serdes.String().serializer().getClass().getName());

        KafkaProducer producer=new KafkaProducer(producerProps);
        IntStream.range(0,MAX_RECORDS)
            .forEach(record ->producer.send(new ProducerRecord
                (MultiPartitionMultiConsumerUsingStream.INPUT_TOPIC,null,msg[random.nextInt(msg.length)])));//msg[random.nextInt(msg.length)]
        producer.flush();
    }

    public static void consumerOutput() throws InterruptedException {
        Properties consumerProps = getConsumerProps();
        KafkaConsumer consumer = new KafkaConsumer(consumerProps);
        consumer.subscribe(Collections.singleton(MultiPartitionMultiConsumerUsingStream.OUTPUT_TOPIC));
        while (true) {
            Thread.sleep(5_000);
            consumer.poll(Long.MAX_VALUE).forEach(ConsumerRecord ->
                System.out.println("Partition :"+ConsumerRecord.partition()+"Key : " + Con
Doubts in KStreams
I have studied Kafka Streams, but have not clearly understood the following:

1. Can someone explain fault tolerance?
2. I have topicA and topicB with 4 partitions, so it created four tasks. I have created it in a single JVM. But I need to know how it works across multiple JVMs: if one JVM goes down, how does another JVM take over the responsibility, and how is the local state store recreated in the JVM that takes over?
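As a rough illustration of the second question: Kafka Streams spreads tasks over all running instances that share the same application.id, and by default each state store is backed by a changelog topic in Kafka. When an instance goes down, its tasks move to a surviving instance, which rebuilds the store by replaying the changelog. The plain-Java sketch below models only that idea. The round-robin assignment is a simplification and not the library's real assignor, the task IDs 0_0 to 0_3 are assumed, and the names FailoverModel, assign and restore are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FailoverModel {

    // Simplified round-robin assignment of tasks to the instances that are alive.
    static Map<String, List<String>> assign(List<String> tasks, List<String> instances) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String instance : instances) {
            assignment.put(instance, new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            assignment.get(instances.get(i % instances.size())).add(tasks.get(i));
        }
        return assignment;
    }

    // Rebuild a key-value store by replaying changelog records in order;
    // the latest value written for a key wins.
    static Map<String, Long> restore(List<String[]> changelog) {
        Map<String, Long> store = new HashMap<>();
        for (String[] record : changelog) {
            store.put(record[0], Long.parseLong(record[1]));
        }
        return store;
    }

    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("0_0", "0_1", "0_2", "0_3");

        // Two JVMs running: the four tasks are split between them.
        System.out.println(assign(tasks, Arrays.asList("jvm1", "jvm2")));

        // jvm2 goes down: all four tasks are reassigned to jvm1.
        System.out.println(assign(tasks, Arrays.asList("jvm1")));

        // jvm1 restores the state of a task it took over from the changelog.
        List<String[]> changelog = Arrays.asList(
            new String[] {"hi", "1"},
            new String[] {"my", "1"},
            new String[] {"hi", "2"});
        System.out.println(restore(changelog));
    }
}
```

In the real library the reassignment is triggered by a consumer-group rebalance, and restoring can take a while for large stores because the whole changelog may have to be read.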
WordCount Example using GlobalKStore
I have created 3 input topics with 10 partitions each and an output topic with 10 partitions. I did the wordcount example and stored the result in a GlobalKTable. I initially stored the counted value in a local state store and then moved it to the global state store.

I have attached the code here: https://gist.github.com/Pk007790/d46236b1b5c394301f27b96891a94584
and I have supplied the inputs to the producers like this: https://gist.github.com/Pk007790/ba934b7bcea42b8b05f4816de3cb84a0

My question is: how can I store the processed information in the global state store without a local state store?
GlobalKStore
I have created 3 topics with 10 partitions each. I intend to store the processed information in a GlobalKTable. For now I have done it by writing an individual KTable to an output topic and then loading that topic into a GlobalKTable:

#//
KStreamBuilder builder=new KStreamBuilder();
KStream inputStream = builder.stream(INPUT_TOPICA,INPUT_TOPICB,INPUT_TOPICC)
    .map(((key, value) -> new KeyValue<>(value, value)))
    .groupByKey()
    .count(INPUT_TABLE)
    .toStream();
inputStream.to(Serdes.String(),Serdes.Long(),OUTPUT_TOPIC);
GlobalKTable objectObjectGlobalKTable = builder.globalTable(OUTPUT_TOPIC);
KafkaStreams kafkaStreams=new KafkaStreams(builder,props);
//#

How can I do it without a local state store?
Re: regarding number of Stream Tasks
ohhh...thank you. It's clear now.

On Tue, Oct 31, 2017 at 4:36 PM, Damian Guy wrote:

> Hi, the `map` when it is followed by `groupByKey` will cause a
> repartitioning of the data, so you will have your 5 tasks processing the
> input partitions and 5 tasks processing the partitions from the
> repartitioning.
>
> On Tue, 31 Oct 2017 at 10:56 pravin kumar wrote:
>
> > I have created a stream with topic contains 5 partitions and expected to
> > create 5 stream tasks ,i got 10 tasks as
> > 0_0 0_1 0_2 0_3 0_4 1_0 1_1 1_2 1_3 1_4
> >
> > im doing wordcount in this example,
> >
> > here is my topology in this link:
> > 1. https://gist.github.com/Pk007790/72b0718f26e6963246e83da992b3e725
> > 2. https://gist.github.com/Pk007790/a05226007ca90cdd36c362d09d19bda6
> >
> > On Tue, Oct 24, 2017 at 3:29 PM, Damian Guy wrote:
> >
> > > It would depend on what your topology looks like, which you haven't show
> > > here. But if there may be internal topics generated due to repartitioning
> > > which would cause the extra tasks.
> > > If you provide the topology we would be able to tell you.
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 24 Oct 2017 at 10:14 pravin kumar wrote:
> > >
> > > > I have created a stream with topic contains 5 partitions and expected to
> > > > create 5 stream tasks ,i got 10 tasks as
> > > > 0_0 0_1 0_2 0_3 0_4 1_0 1_1 1_2 1_3 1_4
> > > >
> > > > my doubt is:im expected to have 5 tasks how it produced 10 tasks
> > > >
> > > > here are some logs:
> > > > [2017-10-24 10:27:35,284] INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser)
> > > > [2017-10-24 10:27:35,284] DEBUG Kafka consumer created (org.apache.kafka.clients.consumer.KafkaConsumer)
> > > > [2017-10-24 10:27:35,304] INFO stream-thread [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-9d8f-a1a9a8adfb7d-StreamThread-1] State transition from CREATED to RUNNING. (org.apache.kafka.streams.processor.internals.StreamThread)
> > > > [2017-10-24 10:27:35,306] DEBUG stream-client [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-9d8f-a1a9a8adfb7d] Removing local Kafka Streams application data in /home/admin/Documents/kafka_2.11-0.10.2.1/kafka-streams/SingleConsumerMultiConsumerUsingStreamx4 for application SingleConsumerMultiConsumerUsingStreamx4. (org.apache.kafka.streams.KafkaStreams)
> > > > [2017-10-24 10:27:35,311] DEBUG stream-thread [cleanup] Acquired state dir lock for task 0_0 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,311] INFO stream-thread [cleanup] Deleting obsolete state directory 0_0 for task 0_0 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Released state dir lock for task 0_0 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Acquired state dir lock for task 1_0 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,322] INFO stream-thread [cleanup] Deleting obsolete state directory 1_0 for task 1_0 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state dir lock for task 1_0 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state dir lock for task 0_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete state directory 0_1 for task 0_1 as cleanup delay of 0 ms has passed (org.apa
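Damian's explanation above can be turned into a small calculation. Task IDs have the form <sub-topology>_<partition>, and a `map` followed by `groupByKey` splits the topology into two sub-topologies: one reading the 5 input partitions and one reading the 5 partitions of the repartition topic. A minimal plain-Java sketch follows; the class and method names are made up here, and it assumes the repartition topic has the same number of partitions as the input topic.

```java
import java.util.ArrayList;
import java.util.List;

final class TaskIds {

    // One task per (sub-topology, partition) pair.
    // partitionsPerSubTopology[i] is the partition count read by sub-topology i.
    static List<String> taskIds(int[] partitionsPerSubTopology) {
        List<String> ids = new ArrayList<>();
        for (int sub = 0; sub < partitionsPerSubTopology.length; sub++) {
            for (int partition = 0; partition < partitionsPerSubTopology[sub]; partition++) {
                ids.add(sub + "_" + partition);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Sub-topology 0 reads the 5 input partitions,
        // sub-topology 1 reads the 5 repartition-topic partitions.
        System.out.println(taskIds(new int[] {5, 5}));
        // prints [0_0, 0_1, 0_2, 0_3, 0_4, 1_0, 1_1, 1_2, 1_3, 1_4]
    }
}
```

This gives the same ten task IDs that were reported in the question, five per sub-topology.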
Re: regarding number of Stream Tasks
I have created a stream with topic contains 5 partitions and expected to create 5 stream tasks ,i got 10 tasks as 0_0 0_1 0_2 0_3 0_4 1_0 1_1 1_2 1_3 1_4 im doing wordcount in this example, here is my topology in this link: 1. https://gist.github.com/Pk007790/72b0718f26e6963246e83da992b3e725 2.https://gist.github.com/Pk007790/a05226007ca90cdd36c362d09d19bda6. On Tue, Oct 24, 2017 at 3:29 PM, Damian Guy wrote: > It would depend on what your topology looks like, which you haven't show > here. But if there may be internal topics generated due to repartitioning > which would cause the extra tasks. > If you provide the topology we would be able to tell you. > Thanks, > Damian > > On Tue, 24 Oct 2017 at 10:14 pravin kumar wrote: > > > I have created a stream with topic contains 5 partitions and expected to > > create 5 stream tasks ,i got 10 tasks as > > 0_0 0_1 0_2 0_3 0_4 1_0 1_1 1_2 1_3 1_4 > > > > > > my doubt is:im expected to have 5 tasks how it produced 10 tasks > > > > here are some logs: > > [2017-10-24 10:27:35,284] INFO Kafka commitId : > > cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser) > > [2017-10-24 10:27:35,284] DEBUG Kafka consumer created > > (org.apache.kafka.clients.consumer.KafkaConsumer) > > [2017-10-24 10:27:35,304] INFO stream-thread > > > > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898- > 9d8f-a1a9a8adfb7d-StreamThread-1] > > State transition from CREATED to RUNNING. > > (org.apache.kafka.streams.processor.internals.StreamThread) > > [2017-10-24 10:27:35,306] DEBUG stream-client > > > > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898- > 9d8f-a1a9a8adfb7d] > > Removing local Kafka Streams application data in > > > > /home/admin/Documents/kafka_2.11-0.10.2.1/kafka-streams/ > SingleConsumerMultiConsumerUsingStreamx4 > > for application SingleConsumerMultiConsumerUsingStreamx4. 
> > (org.apache.kafka.streams.KafkaStreams)
> > [2017-10-24 10:27:35,311] DEBUG stream-thread [cleanup] Acquired state dir lock for task 0_0 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,311] INFO stream-thread [cleanup] Deleting obsolete state directory 0_0 for task 0_0 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Released state dir lock for task 0_0 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Acquired state dir lock for task 1_0 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,322] INFO stream-thread [cleanup] Deleting obsolete state directory 1_0 for task 1_0 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state dir lock for task 1_0 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state dir lock for task 0_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete state directory 0_1 for task 0_1 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state dir lock for task 0_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state dir lock for task 1_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete state directory 1_1 for task 1_1 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Released state dir lock for task 1_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Acquired state dir lock for task 0_2 (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,396] INFO stream-thread [cleanup] Deleting obsolete state directory 0_2 for task 0_2 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.int
regarding number of Stream Tasks
I created a stream from a topic with 5 partitions and expected 5 stream tasks, but I got 10 tasks:
0_0 0_1 0_2 0_3 0_4 1_0 1_1 1_2 1_3 1_4

My question is: I expected 5 tasks, so why were 10 created?

Here are some logs:
[2017-10-24 10:27:35,284] INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-24 10:27:35,284] DEBUG Kafka consumer created (org.apache.kafka.clients.consumer.KafkaConsumer)
[2017-10-24 10:27:35,304] INFO stream-thread [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-9d8f-a1a9a8adfb7d-StreamThread-1] State transition from CREATED to RUNNING. (org.apache.kafka.streams.processor.internals.StreamThread)
[2017-10-24 10:27:35,306] DEBUG stream-client [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-9d8f-a1a9a8adfb7d] Removing local Kafka Streams application data in /home/admin/Documents/kafka_2.11-0.10.2.1/kafka-streams/SingleConsumerMultiConsumerUsingStreamx4 for application SingleConsumerMultiConsumerUsingStreamx4.
(org.apache.kafka.streams.KafkaStreams)
[2017-10-24 10:27:35,311] DEBUG stream-thread [cleanup] Acquired state dir lock for task 0_0 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,311] INFO stream-thread [cleanup] Deleting obsolete state directory 0_0 for task 0_0 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Released state dir lock for task 0_0 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Acquired state dir lock for task 1_0 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,322] INFO stream-thread [cleanup] Deleting obsolete state directory 1_0 for task 1_0 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state dir lock for task 1_0 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state dir lock for task 0_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete state directory 0_1 for task 0_1 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state dir lock for task 0_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state dir lock for task 1_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete state directory 1_1 for task 1_1 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Released state dir lock for task 1_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Acquired state dir lock for task 0_2 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,396] INFO stream-thread [cleanup] Deleting obsolete state directory 0_2 for task 0_2 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Released state dir lock for task 0_2 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Acquired state dir lock for task 1_2 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,397] INFO stream-thread [cleanup] Deleting obsolete state directory 1_2 for task 1_2 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released state dir lock for task 1_2 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired state dir lock for task 0_3 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] INFO stream-thread [cleanup] Deleting obsolete state directory 0_3 for task 0_3 as cleanup delay of 0 ms has passed (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released state dir lock for task 0_3 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired state dir lock for task 1_3 (org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,399] INFO stream-thread [cleanup] Deleting obsolete state directory 1_3 for task 1_3 as cleanup delay of 0 ms has passed (org.apache.k
StreamTasks
I created a stream from a topic with 5 partitions and expected 5 stream tasks, but I got:

[admin@nms-181 ]$ ls
0_0 0_1 0_2 0_3 0_4 1_0 1_1 1_2 1_3 1_4

SingleConsumerMultiConsumerUsingStreamx4 is my application.id. The 1_0, 1_... directories contain the local state store, while the 0_0, 0_... directories are empty.

My questions are:
1) What is the difference between 0_... and 1_...?
2) Why are the 0_... directories created empty?
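
One possible reading of the directory names above, offered as a sketch and not as a confirmed diagnosis: Kafka Streams names each task `<sub-topology id>_<partition>`, so a topology that is split into two sub-topologies (for example by an internal repartition topic, which a word count that re-keys by word would need) yields two tasks per partition. The class and method names below are hypothetical, and the figure of two sub-topologies is an assumption about this application, not something read from its code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Kafka Streams API.
final class TaskIdSketch {

    // Lists task ids in the "<subTopologyId>_<partition>" form seen in the state directory.
    static List<String> taskIds(int subTopologies, int partitions) {
        List<String> ids = new ArrayList<>();
        for (int sub = 0; sub < subTopologies; sub++) {
            for (int partition = 0; partition < partitions; partition++) {
                ids.add(sub + "_" + partition);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Assumption: 2 sub-topologies (input side and aggregation side), 5 partitions each.
        System.out.println(taskIds(2, 5));
    }
}
```

Under that assumption, the 0_... tasks would belong to the stateless part that reads the input topic, which would fit their empty directories, while the 1_... tasks would own the count store.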
Re: reg Kafka Node num
I have one question about your answer: why should Kafka differentiate the traffic to the controller node from the normal traffic to the same node?

On Sat, Oct 21, 2017 at 1:52 AM, Eric Azama wrote:

> Kafka logs traffic to the Controller node separately from the normal
> traffic to the node. In order to differentiate, it subtracts the broker id
> from 2147483647 (max int) and uses the result as the "node id" for the
> controller.
>
> On a related note, logs and metrics related to the bootstrap process seem
> to be logged with negative integers: -1 for the first bootstrap server, -2
> for the second, and so on.
>
> ------ Forwarded message --
> From: pravin kumar
> To: users@kafka.apache.org
> Cc:
> Bcc:
> Date: Fri, 20 Oct 2017 14:44:04 +0530
> Subject: reg Kafka Node num
>
> I ran a multiple-partition, multiple-consumer application and then
> altered the number of partitions. In the log I am getting:
>
> Using older server API v0 to send HEARTBEAT {group_id=altermultipartitionmulticonsumer2,group_generation_id=6,member_id=C2-b7b4f485-d0c3-466b-b0e7-b3de834832e9} to node 2147483647. (org.apache.kafka.clients.NetworkClient)
>
> I have one node, 3 consumers, and 15 partitions.
>
> My question is: why am I getting node number 2147483647 when I have only
> one node?
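
The arithmetic Eric describes can be checked in a few lines. This is only a sketch of the subtraction itself: the class and method names are hypothetical, and the broker id of 0 is an assumption for a single-broker setup (it is consistent with the node id in the log, but the thread does not state it).

```java
// Hypothetical helper illustrating the node-id arithmetic from the reply above.
final class CoordinatorNodeIdSketch {

    // Subtracts the broker id from max int, as described in the thread.
    static int derivedNodeId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    // Recovers the broker id from a derived node id seen in a log line.
    static int brokerIdFrom(int derivedNodeId) {
        return Integer.MAX_VALUE - derivedNodeId;
    }

    public static void main(String[] args) {
        System.out.println(derivedNodeId(0));          // prints 2147483647
        System.out.println(brokerIdFrom(2147483647));  // prints 0
    }
}
```

So a single broker with id 0 would still show up as node 2147483647 on this separate connection; the large number does not indicate an extra broker.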
reg Kafka Node num
I ran a multiple-partition, multiple-consumer application and then altered the number of partitions. In the log I am getting:

Using older server API v0 to send HEARTBEAT {group_id=altermultipartitionmulticonsumer2,group_generation_id=6,member_id=C2-b7b4f485-d0c3-466b-b0e7-b3de834832e9} to node 2147483647. (org.apache.kafka.clients.NetworkClient)

I have one node, 3 consumers, and 15 partitions.

My question is: why am I getting node number 2147483647 when I have only one node?
reg kafka API versions
I ran a multiple-partition, multiple-consumer application and then altered the number of partitions. In the log I am getting:

Using older server API v0 to send HEARTBEAT {group_id=altermultipartitionmulticonsumer2,group_generation_id=6,member_id=C2-b7b4f485-d0c3-466b-b0e7-b3de834832e9} to node 2147483647. (org.apache.kafka.clients.NetworkClient)

What does "older server API v0" mean? Could anyone explain this?
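
One way to read this message, as a hedged sketch: the client and the broker each support a range of versions for every request type, and the client uses the highest version that both sides support. If that is lower than the newest version the client knows, the request goes out with the older version. The names and version ranges below are hypothetical and are not taken from the Kafka client code.

```java
// Hypothetical model of request-version selection; not the Kafka client API.
final class ApiVersionSketch {

    // Picks the highest version inside both [clientMin, clientMax] and [brokerMin, brokerMax].
    static int usableVersion(int clientMin, int clientMax, int brokerMin, int brokerMax) {
        int version = Math.min(clientMax, brokerMax);
        if (version < Math.max(clientMin, brokerMin)) {
            throw new IllegalStateException("no version supported by both sides");
        }
        return version;
    }

    public static void main(String[] args) {
        // Assumed ranges: the client supports v0..v1, the broker only v0.
        int version = usableVersion(0, 1, 0, 0);
        System.out.println("send HEARTBEAT with v" + version);
    }
}
```

Read this way, the message is informational: it would suggest the client library is newer than the broker for that request type, not that something failed.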