Hi Bill,

Thanks, I understand. Let me know if you need further information.
Regards,
Patrice

2017-12-06 16:03 GMT+01:00 Bill Bejeck <bbej...@gmail.com>:

> Hi Patrice,
>
> I haven't forgotten; I just got sidetracked with other things. I'll get
> back to you by the end of the week.
>
> Thanks,
> Bill
>
> On Wed, Nov 29, 2017 at 10:36 AM, Bill Bejeck <bbej...@gmail.com> wrote:
>
>> Patrice,
>>
>> Thanks for reporting this. I'll have a look at what you've posted on
>> GitHub.
>>
>> Thanks,
>> Bill
>>
>> On Wed, Nov 29, 2017 at 7:04 AM, Patrice Chalcol <pchal...@gmail.com>
>> wrote:

Hello,

I have implemented a basic application that uses Kafka Streams state
stores and interactive queries, available here:
https://github.com/pchalcol/kstreams-healthcheck

The healthcheck implementation is based on the Kafka Streams metadata and
the stream state, as illustrated below:

```
String healthcheck() {
    Collection<StreamsMetadata> stores = streams.allMetadata();
    long storescount = stores.stream()
        .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
        .count();

    State state = streams.state();

    System.out.println(String.format("Application State: (%d, %s)",
        storescount, state.toString()));

    // KO if the current node is down or the stream is not in a running state
    if (storescount == 0 || !state.isRunning()) return "KO";
    return "OK";
}
```

I have created the topics with 4 partitions:
`kafka-topics --create --topic events --zookeeper localhost:2181 --partitions 4 --replication-factor 1`
`kafka-topics --create --topic library --zookeeper localhost:2181 --partitions 4 --replication-factor 1`

I expected the healthcheck to return an error whenever the broker is shut
down, but that is not the case.

When I check the application status with
`curl -XGET http://localhost:4567/healthcheck`,
the server always returns a SUCCESS response, even when the Kafka cluster
is down.
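For reference, a minimal sketch of how the state transitions can be made
visible, using the public `KafkaStreams#setStateListener` API on the same
`streams` instance as in the healthcheck above (the listener must be
registered before `streams.start()`; the logging is illustrative):

```java
// Log every state transition so it is visible whether the application
// ever leaves RUNNING while the brokers are down.
streams.setStateListener((newState, oldState) ->
    System.out.println(String.format("State change: %s -> %s", oldState, newState)));
```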
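A possible way to detect a dead cluster independently of the Streams state
would be to probe the brokers directly. A minimal sketch, assuming the
`AdminClient` from `kafka-clients` 0.11+ is on the classpath; the bootstrap
address and timeout are illustrative:

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Returns false when no broker answers the metadata request within the timeout.
static boolean brokersReachable(String bootstrapServers) {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    try (AdminClient admin = AdminClient.create(props)) {
        // describeCluster() is asynchronous; a bounded get() keeps the
        // healthcheck from hanging while the cluster is unreachable.
        return !admin.describeCluster().nodes().get(5, TimeUnit.SECONDS).isEmpty();
    } catch (Exception e) {
        return false;
    }
}
```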
You will find below the different test cases I have run.

1/ The stream state does not change after the Kafka cluster is shut down:

- start Kafka:
`cd docker && docker-compose up -d`

- start the producer:
`sbt runMain com.example.streams.Producer`

- start the streams application and HTTP server:
`sbt runMain com.example.streams.Producer`

- healthcheck:
`curl -XGET http://localhost:4567/healthcheck`
=> response = {"status": "SUCCESS"}

- shut down Kafka: `docker-compose stop`

- healthcheck:
`curl -XGET http://localhost:4567/healthcheck`
=> response = {"status": "SUCCESS"}, while the expected response is
{"status": "ERROR"}


2/ Sometimes I also encounter this behaviour: no data seems to be
available when querying the stores.

- start Kafka
- start the producer
- start the streams application and HTTP server

- request data: `curl -XGET http://localhost:4567/titles`
(this HTTP request calls a service which in turn queries the key-value store)
=> received response:
```
{
  "data": [
    {
      "key": 1,
      "value": "Fresh Fruit For Rotting Vegetables"
    },
    ...
    {
      "key": 10,
      "value": "Fear Of A Black Planet"
    }
  ],
  "status": "SUCCESS"
}
```

- request data: `curl -XGET http://localhost:4567/titles/counts`
=> received response:
```
{
  "data": [
    {
      "key": "fear of a black planet",
      "value": 414
    },
    ...
    {
      "key": "curtain call - the hits",
      "value": 431
    }
  ],
  "status": "SUCCESS"
}
```

- shut down Kafka

- request data: `curl -XGET http://localhost:4567/titles`
=> received the same response as before, which seems fine since we are
querying the local store:
```
{
  "data": [
    {
      "key": 1,
      "value": "Fresh Fruit For Rotting Vegetables"
    },
    ...
    {
      "key": 10,
      "value": "Fear Of A Black Planet"
    }
  ],
  "status": "SUCCESS"
}
```

- request data: `curl -XGET http://localhost:4567/titles/counts`
=> received response, which still makes sense:
```
{
  "data": [
    {
      "key": "fear of a black planet",
      "value": 414
    },
    ...
    {
      "key": "curtain call - the hits",
      "value": 431
    }
  ],
  "status": "SUCCESS"
}
```

- restart Kafka

- request data: `curl -XGET http://localhost:4567/titles`
=> received response:
```
{
  "data": [],
  "status": "SUCCESS"
}
```

- request data: `curl -XGET http://localhost:4567/titles/counts`
=> same here, received response:
```
{
  "data": [],
  "status": "SUCCESS"
}
```

I also see this entry in the streams application logs:
```
[error] (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1)
org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:144)
	at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:283)
	at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
	at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
	at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
	at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
	at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
	at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka-streams-test-events-counts-repartition-1: 30046 ms has passed since batch creation plus linger time
[trace] Stack trace suppressed: run last compile:runMain for the full output.
the state store, events-counts, may have migrated to another instance.
```

Even if a rebalance occurred after the cluster restart, since I have only
one consumer I thought it would still be assigned all partitions, so the
store should remain available. What am I missing here?
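In case it matters, I understand that interactive queries are expected to
be guarded against `InvalidStateStoreException`, since a store can be
unavailable while a rebalance is in progress. A minimal sketch of such a
guard (the store name, key/value types, and retry budget are illustrative):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Retries the store lookup while the store is unavailable, e.g. during
// the rebalance that follows a broker restart.
static ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams, String storeName)
        throws InterruptedException {
    for (int i = 0; i < 100; i++) { // ~10 seconds in 100 ms steps
        try {
            return streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
        } catch (InvalidStateStoreException e) {
            Thread.sleep(100); // store is migrating or not yet initialized
        }
    }
    throw new IllegalStateException("Store not available: " + storeName);
}
```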
Thank you for your answers.

--
Regards,
Patrice

--
Regards,
Patrice Chalcol