I found out from the logs that for those applications whose Processors *are
initialized* the following is logged:


INFO  AbstractCoordinator:677 - [Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid] Discovered group coordinator myserver:6667 (id: 2147482646 rack: null)
INFO  ConsumerCoordinator:472 - [Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid] Revoking previously assigned partitions []
INFO  StreamThread:209 - stream-thread [sk5-client-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
INFO  KafkaStreams:261 - stream-client [sk5-client] State transition from RUNNING to REBALANCING
INFO  StreamThread:320 - stream-thread [sk5-client-StreamThread-1] partition revocation took 1 ms.
    suspended active tasks: []
    suspended standby tasks: []
StreamsPartitionAssignor:579 - stream-thread [sk5-client-StreamThread-1-consumer] Assigned tasks to clients as {=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
INFO  AbstractCoordinator:473 - [Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid] Successfully joined group with generation 1
INFO  ConsumerCoordinator:280 - [Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid] Setting newly assigned partitions [Vectors5-0]
INFO  StreamThread:209 - stream-thread [sk5-client-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
INFO  StreamThread:280 - stream-thread [sk5-client-StreamThread-1] partition assignment took 23 ms.
    current active tasks: [0_0]
    current standby tasks: []
    previous active tasks: []
INFO  StreamThread:209 - stream-thread [sk5-client-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
INFO  KafkaStreams:261 - stream-client [sk5-client] State transition from REBALANCING to RUNNING

For those applications whose Processors *are not initialized*, only a
state transition from CREATED to RUNNING is logged, and no rebalancing
takes place.
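
For reference, these transitions can also be watched programmatically with a
state listener (a minimal sketch, assuming the `kafkaStreams` and `logger`
objects from the code further down), e.g. to flag instances that never reach
REBALANCING:

    kafkaStreams.setStateListener((newState, oldState) -> {
        // instances whose processors are initialized pass through REBALANCING;
        // the stuck ones only ever report CREATED -> RUNNING
        logger.info("State transition from " + oldState + " to " + newState);
    });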

As the logs suggest, the stream applications whose processors are not
initialized cannot discover the group coordinator. Is this a problem that I
have to report to the administrator of the cluster that runs Apache Kafka,
or is it a bug in my code that I have to search for?

My workaround was to change the CLIENT_ID_CONFIG and the
APPLICATION_ID_CONFIG of the stream applications that were not initialized,
several times, until they were initialized.
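
Concretely, the workaround amounts to something like this (a rough sketch
against the config code quoted below; the "-v2" suffix is only an
illustrative way of forcing fresh names):

    // hypothetical suffix, changed on every retry until the instance initializes
    String suffix = "-v2";
    props.put(StreamsConfig.CLIENT_ID_CONFIG, args[6] + suffix);
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, args[8] + suffix);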

On Wed, Mar 4, 2020 at 6:22 AM Matthias J. Sax <mj...@apache.org> wrote:

> It's hard to say. However, calling `KafkaStreams#close()` in the
> uncaught exception handler is considered bad practice (using a timeout
> avoids the deadlock, but it would be better to just set a boolean flag
> and call `close()` from outside the handler).
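>
> A minimal sketch of that pattern (names are illustrative; it reuses the
> same close(long, TimeUnit) overload used elsewhere in this thread):
>
>     // java.util.concurrent.atomic.AtomicBoolean
>     final AtomicBoolean streamsFailed = new AtomicBoolean(false);
>
>     kafkaStreams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
>         logger.error("Uncaught exception in " + thread.getName(), throwable);
>         streamsFailed.set(true);  // only record the failure here
>     });
>
>     kafkaStreams.start();
>
>     // react to the flag outside the handler, e.g. in the main thread
>     while (!streamsFailed.get()) {
>         Thread.sleep(1000L);
>     }
>     kafkaStreams.close(10000L, TimeUnit.MILLISECONDS);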
>
> I assume your input topic has enough partitions and there are enough
> tasks to utilize all 10 instances?
>
> You also don't need to set the `group.id` -- this config will be
> ignored anyway and the `application.id` is used as consumer `group.id`.
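>
> In other words (a sketch against the config code below):
>
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, args[8]);
>     // no ConsumerConfig.GROUP_ID_CONFIG needed -- Streams uses the
>     // application.id as the consumer group.id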
>
> I would try to inspect the log (if required at DEBUG level) to get
> more intel on what the application is doing.
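>
> For the log4j 1.x setup referenced in the code below, a minimal sketch
> would be to add something like this to `log4j.properties` (logger names
> for the Kafka client/Streams packages):
>
>     log4j.logger.org.apache.kafka=DEBUG
>     log4j.logger.org.apache.kafka.streams=DEBUG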
>
>
> -Matthias
>
>
> On 2/23/20 1:09 PM, Michelle Francois wrote:
> > I tried to create a Kafka Streams application, but there was a bug
> > in my code. I believe my application hit a deadlock, and whenever I
> > tried to run an application instance with the same
> > StreamsConfig.APPLICATION_ID_CONFIG it could not start. I had to
> > create another instance with a different
> > StreamsConfig.APPLICATION_ID_CONFIG in order for it to start. I
> > fixed the bug by adding:
> >
> >> kafkaStreams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
> >>     kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
> >> });
> >>
> > and I never had the deadlock problem again.
> >
> > *Nevertheless, these days my code had another bug, and the
> > execution got stuck in a while loop.* I ran, several times:
> >
> >> pkill -u myuser
> >>
> >
> > with no problems. However, now when I try to run 10 instances of my
> > streams application, only some of them are initialized. I mean
> > that the method
> >
> >> public void init(ProcessorContext processorContext)
> >>
> > in my 10 processors is executed in only some of my 10 Kafka
> > Streams application instances. To be more specific, I run a
> > script file:
> >
> >> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors1 VectorSink1 VectorSink2 VectorSourceNodeName1 VectorProcessorName1 StateStoreName1 si1-client si1-group si1-appid" &
> >> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors2 VectorSink3 VectorSink4 VectorSourceNodeName2 VectorProcessorName2 StateStoreName2 si2-client si2-group si2-appid" &
> >> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors3 VectorSink5 VectorSink6 VectorSourceNodeName3 VectorProcessorName3 StateStoreName3 si3-client si3-group si3-appid" &
> >> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors4 VectorSink7 VectorSink8 VectorSourceNodeName4 VectorProcessorName4 StateStoreName4 si4-client si4-group si4-appid" &
> >> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors5 VectorSink9 VectorSink10 VectorSourceNodeName5 VectorProcessorName5 StateStoreName5 si5-client si5-group si5-appid" &
> >> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors6 VectorSink11 VectorSink12 VectorSourceNodeName6 VectorProcessorName6 StateStoreName6 si6-client si6-group si6-appid" &
> >> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors7 VectorSink13 VectorSink14 VectorSourceNodeName7 VectorProcessorName7 StateStoreName7 si7-client si7-group si7-appid" &
> >> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors8 VectorSink15 VectorSink16 VectorSourceNodeName8 VectorProcessorName8 StateStoreName8 si8-client si8-group si8-appid" &
> >> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors9 VectorSink17 VectorSink18 VectorSourceNodeName9 VectorProcessorName9 StateStoreName9 si9-client si9-group si9-appid" &
> >> mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors10 VectorSink19 VectorSink20 VectorSourceNodeName10 VectorProcessorName10 StateStoreName10 si10-client si10-group si10-appid" &
> >>
> >
> > and usually only 7 out of the 10 processors are initialized (each
> > application has the same topology, consisting of one processor).
> >
> > *Whenever I don't change the StreamsConfig.APPLICATION_ID_CONFIG,
> > the same instances remain uninitialized.* *Whenever I change the
> > StreamsConfig.APPLICATION_ID_CONFIG of all of them, the number of
> > instances that get initialized changes (8 or 9 are now initialized,
> > depending on the change of the StreamsConfig.APPLICATION_ID_CONFIG,
> > and this stays the same until I change the
> > StreamsConfig.APPLICATION_ID_CONFIG of all of them again).*
> >
> >
> > My streams topology is:
> >
> >
> >> public class SiteApplication {
> >>
> >>     static Logger logger = Logger.getLogger(SiteApplication.class.getName());
> >>
> >>     public static void main(String[] args) throws Exception {
> >>
> >>         PropertyConfigurator.configure(SiteApplication.class.getClassLoader()
> >>                 .getResourceAsStream("log4j.properties"));
> >>
> >>         Serde<String> stringSerde = Serdes.String();
> >>         Serde<Integer> integerSerde = Serdes.Integer();
> >>         Serde<byte[]> byteArraySerde = Serdes.ByteArray();
> >>
> >>         Serializer<String> stringSerializer = stringSerde.serializer();
> >>         Deserializer<String> stringDeserializer = stringSerde.deserializer();
> >>
> >>         Serializer<Integer> integerSerializer = integerSerde.serializer();
> >>         Deserializer<Integer> integerDeserializer = integerSerde.deserializer();
> >>
> >>         Serializer<byte[]> byteArraySerializer = byteArraySerde.serializer();
> >>         Deserializer<byte[]> byteArrayDeserializer = byteArraySerde.deserializer();
> >>
> >>         Serializer<byte[]> byteArraySerializer1 = byteArraySerde.serializer();
> >>         Deserializer<byte[]> byteArrayDeserializer1 = byteArraySerde.deserializer();
> >>
> >>         Topology topology = new Topology();
> >>
> >>         String vectorSink1 = args[1];
> >>         String vectorSink2 = args[2];
> >>         String vectorSourceNodeName = args[3];
> >>         String vectorProcessorName = args[4];
> >>         String firstStateStoreName = args[5];
> >>
> >>         MyCustomStoreBuilder vectorStoreBuilder = new MyCustomStoreBuilder(firstStateStoreName);
> >>
> >>         topology.addSource(vectorSourceNodeName, stringDeserializer, integerDeserializer, args[0])
> >>                 .addProcessor(vectorProcessorName,
> >>                         () -> new SiteProcessor(firstStateStoreName, vectorSink1, vectorSink2),
> >>                         vectorSourceNodeName)
> >>                 .addSink(vectorSink1, "IncreaseOfC", stringSerializer, byteArraySerializer, vectorProcessorName)
> >>                 .addSink(vectorSink2, "Messages", stringSerializer, byteArraySerializer1, vectorProcessorName)
> >>                 .addStateStore(vectorStoreBuilder, vectorProcessorName);
> >>
> >>         Properties props = new Properties();
> >>         props.put(StreamsConfig.CLIENT_ID_CONFIG, args[6]);
> >>         props.put(ConsumerConfig.GROUP_ID_CONFIG, args[7]);
> >>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, args[8]);
> >>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>                 "clu1.myuniversity.com:6667,clu2.myuniversity:6667,clu3.myuniversity.com:6667,clu4.myuniversity.com:6667");
> >>         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
> >>         props.put(StreamsConfig.STATE_DIR_CONFIG, "/home/myuser/state");
> >>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
> >>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> >>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> >>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
> >>         props.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
> >>
> >>         KafkaStreams kafkaStreams = new KafkaStreams(topology, props);
> >>         kafkaStreams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
> >>             // here you should examine the throwable/exception and perform an appropriate action!
> >>             logger.info("There is an error in site " + args[6]);
> >>             kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
> >>         });
> >>
> >>         logger.info("Starting FGM Application now for " + args[6]);
> >>         kafkaStreams.cleanUp();
> >>         kafkaStreams.start();
> >>
> >>         // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
> >>         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> >>             try {
> >>                 System.out.println("Shutting down FGM Application now");
> >>                 kafkaStreams.close(1000L, TimeUnit.MILLISECONDS);
> >>             } catch (final Exception e) {
> >>                 e.printStackTrace();
> >>             }
> >>         }));
> >>     }
> >> }
> >
> >
> >
> >
> >
> > I have to report something to the administrator of my university's
> > cluster that runs Apache Kafka, but I do not know what exactly is
> > happening.
> >
> > *I do not know if it is a bug in my code, because it is no longer
> > stuck in a while loop; I am running a stable version of my code
> > that ran without problems in the past.*
> >
> > *The problems began after each application instance got stuck in a
> > while loop, but I have fixed that.*
> >
> > *Does the Apache Kafka installation need a restart by the
> > administrator, or some other configuration change?*
> >
> > Is it a deadlock that I am missing?
> >
> > If it were a deadlock, why would it not happen to all the
> > application instances instead of only some of them?
> >
> > Thanks in advance.
> >