-----BEGIN PGP SIGNED MESSAGE----- Hash: SHA512 Hi,
You just need to read you stream and apply an (windowed) aggregation on it. If you use non-windowed aggregation you will get "since the beginning". If you use windowed aggregation you can specify the window size as 1 hour and get those results. One comment: it seems that you want to count *all* queries. To make this work, you need to make sure all records are using the same key (because Kafka Streams only supports aggregation over keyed streams). Keep in mind, that this prohibits parallelization of you aggregation! As a workaround, you could also do two consecutive aggregation, and do parallelize the first one, and do not parallelize the second one (ie, using the first one as a pre aggregation similar to a combine step) Without pre aggregation and assuming all records use the same key something like this (for current trunk): > KStreamBuilder builder = new KStreamBuilder(): KStream input = > builder.stream("yourTopic"); > > KGroupedStream groupedInput = input.groupByKey(); > > groupedInput.count("countStore").to("outputTopicCountFromBeginning"); > > groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore").to("outputTopicHourlyCounts"): For more details, please see the docs and examples: - http://docs.confluent.io/current/streams/index.html - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/ka fka-streams - -Matthias On 10/18/16 5:00 AM, Furkan KAMACI wrote: > Hi, > > I could successfully run Kafka at my environment. I want to monitor > Queries per Second at my search application with Kafka. Whenever a > search request is done I create a ProducerRecord which holds > current nano time of the system. > > I know that I have to use a streaming API for calculation i.e. > Kafka Streams or Spark Streams. My choice is to use Kafka Streams. > > For last 1 hours, or since the beginning, I have to calculate the > queries per second. How can I make such an aggregation at Kafka > Streams? > > Kind Regards, Furkan KAMACI > -----BEGIN PGP SIGNATURE----- Comment: GPGTools - https://gpgtools.org iQIcBAEBCgAGBQJYBl6wAAoJECnhiMLycopPVVAP/0EqJJsLnKqvMeIM3XmV7dzP JnvHJdj0QUn2ONe1Fl9PEDxQvqkw0x/45fBfZsoWqMvIn5uvPfkeF0+TSLFUVUsu 6r+QV8xjJ53GTuPvBQOcUx1H7onXyPkfa88OGVMFV0Er7/1C/p6CAT/MF8x04Fjh VqT0EQbqVWxoLXdm+GHaUEgdIsJNaXzOzBcxPL9ayA71G4UtwGUud86kjU8CvURJ wDsZYdWa2TebqG5g80l1YPzRDbNgHKJ4ezHKxdZ+XufizGcoE48BsGzHe09RQDbZ 5aiW+rVXO9dQBIP+3FA3Yeno6+lnGmIECFiHw0FaudOVJIxm40eyTltHjmODMP6T P55XQKvs6rVwjTp1uxcvrggXtkp+B/Wdglo5RM+MAZ/MkZXc8ruY2G4JYqn3Ko7q 1eEKDpvkbhKGDE9HJGmH0pmYXgSXYhNZPUAURy6pgbpAapysZovJJG1tvIFY2E4R EpZPHc9JaXOdlOAsK9q468VrCx1pOakC8AZYUAm6vRiSLHGYjiT8sTHQf3IWjP4q HPCtwk6IZGTGjdLyyMHGm2vbmtiMPBdAN/pau9pehFb5c7Np2uT8WyBL0ECgdOmb MoxtytRsbuMchZKUo5Wa2wEaBpKwiAnGssW94e3FF898P2tV0br1lLXyrsyNnakN qOb2YW0mz/+66AJsJw90 =X1XQ -----END PGP SIGNATURE-----