Hi Patrice,

Sorry for the delay in getting back to you.
I cloned your repo and ran the example. Shutting down the broker does not stop the Streams application, so its state remains RUNNING and it still hosts its state stores. Hence the SUCCESS status when running `curl -XGET http://localhost:4567/healthcheck`.

As for "the state store, events-counts, may have migrated to another instance", it looks like the log was truncated, so we'd need the full stack trace to tell what is going on.

Thanks,
Bill

On Wed, Dec 6, 2017 at 1:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Patrice,

Which version of Kafka are you using for this demo app?

Guozhang

On Wed, Dec 6, 2017 at 8:04 AM, Patrice Chalcol <pchal...@gmail.com> wrote:

Hi Bill,

Thanks, I understand. Let me know if you need further information.

Regards,
Patrice

2017-12-06 16:03 GMT+01:00 Bill Bejeck <bbej...@gmail.com>:

Hi Patrice,

I haven't forgotten; I've just been sidetracked with other things. I'll get back to you by the end of the week.

Thanks,
Bill

On Wed, Nov 29, 2017 at 10:36 AM, Bill Bejeck <bbej...@gmail.com> wrote:

Patrice,

Thanks for reporting this. I'll have a look at what you've posted on GitHub.
Thanks,
Bill

On Wed, Nov 29, 2017 at 7:04 AM, Patrice Chalcol <pchal...@gmail.com> wrote:

Hello,

I have implemented a basic application which uses Kafka Streams stores and interactive queries, available here: https://github.com/pchalcol/kstreams-healthcheck

The healthcheck implementation is based on the Kafka Streams metadata and the stream state, as illustrated below:
```
import java.util.Collection;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.state.StreamsMetadata;

// `streams` is the application's KafkaStreams instance
String healthcheck() {
    // Metadata for every instance of this application that hosts state stores
    Collection<StreamsMetadata> stores = streams.allMetadata();
    // Count the entries that correspond to this node (localhost:4567)
    long storescount = stores.stream()
        .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
        .count();

    State state = streams.state();

    System.out.println(String.format("Application State: (%d, %s)",
        storescount, state.toString()));

    // KO if the current node is missing from the metadata or is not in a running state
    if (storescount == 0 || !state.isRunning()) return "KO";
    return "OK";
}
```

I have created the topics with 4 partitions:
`kafka-topics --create --topic events --zookeeper localhost:2181 --partitions 4 --replication-factor 1`
`kafka-topics --create --topic library --zookeeper localhost:2181 --partitions 4 --replication-factor 1`

I expected the healthcheck to return an error whenever the broker is shut down, but that is not the case.

When I check the application status using the following command:
`curl -XGET http://localhost:4567/healthcheck`
the server always returns a SUCCESS response, even if the Kafka cluster is down.

Below are the different test cases I ran.
1/ The Streams state does not change after shutting down the Kafka cluster
- start Kafka
`cd docker && docker-compose up -d`

- start the producer
`sbt runMain com.example.streams.Producer`

- start Streams and the HTTP server
`sbt runMain com.example.streams.Producer`

- healthcheck
`curl -XGET http://localhost:4567/healthcheck`
=> response = {"status": "SUCCESS"}

- shut down Kafka: `docker-compose stop`

- healthcheck
`curl -XGET http://localhost:4567/healthcheck`
=> response = {"status": "SUCCESS"}, while the expected one is {"status": "ERROR"}


2/ Sometimes I also encounter the following behaviour: no data seems to be available when querying the stores
- Start Kafka
- Start the producer
- Start Streams and the HTTP server

- Request data: curl -XGET http://localhost:4567/titles
This HTTP request calls a service which in turn queries the key-value store
=> received response
```
{
  "data": [
    {
      "key": 1,
      "value": "Fresh Fruit For Rotting Vegetables"
    },
    ...
    {
      "key": 10,
      "value": "Fear Of A Black Planet"
    }
  ],
  "status": "SUCCESS"
}
```

- Request data: curl -XGET http://localhost:4567/titles/counts
=> received response
```
{
  "data": [
    {
      "key": "fear of a black planet",
      "value": 414
    },
    ...
    {
      "key": "curtain call - the hits",
      "value": 431
    }
  ],
  "status": "SUCCESS"
}
```

- shut down Kafka

- Request data: curl -XGET http://localhost:4567/titles
=> received response, same as before, which seems fine since we are querying the local store
```
{
  "data": [
    {
      "key": 1,
      "value": "Fresh Fruit For Rotting Vegetables"
    },
    ...
    {
      "key": 10,
      "value": "Fear Of A Black Planet"
    }
  ],
  "status": "SUCCESS"
}
```
- Request data: curl -XGET http://localhost:4567/titles/counts
=> received response, still understandable
```
{
  "data": [
    {
      "key": "fear of a black planet",
      "value": 414
    },
    ...
    {
      "key": "curtain call - the hits",
      "value": 431
    }
  ],
  "status": "SUCCESS"
}
```

- restart Kafka

- Request data: curl -XGET http://localhost:4567/titles
=> received response
```
{
  "data": [],
  "status": "SUCCESS"
}
```

- Request data: curl -XGET http://localhost:4567/titles/counts
=> same here, received response
```
{
  "data": [],
  "status": "SUCCESS"
}
```

I also see this entry in the Streams application logs:
```
[error] (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1) org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:144)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:283)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
    at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka-streams-test-events-counts-repartition-1: 30046 ms has passed since batch creation plus linger time
[trace] Stack trace suppressed: run last compile:runMain for the full output.
the state store, events-counts, may have migrated to another instance.
```

Even if a rebalance occurred after I restarted the cluster, I only have one consumer, so I thought it would still be assigned all partitions and the store would remain available.
What am I missing here?

Thank you for your answers.

--
Regards,
Patrice
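As Bill explains, shutting down the broker leaves the Streams state at RUNNING, so the healthcheck in the original message, which only looks at `allMetadata()` and `state()`, never notices the outage. One option, sketched here under assumptions and not something proposed in the thread, is to combine the state check with an explicit broker probe. `CombinedHealthCheck`, `streamsState` and `brokerReachable` are hypothetical names; in the application the state supplier could be `() -> streams.state().name()` and the probe could wrap `AdminClient#describeCluster().nodes().get(timeout, unit)`, returning false on any exception.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical sketch: a healthcheck that does not rely on the Streams state alone.
final class CombinedHealthCheck {
    // Name of the current KafkaStreams.State, e.g. () -> streams.state().name()
    private final Supplier<String> streamsState;
    // Broker probe, e.g. a wrapper around AdminClient#describeCluster() with a timeout
    private final BooleanSupplier brokerReachable;

    CombinedHealthCheck(Supplier<String> streamsState, BooleanSupplier brokerReachable) {
        this.streamsState = streamsState;
        this.brokerReachable = brokerReachable;
    }

    String check() {
        String state = streamsState.get();
        // Assumption: RUNNING and REBALANCING both count as running, as State.isRunning() does.
        boolean running = "RUNNING".equals(state) || "REBALANCING".equals(state);
        if (!running) {
            return "KO";
        }
        // The Streams state can stay RUNNING while the broker is down, so probe it explicitly.
        return probe() ? "OK" : "KO";
    }

    private boolean probe() {
        try {
            return brokerReachable.getAsBoolean();
        } catch (RuntimeException e) {
            // A probe that throws is treated the same as an unreachable broker.
            return false;
        }
    }
}
```

Keeping the probe behind a `BooleanSupplier` lets the decision logic be exercised without a running broker; the probe's own timeout should stay below the timeout of whatever calls `/healthcheck`.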
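The "may have migrated to another instance" line in the log appears to be the message of an `InvalidStateStoreException`, which Kafka Streams can throw while stores are being reassigned, for example during a rebalance. A common mitigation is to retry the store lookup for a bounded number of attempts. The helper below is a generic, hypothetical `StoreRetry.retry`; in the application the lookup might be `() -> streams.store("events-counts", QueryableStoreTypes.keyValueStore())` with `InvalidStateStoreException.class` as the retryable type. It only helps with a transient reassignment and would not recover a stream thread that has stopped, so it complements rather than replaces the diagnosis Bill asks the full stack trace for.

```java
import java.util.function.Supplier;

// Hypothetical helper: retry a lookup while it fails with a given (transient) exception type.
final class StoreRetry {
    static <T> T retry(Supplier<T> lookup,
                       Class<? extends RuntimeException> retryOn,
                       int maxAttempts,
                       long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                // Anything other than the retryable type is propagated immediately.
                if (!retryOn.isInstance(e)) {
                    throw e;
                }
                last = e;
                if (attempt < maxAttempts) {
                    sleep(backoffMs);
                }
            }
        }
        // Every attempt failed with the retryable exception: surface the last one.
        throw last;
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag and stop retrying.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", ie);
        }
    }
}
```

Bounding the attempts keeps an HTTP handler such as `/titles/counts` from blocking indefinitely when the store never becomes available again.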