I tried to create a Kafka Streams application, but there was a bug in my
code. I believe my application deadlocked, and whenever I tried to run an
application instance with the same StreamsConfig.APPLICATION_ID_CONFIG id,
it could not start. I had to create another instance with a different
StreamsConfig.APPLICATION_ID_CONFIG id in order for it to start. I fixed the
bug by adding:
> kafkaStreams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
>     kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
> });
>
and I never had the same deadlock problem again.
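As far as I understand it, the timeout is what makes this work. `KafkaStreams.close()` waits for the stream threads to stop, but the uncaught exception handler runs on the dying stream thread itself, so an unbounded close() called from inside the handler ends up waiting for its own thread. The JDK-only sketch below reproduces that pattern. It uses a hypothetical `Worker` class as a stand-in for `KafkaStreams` (this is not Kafka code). Closing from inside the handler returns only because the timeout expires. Handing the close to a separate thread lets it complete normally.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class CloseFromHandlerDemo {

    // Hypothetical stand-in for KafkaStreams: close() waits for the worker thread.
    static final class Worker {
        final Thread thread;

        Worker(Runnable body) {
            thread = new Thread(body, "stream-thread-stand-in");
        }

        // Waits at most timeoutMs for the worker; returns true if it has terminated.
        boolean close(long timeoutMs) {
            try {
                thread.join(timeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return !thread.isAlive();
        }
    }

    // The worker throws, and its handler calls close() on the same thread.
    // A thread joining itself can only stop waiting by timing out, so this returns false.
    static boolean closeInsideHandler(long timeoutMs) {
        AtomicBoolean closed = new AtomicBoolean();
        Worker worker = new Worker(() -> { throw new IllegalStateException("bug"); });
        worker.thread.setUncaughtExceptionHandler((t, e) -> closed.set(worker.close(timeoutMs)));
        worker.thread.start();
        joinQuietly(worker.thread);
        return closed.get();
    }

    // Same failure, but the handler only starts a separate "closer" thread and returns.
    static boolean closeFromSeparateThread(long timeoutMs) {
        AtomicBoolean closed = new AtomicBoolean();
        AtomicReference<Thread> closer = new AtomicReference<>();
        Worker worker = new Worker(() -> { throw new IllegalStateException("bug"); });
        worker.thread.setUncaughtExceptionHandler((t, e) -> {
            Thread c = new Thread(() -> closed.set(worker.close(timeoutMs)), "closer");
            closer.set(c);
            c.start();
        });
        worker.thread.start();
        joinQuietly(worker.thread);
        joinQuietly(closer.get());
        return closed.get();
    }

    static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In Kafka Streams terms, this suggests two options inside the handler: keep a timeout on close(), or start a separate thread to do the close.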
*Recently, however, my code had another bug, and the execution got
stuck in a while loop.*
I killed the instances several times with:
> pkill -u myuser
>
without any problems.
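One thing worth keeping in mind about `pkill`: by default it sends SIGTERM, which runs the JVM shutdown hooks. The shutdown hook in my code therefore does get to call `close(1000L, TimeUnit.MILLISECONDS)`. If a stream thread is spinning in a while loop inside `process()`, though, it never gets back to Kafka. As far as I can tell, that close can then only time out, and the JVM exits without the instance shutting down cleanly. The JDK-only sketch below illustrates the difference between a loop that ignores a stop request and one that checks it. The class name, the `stopRequested` flag and the `bugCondition` flag are all hypothetical, not part of Kafka.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class StuckLoopDemo {

    // Starts a busy loop whose condition never changes (the "bug").
    // If cooperative, the loop also checks a stop flag, as a processor could.
    // Returns true if the thread finished within timeoutMs after a stop was requested.
    static boolean stopsWithin(boolean cooperative, long timeoutMs) {
        AtomicBoolean stopRequested = new AtomicBoolean(false);
        AtomicBoolean bugCondition = new AtomicBoolean(true); // stands in for the stuck loop condition
        Thread t = new Thread(() -> {
            while (bugCondition.get()) {
                if (cooperative && stopRequested.get()) {
                    return; // bail out so the thread can shut down
                }
            }
        });
        t.setDaemon(true); // the demo thread must not keep the JVM alive
        t.start();
        stopRequested.set(true); // roughly what a shutdown request would signal
        try {
            t.join(timeoutMs); // roughly what close(timeout) waits for
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean finished = !t.isAlive();
        bugCondition.set(false); // release a stuck thread so the demo does not spin forever
        return finished;
    }
}
```

With `cooperative == false`, the thread is still alive when the timeout expires, which corresponds to the situation where the shutdown hook gives up.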
However, now when I try to run 10 instances of my streams application, only
some of them are initialized.
I mean that the method
> public void init(ProcessorContext processorContext)
>
of my processor is executed in only some of my 10 Kafka Streams
application instances.
To be more specific, I run a script file containing:
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors1 VectorSink1 VectorSink2 VectorSourceNodeName1 VectorProcessorName1 StateStoreName1 si1-client si1-group si1-appid" &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors2 VectorSink3 VectorSink4 VectorSourceNodeName2 VectorProcessorName2 StateStoreName2 si2-client si2-group si2-appid" &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors3 VectorSink5 VectorSink6 VectorSourceNodeName3 VectorProcessorName3 StateStoreName3 si3-client si3-group si3-appid" &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors4 VectorSink7 VectorSink8 VectorSourceNodeName4 VectorProcessorName4 StateStoreName4 si4-client si4-group si4-appid" &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors5 VectorSink9 VectorSink10 VectorSourceNodeName5 VectorProcessorName5 StateStoreName5 si5-client si5-group si5-appid" &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors6 VectorSink11 VectorSink12 VectorSourceNodeName6 VectorProcessorName6 StateStoreName6 si6-client si6-group si6-appid" &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors7 VectorSink13 VectorSink14 VectorSourceNodeName7 VectorProcessorName7 StateStoreName7 si7-client si7-group si7-appid" &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors8 VectorSink15 VectorSink16 VectorSourceNodeName8 VectorProcessorName8 StateStoreName8 si8-client si8-group si8-appid" &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors9 VectorSink17 VectorSink18 VectorSourceNodeName9 VectorProcessorName9 StateStoreName9 si9-client si9-group si9-appid" &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors10 VectorSink19 VectorSink20 VectorSourceNodeName10 VectorProcessorName10 StateStoreName10 si10-client si10-group si10-appid" &
>
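The ten commands in this script follow a single naming pattern. Instance i reads topic `Vectors{i}`, writes through sink nodes `VectorSink{2i-1}` and `VectorSink{2i}`, and uses application id `si{i}-appid`. Each instance is therefore a separate application with its own source topic. The short Java sketch below makes that pattern explicit. `LaunchArgs` and `argsFor` are hypothetical helper names, and the printed lines are meant to reproduce the script.

```java
public class LaunchArgs {

    // Builds the -Dexec.args value for instance i (1-based), following the
    // naming pattern of the script: one source topic and one application id per instance.
    static String argsFor(int i) {
        return String.join(" ",
                "Vectors" + i,
                "VectorSink" + (2 * i - 1),
                "VectorSink" + (2 * i),
                "VectorSourceNodeName" + i,
                "VectorProcessorName" + i,
                "StateStoreName" + i,
                "si" + i + "-client",
                "si" + i + "-group",
                "si" + i + "-appid");
    }

    public static void main(String[] args) {
        // Prints one background mvn command per instance, like the script.
        for (int i = 1; i <= 10; i++) {
            System.out.println("mvn exec:java -Dexec.mainClass=\"SiteApplication\" -Dexec.args=\""
                    + argsFor(i) + "\" &");
        }
    }
}
```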
Usually only 7 of the 10 processors are initialized (each application
has the same topology, consisting of one processor).
*If I don't change the StreamsConfig.APPLICATION_ID_CONFIG ids, the same
instances fail to initialize every time.*
*If I change the StreamsConfig.APPLICATION_ID_CONFIG ids of all of them,
the number of instances that get initialized changes (now 8 or 9,
depending on the new ids), and it stays that way until I change the
StreamsConfig.APPLICATION_ID_CONFIG ids of all of them again.*
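One pattern that would fit "the failures stick to a given set of application ids" is the group coordinator. This is only a hypothesis to check with the administrator. Kafka Streams uses `application.id` as the consumer `group.id`, so the `si*-group` value is overridden. The broker that coordinates a group is the leader of one partition of the internal `__consumer_offsets` topic, and that partition is chosen from a hash of the group id. Suppose one of those partitions were unavailable, for example because a broker is down while `offsets.topic.replication.factor` is 1. The affected groups could then never join, no tasks would be assigned, and `init()` would never run. Changing the ids would reshuffle which instances are affected. The sketch below mirrors the broker's mapping as I understand it (`abs(groupId.hashCode()) % offsets.topic.num.partitions`). It assumes the default of 50 partitions, which may not match this cluster.

```java
public class CoordinatorPartition {

    // Mirrors Kafka's Utils.abs: Math.abs, except that Integer.MIN_VALUE maps to 0.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Partition of __consumer_offsets whose leader acts as coordinator for this group.
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        return abs(groupId.hashCode()) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        int partitions = 50; // assumed default offsets.topic.num.partitions
        for (int i = 1; i <= 10; i++) {
            String appId = "si" + i + "-appid"; // Streams uses application.id as group.id
            System.out.println(appId + " -> __consumer_offsets-" + partitionFor(appId, partitions));
        }
    }
}
```

The administrator could compare these partition numbers with the partition leaders and in-sync replicas that `kafka-topics.sh --describe` reports for `__consumer_offsets`. If the instances that never initialize all map to partitions without a live leader, that would point to the cluster rather than to a deadlock.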
My streams topology is:
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.common.serialization.Deserializer;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.serialization.Serializer;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.Topology;
> import org.apache.log4j.Logger;
> import org.apache.log4j.PropertyConfigurator;
>
> // MyCustomStoreBuilder and SiteProcessor are my own classes (not shown).
> public class SiteApplication {
>     static Logger logger = Logger.getLogger(SiteApplication.class.getName());
>
>     public static void main(String[] args) throws Exception {
>         PropertyConfigurator.configure(SiteApplication.class.getClassLoader().getResourceAsStream("log4j.properties"));
>
>         Serde<String> stringSerde = Serdes.String();
>         Serde<Integer> integerSerde = Serdes.Integer();
>         Serde<byte[]> byteArraySerde = Serdes.ByteArray();
>
>         Serializer<String> stringSerializer = stringSerde.serializer();
>         Deserializer<String> stringDeserializer = stringSerde.deserializer();
>
>         Serializer<Integer> integerSerializer = integerSerde.serializer();
>         Deserializer<Integer> integerDeserializer = integerSerde.deserializer();
>
>         Serializer<byte[]> byteArraySerializer = byteArraySerde.serializer();
>         Deserializer<byte[]> byteArrayDeserializer = byteArraySerde.deserializer();
>
>         Serializer<byte[]> byteArraySerializer1 = byteArraySerde.serializer();
>         Deserializer<byte[]> byteArrayDeserializer1 = byteArraySerde.deserializer();
>
>         Topology toplogy = new Topology();
>
>         String vectorSink1 = args[1];
>         String vectorSink2 = args[2];
>         String vectorSourceNodeName = args[3];
>         String vectorProcessorName = args[4];
>         String firstStateStoreName = args[5];
>
>         MyCustomStoreBuilder vectorStoreBuilder = new MyCustomStoreBuilder(firstStateStoreName);
>
>         // One source (topic args[0]) -> one processor with a state store -> two sinks
>         toplogy.addSource(vectorSourceNodeName, stringDeserializer, integerDeserializer, args[0])
>                .addProcessor(vectorProcessorName,
>                              () -> new SiteProcessor(firstStateStoreName, vectorSink1, vectorSink2),
>                              vectorSourceNodeName)
>                .addSink(vectorSink1, "IncreaseOfC", stringSerializer, byteArraySerializer, vectorProcessorName)
>                .addSink(vectorSink2, "Messages", stringSerializer, byteArraySerializer1, vectorProcessorName)
>                .addStateStore(vectorStoreBuilder, vectorProcessorName);
>
>         Properties props = new Properties();
>         props.put(StreamsConfig.CLIENT_ID_CONFIG, args[6]);
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, args[7]);
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, args[8]);
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>                 "clu1.myuniversity.com:6667,clu2.myuniversity.com:6667,clu3.myuniversity.com:6667,clu4.myuniversity.com:6667");
>         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
>         props.put(StreamsConfig.STATE_DIR_CONFIG, "/home/myuser/state");
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
>         props.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
>
>         KafkaStreams kafkaStreams = new KafkaStreams(toplogy, props);
>         kafkaStreams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
>             // here you should examine the throwable/exception and perform an appropriate action!
>             logger.error("There is an error in site " + args[6], throwable);
>             kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
>         });
>
>         logger.info("Starting FGM Application now for " + args[6]);
>         kafkaStreams.cleanUp();
>         kafkaStreams.start();
>
>         // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
>         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>             try {
>                 System.out.println("Shutting down FGM Application now");
>                 kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
>             } catch (final Exception e) {
>                 e.printStackTrace();
>             }
>         }));
>     }
> }
I need to tell the administrator of my university's Kafka cluster what is
wrong, but I do not know exactly what is happening.
*I do not know whether it is a bug in my code. I doubt it, because the code is
no longer stuck in a while loop: I am running a stable version of my code
that ran without problems in the past.*
*The problems began after every application instance got stuck in a while
loop, but I have fixed that.*
*Does the Apache Kafka installation need a restart by the administrator, or
some other configuration change?*
Is there a deadlock that I am missing?
If it were a deadlock, why would it affect only some of the application
instances instead of all of them?
Thanks in advance.