Hi,

You just need to read your stream and apply a (windowed) aggregation
on it.

If you use a non-windowed aggregation you will get the result "since
the beginning". If you use a windowed aggregation you can specify the
window size as 1 hour and get hourly results.
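
To make the difference concrete, here is a minimal plain-Java sketch
(not the Kafka Streams API) of how a one-hour tumbling window assigns a
record timestamp to a window, and how a count turns into queries per
second. The class and method names are hypothetical, and it assumes
non-negative timestamps in milliseconds. A non-windowed aggregation
corresponds to a single count over all records instead of one count
per window.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustration only: plain Java, not Kafka Streams. All names are hypothetical.
final class HourlyCountSketch {
    static final long HOUR_MS = 3600 * 1000L;

    // Start of the tumbling window that contains the (non-negative) timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Count records per window; the map key is the window start in ms.
    static Map<Long, Long> countPerWindow(long[] timestampsMs, long windowSizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, windowSizeMs), 1L, Long::sum);
        }
        return counts;
    }

    // Average queries per second for a count observed over the given span.
    static double queriesPerSecond(long count, long spanMs) {
        return count / (spanMs / 1000.0);
    }
}
```

For an hourly window the span is fixed at 3600 seconds; for "since the
beginning" you would divide by the time elapsed since the first record.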

One comment: it seems that you want to count *all* queries. To make
this work, you need to make sure all records are using the same key
(because Kafka Streams only supports aggregation over keyed streams).
Keep in mind that this prohibits parallelization of your aggregation!

As a workaround, you could also do two consecutive aggregations:
parallelize the first one and do not parallelize the second one (i.e.,
use the first one as a pre-aggregation, similar to a combine step).
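
The idea behind the two steps can be sketched in plain Java (this is
not Kafka Streams code, and all names are hypothetical): the first
stage counts per original key, which is the part that can run in
parallel, and the second stage combines those partial counts under a
single shared key.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: plain Java, not Kafka Streams. All names are hypothetical.
final class TwoStageCountSketch {

    // Stage 1: count per original key. Because the keys differ, this step
    // could be spread over several partitions or instances.
    static Map<String, Long> partialCounts(String[] recordKeys) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : recordKeys) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2: combine the partial counts under one shared key. This step is
    // not parallel, but it only sees one value per stage-1 key.
    static long totalCount(Map<String, Long> partial) {
        long total = 0L;
        for (long c : partial.values()) {
            total += c;
        }
        return total;
    }
}
```

Note that in a streaming setting the first stage keeps emitting updated
partial counts, so the second stage has to replace the previous value
for a key rather than add it again.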

Without pre-aggregation, and assuming all records use the same key,
something like this should work (for current trunk):


KStreamBuilder builder = new KStreamBuilder();
KStream input = builder.stream("yourTopic");

// all records are assumed to share the same key
KGroupedStream groupedInput = input.groupByKey();

// count since the beginning
groupedInput.count("countStore").to("outputTopicCountFromBeginning");

// count per 1-hour window (window size given in milliseconds)
groupedInput.count(TimeWindows.of(3600 * 1000),
    "windowedCountStore").to("outputTopicHourlyCounts");


For more details, please see the docs and examples:

 - http://docs.confluent.io/current/streams/index.html
 - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams


-Matthias

On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> Hi,
> 
> I could successfully run Kafka at my environment. I want to monitor
> Queries per Second at my search application with Kafka. Whenever a
> search request is done I create a ProducerRecord which holds
> current nano time of the system.
> 
> I know that I have to use a streaming API for calculation i.e.
> Kafka Streams or Spark Streams. My choice is to use Kafka Streams.
> 
> For last 1 hours, or since the beginning, I have to calculate the
> queries per second. How can I make such an aggregation at Kafka
> Streams?
> 
> Kind Regards, Furkan KAMACI
> 
