Hi Bill,

Thanks, I understand. Let me know if you need further information.

Regards,
Patrice

2017-12-06 16:03 GMT+01:00 Bill Bejeck <bbej...@gmail.com>:

> Hi Patrice,
>
> I haven't forgotten, just sidetracked with other things.  I'll get back to
> you by the end of the week.
>
> Thanks,
> Bill
>
> On Wed, Nov 29, 2017 at 10:36 AM, Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Patrice,
> >
> > Thanks for reporting this.  I'll have a look at what you've posted on
> > Github.
> >
> > Thanks,
> > Bill
> >
> > On Wed, Nov 29, 2017 at 7:04 AM, Patrice Chalcol <pchal...@gmail.com>
> > wrote:
> >
> >> Hello,
> >>
> >> I have implemented a basic application which uses kafka streams stores
> and
> >> interactive queries, available there :
> >> https://github.com/pchalcol/kstreams-healthcheck
> >>
> >> The healthcheck implementation is based on kafka streams metadata and
> the
> >> stream state, as illustrated below :
> >> ```
> >> String healthcheck() {
> >> Collection<StreamsMetadata> stores = streams.allMetadata();
> >> long storescount = stores.stream()
> >> .filter(meta -> meta.host().contains("localhost") && meta.port() ==
> 4567)
> >> .count();
> >>
> >> State state = streams.state();
> >>
> >> System.out.println(String.format("Application State: (%d, %s)",
> >> storescount, state.toString()));
> >>
> >> // KO if current node is down or if is in 'not running' state
> >> if (storescount == 0 || !state.isRunning()) return "KO";
> >> return "OK";
> >> }
> >> ```
> >>
> >> I have created the topics with 4 partitions :
> >> `kafka-topics --create --topic events --zookeeper localhost:2181
> >> --partitions 4 --replication-factor 1`
> >> `kafka-topics --create --topic library --zookeeper localhost:2181
> >> --partitions 4 --replication-factor 1`
> >>
> >> What I had expected was the healthcheck returning an error whenever the
> >> broker is shut down, which is not the case.
> >>
> >> When I check the application status using the following
> >> curl -XGET http://localhost:4567/healthcheck
> >> The server always returns a SUCCESS response, even if the kafka cluster
> is
> >> down.
> >>
> >> You will find below the different tests cases I've done.
> >>
> >> 1/ The Stream state is not changed after shutting down the kafka cluster
> >> - start kafka
> >> `cd docker && docker-compose up -d`
> >>
> >> - start producer
> >> `sbt runMain com.example.streams.Producer`
> >>
> >> - start streams and http server
> >> `sbt runMain com.example.streams.Producer`
> >>
> >> - healthcheck
> >> `curl -XGET http://localhost:4567/healthcheck`
> >> <http://localhost:4567/healthcheck>
> >> => response = {"status": "SUCCESS"}
> >> - shutdown kafka : docker-compose stop
> >>
> >> - healthcheck
> >> `curl -XGET http://localhost:4567/healthcheck`
> >> <http://localhost:4567/healthcheck>
> >> => response = {"status": "SUCCESS"} while the expected one should be
> >> {"status": "ERROR"}
> >>
> >>
> >> 2/ Sometimes, I also encounter this behaviour, no data seems to be
> >> available when querying the stores
> >> - Start kafka
> >> - Start Producer
> >> - Start Streams and http Server
> >>
> >> - Request data : curl -XGET http://localhost:4567/titles
> >>   This http request calls a service which in turn queries the keyvalue
> >> store
> >> => received response
> >> ```
> >> {
> >>     "data": [
> >>         {
> >>             "key": 1,
> >>             "value": "Fresh Fruit For Rotting Vegetables"
> >>         },
> >>
> >>         ...
> >>
> >>         {
> >>             "key": 10,
> >>             "value": "Fear Of A Black Planet"
> >>         }
> >>     ],
> >>     "status": "SUCCESS"
> >> }
> >> ```
> >>
> >> - Request data : curl -XGET http://localhost:4567/titles/counts
> >> => received response
> >> ```
> >> {
> >> "data": [
> >> {
> >> "key": "fear of a black planet",
> >> "value": 414
> >> },
> >> ...
> >> {
> >> "key": "curtain call - the hits",
> >> "value": 431
> >> }
> >> ],
> >> "status": "SUCCESS"
> >> }
> >> ```
> >>
> >> - shutdown kafka
> >>
> >> - Request data : curl -XGET http://localhost:4567/titles
> >> => received response, same as before, which seems to be ok as we are
> >> querying the local store
> >> ```
> >> {
> >>     "data": [
> >>         {
> >>             "key": 1,
> >>             "value": "Fresh Fruit For Rotting Vegetables"
> >>         },
> >>
> >>         ...
> >>
> >>         {
> >>             "key": 10,
> >>             "value": "Fear Of A Black Planet"
> >>         }
> >>     ],
> >>     "status": "SUCCESS"
> >> }
> >> ```
> >> - Request data : curl -XGET http://localhost:4567/titles/counts
> >> => received response, still understandable
> >> ```
> >> {
> >> "data": [
> >> {
> >> "key": "fear of a black planet",
> >> "value": 414
> >> },
> >> ...
> >> {
> >> "key": "curtain call - the hits",
> >> "value": 431
> >> }
> >> ],
> >> "status": "SUCCESS"
> >> }
> >> ```
> >>
> >> - restart kafka
> >>
> >> - Request data : curl -XGET http://localhost:4567/titles
> >> => received response
> >> ```
> >> {
> >>     "data": [],
> >>     "status": "SUCCESS"
> >> }
> >> ```
> >>
> >> - Request data : curl -XGET http://localhost:4567/titles/counts
> >> => same here, received response
> >> ```
> >> {
> >>     "data": [],
> >>     "status": "SUCCESS"
> >> }
> >> ```
> >>
> >> I also see this entry in the Streams application logs
> >> ```[error]
> >> (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-
> StreamThread-1)
> >> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception
> >> caught when producing
> >> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception
> >> caught when producing
> >> at
> >> org.apache.kafka.streams.processor.internals.RecordCollector
> >> Impl.checkForException(RecordCollectorImpl.java:136)
> >> at
> >> org.apache.kafka.streams.processor.internals.RecordCollector
> >> Impl.flush(RecordCollectorImpl.java:144)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamTask.
> >> flushState(StreamTask.java:283)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamTask$1.
> >> run(StreamTask.java:264)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamsMetricsI
> >> mpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamTask.
> >> commitImpl(StreamTask.java:259)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamTask.
> >> commit(StreamTask.java:253)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread.
> >> commitOne(StreamThread.java:815)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread.
> >> access$2800(StreamThread.java:73)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread$2.
> >> apply(StreamThread.java:797)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread.pe
> >> rformOnStreamTasks(StreamThread.java:1448)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread.
> >> commitAll(StreamThread.java:789)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread.
> >> maybeCommit(StreamThread.java:778)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread.
> >> runLoop(StreamThread.java:567)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread.
> >> run(StreamThread.java:527)
> >> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1
> >> record(s) for kafka-streams-test-events-counts-repartition-1: 30046 ms
> >> has
> >> passed since batch creation plus linger time
> >> [trace] Stack trace suppressed: run last compile:runMain for the full
> >> output.
> >> the state store, events-counts, may have migrated to another
> instance.```
> >>
> >> Even if a rebalance has occurred after having restarted my cluster, as I
> >> have only one consumer, I thought it should still see all partitions, so
> >> the store should remain available.
> >> What am I missing here ?
> >>
> >> Thank you for your answers.
> >>
> >> --
> >> Regards,
> >> Patrice
> >>
> >
> >
>



-- 
Cordialement
Patrice Chalcol

Reply via email to