I tried to create a Kafka Streams application, but there was a bug in my
code. I believe there was a deadlock in my application, and whenever I
tried to run an application instance with the same
StreamsConfig.APPLICATION_ID_CONFIG it could not start. I had to create
another instance with a different StreamsConfig.APPLICATION_ID_CONFIG in
order for it to start. I fixed the bug by adding:

> kafkaStreams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
>     kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
> });
>
and the deadlock problem never occurred again.

*Nevertheless, recently my code had another bug and the execution got
stuck in a while loop.*
I ran the following several times:

> pkill -u myuser
>

with no problems.
However, now when I try to run 10 instances of my Streams application,
only a portion of them are initialized.
I mean that the method

> public void init(ProcessorContext processorContext)
>
in my 10 processors is executed in only a portion of my 10 Kafka Streams
application instances.
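
(For reference, each processor follows the plain Processor API. Below is only a
simplified sketch of its shape, with the real SiteProcessor logic omitted and a
log line added just to illustrate what I mean by "initialized"; it is not my
actual code:)

> import org.apache.kafka.streams.processor.AbstractProcessor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.processor.StateStore;
> import org.apache.log4j.Logger;
>
> // Simplified sketch only -- the real processing logic is omitted.
> public class SiteProcessor extends AbstractProcessor<String, Integer> {
>
>     private static final Logger logger = Logger.getLogger(SiteProcessor.class.getName());
>
>     private final String stateStoreName;
>     private final String vectorSink1;
>     private final String vectorSink2;
>     private StateStore store;
>
>     public SiteProcessor(String stateStoreName, String vectorSink1, String vectorSink2) {
>         this.stateStoreName = stateStoreName;
>         this.vectorSink1 = vectorSink1;
>         this.vectorSink2 = vectorSink2;
>     }
>
>     @Override
>     public void init(ProcessorContext processorContext) {
>         // This is the method that only a portion of the 10 instances ever reach.
>         super.init(processorContext);
>         store = processorContext.getStateStore(stateStoreName);
>         logger.info("init() called for task " + processorContext.taskId());
>     }
>
>     @Override
>     public void process(String key, Integer value) {
>         // real per-record logic omitted; results go to vectorSink1 and vectorSink2
>     }
> }
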
To be more specific, I run a script file:

> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors1
> VectorSink1 VectorSink2 VectorSourceNodeName1 VectorProcessorName1
> StateStoreName1 si1-client si1-group si1-appid"  &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors2
> VectorSink3 VectorSink4 VectorSourceNodeName2 VectorProcessorName2
> StateStoreName2 si2-client si2-group si2-appid"  &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors3
> VectorSink5 VectorSink6 VectorSourceNodeName3 VectorProcessorName3
> StateStoreName3 si3-client si3-group si3-appid"  &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors4
> VectorSink7 VectorSink8 VectorSourceNodeName4 VectorProcessorName4
> StateStoreName4 si4-client si4-group si4-appid"  &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors5
> VectorSink9 VectorSink10 VectorSourceNodeName5 VectorProcessorName5
> StateStoreName5 si5-client si5-group si5-appid"  &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors6
> VectorSink11 VectorSink12 VectorSourceNodeName6 VectorProcessorName6
> StateStoreName6 si6-client si6-group si6-appid"  &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors7
> VectorSink13 VectorSink14 VectorSourceNodeName7 VectorProcessorName7
> StateStoreName7 si7-client si7-group si7-appid"  &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors8
> VectorSink15 VectorSink16 VectorSourceNodeName8 VectorProcessorName8
> StateStoreName8 si8-client si8-group si8-appid"  &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors9
> VectorSink17 VectorSink18 VectorSourceNodeName9 VectorProcessorName9
> StateStoreName9 si9-client si9-group si9-appid"  &
> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors10
> VectorSink19 VectorSink20 VectorSourceNodeName10 VectorProcessorName10
> StateStoreName10 si10-client si10-group si10-appid"  &
>

and usually only 7 out of the 10 processors are initialized (each
application has the same topology, consisting of one Processor).

*As long as I don't change the StreamsConfig.APPLICATION_ID_CONFIG, it is
always the same instances that fail to initialize.*
*Whenever I change the StreamsConfig.APPLICATION_ID_CONFIG of all of them,
the number of instances that get initialized changes (now 8 or 9 are
initialized, depending on the new StreamsConfig.APPLICATION_ID_CONFIG
values, and this stays the same until I change the
StreamsConfig.APPLICATION_ID_CONFIG of all of them again).*


My Streams topology is:


> public class SiteApplication {
>
>     static Logger logger = Logger.getLogger(SiteApplication.class.getName());
>
>     public static void main(String[] args) throws Exception {
>
>         PropertyConfigurator.configure(SiteApplication.class.getClassLoader()
>                 .getResourceAsStream("log4j.properties"));
>
>         Serde<String> stringSerde = Serdes.String();
>         Serde<Integer> integerSerde = Serdes.Integer();
>         Serde<byte[]> byteArraySerde = Serdes.ByteArray();
>
>         Serializer<String> stringSerializer = stringSerde.serializer();
>         Deserializer<String> stringDeserializer = stringSerde.deserializer();
>
>         Serializer<Integer> integerSerializer = integerSerde.serializer();
>         Deserializer<Integer> integerDeserializer = integerSerde.deserializer();
>
>         Serializer<byte[]> byteArraySerializer = byteArraySerde.serializer();
>         Deserializer<byte[]> byteArrayDeserializer = byteArraySerde.deserializer();
>
>         Serializer<byte[]> byteArraySerializer1 = byteArraySerde.serializer();
>         Deserializer<byte[]> byteArrayDeserializer1 = byteArraySerde.deserializer();
>
>         Topology topology = new Topology();
>
>         // Topic, node, and store names come from the command-line arguments.
>         String vectorSink1 = args[1];
>         String vectorSink2 = args[2];
>         String vectorSourceNodeName = args[3];
>         String vectorProcessorName = args[4];
>         String firstStateStoreName = args[5];
>
>         MyCustomStoreBuilder vectorStoreBuilder = new MyCustomStoreBuilder(firstStateStoreName);
>
>         topology.addSource(vectorSourceNodeName, stringDeserializer, integerDeserializer, args[0])
>                 .addProcessor(vectorProcessorName,
>                         () -> new SiteProcessor(firstStateStoreName, vectorSink1, vectorSink2),
>                         vectorSourceNodeName)
>                 .addSink(vectorSink1, "IncreaseOfC", stringSerializer, byteArraySerializer, vectorProcessorName)
>                 .addSink(vectorSink2, "Messages", stringSerializer, byteArraySerializer1, vectorProcessorName)
>                 .addStateStore(vectorStoreBuilder, vectorProcessorName);
>
>         Properties props = new Properties();
>         props.put(StreamsConfig.CLIENT_ID_CONFIG, args[6]);
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, args[7]);
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, args[8]);
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>                 "clu1.myuniversity.com:6667,clu2.myuniversity:6667,"
>                 + "clu3.myuniversity.com:6667,clu4.myuniversity.com:6667");
>         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
>         props.put(StreamsConfig.STATE_DIR_CONFIG, "/home/myuser/state");
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
>         props.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
>
>         KafkaStreams kafkaStreams = new KafkaStreams(topology, props);
>         kafkaStreams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
>             // here you should examine the throwable/exception and perform an appropriate action!
>             logger.info("There is an error in site " + args[6]);
>             kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
>         });
>
>         logger.info("Starting FGM Application now for " + args[6]);
>         kafkaStreams.cleanUp();
>         kafkaStreams.start();
>
>         // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
>         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>             try {
>                 System.out.println("Shutting down FGM Application now");
>                 kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
>             } catch (final Exception e) {
>                 e.printStackTrace();
>             }
>         }));
>     }
> }
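
To see from the KafkaStreams side whether an instance ever finishes
initializing, I suppose I could also register a state listener right after
creating the kafkaStreams object above. This is only a sketch of the idea,
not something that is currently in my code:

> kafkaStreams.setStateListener((newState, oldState) -> {
>     // Log every state transition of this instance (e.g. CREATED -> REBALANCING -> RUNNING).
>     logger.info("Instance " + args[6] + " moved from " + oldState + " to " + newState);
>     if (newState == KafkaStreams.State.RUNNING) {
>         // Reaching RUNNING means the stream threads and their tasks have started.
>         logger.info("Instance " + args[6] + " finished initializing");
>     }
> });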





I have to report this to the administrator of my university's cluster that
runs Kafka, but I do not know what exactly is happening.

*I do not know whether it is a bug in my code, because it is not stuck in a
while loop any more: I am running a stable version of my code that ran
without problems in the past.*

*The problems began after each application instance got stuck in a while
loop, but I have fixed that.*

*Does the Apache Kafka installation need a restart by the administrator, or
some other configuration change?*

Is there a deadlock that I am missing?

If it were a deadlock, why would it happen only to a portion of the
application instances and not to all of them?

Thanks in advance.
