Patrice,

Thanks for reporting this.  I'll have a look at what you've posted on
GitHub.

Thanks,
Bill

On Wed, Nov 29, 2017 at 7:04 AM, Patrice Chalcol <pchal...@gmail.com> wrote:

> Hello,
>
> I have implemented a basic application which uses Kafka Streams stores and
> interactive queries, available here:
> https://github.com/pchalcol/kstreams-healthcheck
>
> The healthcheck implementation is based on the Kafka Streams metadata and
> the stream state, as illustrated below:
> ```
> String healthcheck() {
>     // Metadata entries for this instance (localhost:4567).
>     Collection<StreamsMetadata> stores = streams.allMetadata();
>     long storescount = stores.stream()
>         .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
>         .count();
>
>     State state = streams.state();
>
>     System.out.println(String.format("Application State: (%d, %s)",
>         storescount, state.toString()));
>
>     // KO if the current node is down or is not in a running state
>     if (storescount == 0 || !state.isRunning()) return "KO";
>     return "OK";
> }
> ```
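>
> As a side note, polling `streams.state()` only samples the state; a listener
> registered before `streams.start()` reports every transition as it happens.
> A minimal sketch (the `healthy` flag is an illustrative assumption, not
> something from my application):
> ```
> // import java.util.concurrent.atomic.AtomicBoolean;
> // setStateListener must be called before streams.start().
> final AtomicBoolean healthy = new AtomicBoolean(false);
> streams.setStateListener((newState, oldState) -> {
>     System.out.println(String.format("State change: %s -> %s", oldState, newState));
>     healthy.set(newState.isRunning());
> });
> streams.start();
> ```
> That said, if the state never leaves RUNNING when the broker goes away, such
> a listener would not fire either, which matches what I observe below.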
>
> I have created the topics with 4 partitions:
> `kafka-topics --create --topic events --zookeeper localhost:2181
> --partitions 4 --replication-factor 1`
> `kafka-topics --create --topic library --zookeeper localhost:2181
> --partitions 4 --replication-factor 1`
>
> I expected the healthcheck to return an error whenever the broker is shut
> down, but that is not the case.
>
> When I check the application status with
> `curl -XGET http://localhost:4567/healthcheck`
> the server always returns a SUCCESS response, even if the Kafka cluster is
> down.
>
> You will find below the different test cases I have run.
>
> 1/ The stream state does not change after shutting down the Kafka cluster
> - start kafka
> `cd docker && docker-compose up -d`
>
> - start producer
> `sbt runMain com.example.streams.Producer`
>
> - start streams and http server
> `sbt runMain com.example.streams.Producer`
>
> - healthcheck
> `curl -XGET http://localhost:4567/healthcheck`
> => response = {"status": "SUCCESS"}
> - shutdown kafka: `docker-compose stop`
>
> - healthcheck
> `curl -XGET http://localhost:4567/healthcheck`
> => response = {"status": "SUCCESS"}, while the expected one is
> {"status": "ERROR"}
>
>
> 2/ Sometimes I also encounter this behaviour: no data seems to be
> available when querying the stores
> - Start kafka
> - Start Producer
> - Start Streams and http Server
>
> - Request data: `curl -XGET http://localhost:4567/titles`
>   This HTTP request calls a service which in turn queries the key-value
>   store (a sketch of that read path follows the response below)
> => received response
> ```
> {
>     "data": [
>         {
>             "key": 1,
>             "value": "Fresh Fruit For Rotting Vegetables"
>         },
>
>         ...
>
>         {
>             "key": 10,
>             "value": "Fear Of A Black Planet"
>         }
>     ],
>     "status": "SUCCESS"
> }
> ```
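>
> For reference, the read path behind this response would look roughly like
> the sketch below; the store name "titles" and the generic types are
> assumptions based on the payload above, the real names are in the repository:
> ```
> // import org.apache.kafka.streams.KeyValue;
> // import org.apache.kafka.streams.state.*;
> ReadOnlyKeyValueStore<Integer, String> store =
>     streams.store("titles", QueryableStoreTypes.keyValueStore());
>
> // Iterate over all local entries; the iterator must be closed.
> try (KeyValueIterator<Integer, String> it = store.all()) {
>     while (it.hasNext()) {
>         KeyValue<Integer, String> entry = it.next();
>         System.out.println(String.format("(%d, %s)", entry.key, entry.value));
>     }
> }
> ```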
>
> - Request data: `curl -XGET http://localhost:4567/titles/counts`
> => received response
> ```
> {
>     "data": [
>         {
>             "key": "fear of a black planet",
>             "value": 414
>         },
>         ...
>         {
>             "key": "curtain call - the hits",
>             "value": 431
>         }
>     ],
>     "status": "SUCCESS"
> }
> ```
>
> - shutdown kafka
>
> - Request data: `curl -XGET http://localhost:4567/titles`
> => received response, same as before, which seems to be OK as we are
> querying the local store
> ```
> {
>     "data": [
>         {
>             "key": 1,
>             "value": "Fresh Fruit For Rotting Vegetables"
>         },
>
>         ...
>
>         {
>             "key": 10,
>             "value": "Fear Of A Black Planet"
>         }
>     ],
>     "status": "SUCCESS"
> }
> ```
> - Request data: `curl -XGET http://localhost:4567/titles/counts`
> => received the same response as before, which is still understandable
> ```
> {
>     "data": [
>         {
>             "key": "fear of a black planet",
>             "value": 414
>         },
>         ...
>         {
>             "key": "curtain call - the hits",
>             "value": 431
>         }
>     ],
>     "status": "SUCCESS"
> }
> ```
>
> - restart kafka
>
> - Request data: `curl -XGET http://localhost:4567/titles`
> => received response
> ```
> {
>     "data": [],
>     "status": "SUCCESS"
> }
> ```
>
> - Request data: `curl -XGET http://localhost:4567/titles/counts`
> => same here, received response
> ```
> {
>     "data": [],
>     "status": "SUCCESS"
> }
> ```
>
> I also see this entry in the Streams application logs:
> ```
> [error] (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1)
> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:144)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:283)
>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
>     at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka-streams-test-events-counts-repartition-1: 30046 ms has passed since batch creation plus linger time
> [trace] Stack trace suppressed: run last compile:runMain for the full output.
> the state store, events-counts, may have migrated to another instance.
> ```
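>
> If it is relevant: my understanding is that the usual guard against that
> last "may have migrated" message is to retry the store lookup while a
> rebalance is in progress, along these lines (a rough sketch; the retry
> interval is arbitrary):
> ```
> // import org.apache.kafka.streams.KafkaStreams;
> // import org.apache.kafka.streams.errors.InvalidStateStoreException;
> // import org.apache.kafka.streams.state.*;
> ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams)
>         throws InterruptedException {
>     while (true) {
>         try {
>             return streams.store("events-counts", QueryableStoreTypes.keyValueStore());
>         } catch (InvalidStateStoreException e) {
>             // Store not queryable yet, e.g. during a rebalance; back off and retry.
>             Thread.sleep(100);
>         }
>     }
> }
> ```
> In my case, though, the /titles queries above return an empty data array
> rather than throwing, so this may be a different issue.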
>
> Even if a rebalance occurred after I restarted the cluster, since I have
> only one consumer it should still be assigned all partitions, so the store
> should remain available.
> What am I missing here?
>
> Thank you for your answers.
>
> --
> Regards,
> Patrice
>
