I tried to create a Kafka Streams application, but there was a bug in my code. I believe my application ran into a deadlock: whenever I tried to run an application instance with the same StreamsConfig.APPLICATION_ID_CONFIG id, it could not start, and I had to create another instance with a different StreamsConfig.APPLICATION_ID_CONFIG id for it to start. I fixed the bug by adding:
```java
kafkaStreams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
    kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
});
```

and I never had the same deadlock problem again. *However, my code recently had another bug, and execution got stuck in a while loop.* I used the following command several times with no problems:

```
pkill -u myuser
```

However, now when I try to run 10 instances of my streams application, only some of them are initialized. By that I mean that the method

```java
public void init(ProcessorContext processorContext)
```

of my processor is executed in only some of my 10 Kafka Streams application instances. To be more specific, I run a script file:

```
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors1 VectorSink1 VectorSink2 VectorSourceNodeName1 VectorProcessorName1 StateStoreName1 si1-client si1-group si1-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors2 VectorSink3 VectorSink4 VectorSourceNodeName2 VectorProcessorName2 StateStoreName2 si2-client si2-group si2-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors3 VectorSink5 VectorSink6 VectorSourceNodeName3 VectorProcessorName3 StateStoreName3 si3-client si3-group si3-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors4 VectorSink7 VectorSink8 VectorSourceNodeName4 VectorProcessorName4 StateStoreName4 si4-client si4-group si4-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors5 VectorSink9 VectorSink10 VectorSourceNodeName5 VectorProcessorName5 StateStoreName5 si5-client si5-group si5-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors6 VectorSink11 VectorSink12 VectorSourceNodeName6 VectorProcessorName6 StateStoreName6 si6-client si6-group si6-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors7 VectorSink13 VectorSink14 VectorSourceNodeName7 VectorProcessorName7 StateStoreName7 si7-client si7-group si7-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors8 VectorSink15 VectorSink16 VectorSourceNodeName8 VectorProcessorName8 StateStoreName8 si8-client si8-group si8-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors9 VectorSink17 VectorSink18 VectorSourceNodeName9 VectorProcessorName9 StateStoreName9 si9-client si9-group si9-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors10 VectorSink19 VectorSink20 VectorSourceNodeName10 VectorProcessorName10 StateStoreName10 si10-client si10-group si10-appid" &
```

and usually only 7 of the 10 processors are initialized (each application has the same topology, consisting of a single processor). *As long as I don't change the StreamsConfig.APPLICATION_ID_CONFIG ids, it is always the same instances that fail to initialize.* *Whenever I change the StreamsConfig.APPLICATION_ID_CONFIG ids of all of them, the number of instances that get initialized changes (now 8 or 9, depending on the new StreamsConfig.APPLICATION_ID_CONFIG values), and it stays that way until I change the StreamsConfig.APPLICATION_ID_CONFIG of all of them again.*

My streams topology is:

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

// MyCustomStoreBuilder and SiteProcessor are my own classes (not shown here).
public class SiteApplication {
    static Logger logger = Logger.getLogger(SiteApplication.class.getName());

    public static void main(String[] args) throws Exception {

        PropertyConfigurator.configure(SiteApplication.class.getClassLoader().getResourceAsStream("log4j.properties"));

        Serde<String> stringSerde = Serdes.String();
        Serde<Integer> integerSerde = Serdes.Integer();
        Serde<byte[]> byteArraySerde = Serdes.ByteArray();

        Serializer<String> stringSerializer = stringSerde.serializer();
        Deserializer<String> stringDeserializer = stringSerde.deserializer();

        Serializer<Integer> integerSerializer = integerSerde.serializer();
        Deserializer<Integer> integerDeserializer = integerSerde.deserializer();

        Serializer<byte[]> byteArraySerializer = byteArraySerde.serializer();
        Deserializer<byte[]> byteArrayDeserializer = byteArraySerde.deserializer();

        Serializer<byte[]> byteArraySerializer1 = byteArraySerde.serializer();
        Deserializer<byte[]> byteArrayDeserializer1 = byteArraySerde.deserializer();

        Topology topology = new Topology();

        String vectorSink1 = args[1];
        String vectorSink2 = args[2];
        String vectorSourceNodeName = args[3];
        String vectorProcessorName = args[4];
        String firstStateStoreName = args[5];

        MyCustomStoreBuilder vectorStoreBuilder = new MyCustomStoreBuilder(firstStateStoreName);

        // Source (input topic args[0]) -> one processor with a state store -> two sink topics
        topology.addSource(vectorSourceNodeName, stringDeserializer, integerDeserializer, args[0])
                .addProcessor(vectorProcessorName,
                        () -> new SiteProcessor(firstStateStoreName, vectorSink1, vectorSink2),
                        vectorSourceNodeName)
                .addSink(vectorSink1, "IncreaseOfC", stringSerializer, byteArraySerializer, vectorProcessorName)
                .addSink(vectorSink2, "Messages", stringSerializer, byteArraySerializer1, vectorProcessorName)
                .addStateStore(vectorStoreBuilder, vectorProcessorName);

        Properties props = new Properties();
        props.put(StreamsConfig.CLIENT_ID_CONFIG, args[6]);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, args[7]);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, args[8]);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "clu1.myuniversity.com:6667,clu2.myuniversity:6667,clu3.myuniversity.com:6667,clu4.myuniversity.com:6667");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/home/myuser/state");
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);

        KafkaStreams kafkaStreams = new KafkaStreams(topology, props);
        kafkaStreams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
            // here you should examine the throwable/exception and perform an appropriate action!
            logger.info("There is an error in site " + args[6]);
            kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
        });

        logger.info("Starting FGM Application now for " + args[6]);
        kafkaStreams.cleanUp();
        kafkaStreams.start();

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                System.out.println("Shutting down FGM Application now");
                kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
            } catch (final Exception e) {
                e.printStackTrace();
            }
        }));
    }
}
```

I need to report this to the administrator of my university's cluster that runs Kafka, but I do not know exactly what is happening. *I do not know whether it is a bug in my code. It no longer gets stuck in a while loop, because I am running a stable version of my code that ran without problems in the past.* *The problems began after each application instance got stuck in that while loop, but I have fixed that bug.* *Does the Apache Kafka installation need to be restarted by the administrator, or does it need some other configuration change?* Is there a deadlock that I am missing? And if it were a deadlock, why would it affect only some of the application instances rather than all of them?

Thanks in advance.
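The ten mvn exec:java commands in the launch script follow one naming pattern, so a typo in a single line is easy to miss. As a minimal sketch, the arguments for instance i can be generated instead of written by hand. The LaunchArgs class and its methods are hypothetical (not part of my project); they only reproduce the pattern visible in the script: Vectors i, VectorSink 2i-1 and 2i, and the si i client/group/appid ids.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the original project): rebuilds the
// per-instance arguments that the launch script passes to SiteApplication.
public class LaunchArgs {

    // Argument string for instance i (1-based), following the script's pattern:
    // Vectors<i> VectorSink<2i-1> VectorSink<2i> VectorSourceNodeName<i>
    // VectorProcessorName<i> StateStoreName<i> si<i>-client si<i>-group si<i>-appid
    public static String argsFor(int i) {
        return String.join(" ",
                "Vectors" + i,
                "VectorSink" + (2 * i - 1),
                "VectorSink" + (2 * i),
                "VectorSourceNodeName" + i,
                "VectorProcessorName" + i,
                "StateStoreName" + i,
                "si" + i + "-client",
                "si" + i + "-group",
                "si" + i + "-appid");
    }

    // Full command for one instance, already split into tokens. No shell is
    // involved, so the quotes used in the script are not needed here.
    public static List<String> commandFor(int i) {
        List<String> cmd = new ArrayList<>();
        cmd.add("mvn");
        cmd.add("exec:java");
        cmd.add("-Dexec.mainClass=SiteApplication");
        cmd.add("-Dexec.args=" + argsFor(i));
        return cmd;
    }

    public static void main(String[] args) {
        // Print the token list of each of the ten instances for inspection.
        for (int i = 1; i <= 10; i++) {
            System.out.println(commandFor(i));
        }
    }
}
```

Each token list could then be passed to new ProcessBuilder(LaunchArgs.commandFor(i)).inheritIO().start() to launch the instances from Java instead of a shell script; because the tokens are passed directly, the -Dexec.args value needs no extra quoting.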
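All ten instances use the same STATE_DIR_CONFIG (/home/myuser/state), and in the Kafka Streams versions I am aware of, local state is kept under state.dir/application.id, with one subdirectory per stateful task (for example /home/myuser/state/si1-appid/0_0). Assuming that layout, a small listing of the state directory is one possible way to see, per application id, which instances created task directories on this machine. This is a hypothetical diagnostic sketch (the StateDirReport class is not part of my project), not a description of Kafka's own tooling:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical diagnostic (not part of the original project). Assumes the
// layout <state.dir>/<application.id>/<taskId> used by Kafka Streams.
public class StateDirReport {

    // Maps each application.id directory under stateDir to the sorted names
    // of its subdirectories (the per-task directories such as "0_0").
    public static Map<String, List<String>> taskDirsByApplicationId(File stateDir) {
        Map<String, List<String>> report = new TreeMap<>();
        File[] appDirs = stateDir.listFiles(File::isDirectory);
        if (appDirs == null) {
            return report; // stateDir does not exist or cannot be read
        }
        for (File appDir : appDirs) {
            List<String> taskNames = new ArrayList<>();
            File[] taskDirs = appDir.listFiles(File::isDirectory);
            if (taskDirs != null) {
                for (File taskDir : taskDirs) {
                    taskNames.add(taskDir.getName());
                }
            }
            Collections.sort(taskNames);
            report.put(appDir.getName(), taskNames);
        }
        return report;
    }

    public static void main(String[] args) {
        // Defaults to the STATE_DIR_CONFIG value used in SiteApplication.
        File stateDir = new File(args.length > 0 ? args[0] : "/home/myuser/state");
        taskDirsByApplicationId(stateDir).forEach((appId, tasks) ->
                System.out.println(appId + " -> "
                        + (tasks.isEmpty() ? "no task directories" : String.join(", ", tasks))));
    }
}
```

An application id with no task directory may hint that its instance never received a stateful task, which would be consistent with init() not being called there, but this is only a hint and not proof.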