Kafka Streams windowed aggregation
Hi,

I want to do windowed aggregation with the Streams library. However, I get output from a windowed operator immediately, independent of the window size. This makes sense for unlimited windows, and sometimes for event-time windows. However, for ingestion-time or processing-time windows, users may want exact results (at the exact time) from a windowed aggregation operator. For example, if I have a window of 4 minutes with a 2-minute slide, I would expect to get an output once every 2 minutes. Otherwise I cannot know which of the tuples emitted by the aggregation operator is the "right" one that contains the aggregation result of the whole window.

One solution for this is using queryable state, but polling the state regularly to get the latest answers is not useful for my use case. So, is it on your roadmap to integrate a purge/trigger mechanism into windowed aggregates?

Thanks
Davood
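For reference, the window arithmetic behind the "4 minutes with 2-minute slide" example can be sketched in plain Java. This mirrors how hopping windows are commonly aligned (window starts at multiples of the advance, end-exclusive), but the helper below is illustrative, not the Streams library's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HoppingWindows {

    // Start times of all hopping windows (aligned to multiples of
    // advanceMs, covering [start, start + sizeMs)) that contain the
    // given record timestamp.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start s with s + sizeMs > timestampMs.
        long first = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 4 * 60_000L;    // 4-minute window
        long advance = 2 * 60_000L; // 2-minute slide
        // A record at minute 5 falls into the windows starting at minutes 2 and 4,
        // so every record updates two overlapping aggregates.
        System.out.println(windowStartsFor(5 * 60_000L, size, advance));
    }
}
```

Because each record belongs to several overlapping windows and each window is updated as records arrive, the operator emits intermediate updates; the "final" result per window only becomes known once the window closes.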
Re: Kafka Streams multi-node
Thanks David and Matthias for the reply. To make sure that I understand correctly:

- Each Streams application instance is limited to one node; the whole stream execution DAG is processed on that node.
- If we want to parallelize our application, we start a new instance of the Streams application, identical to the previous one (it runs all DAG operators inside its node), and the new instance gets a separate partition to process (one that no other instance is processing).
- There is no topology that breaks the DAG and its operators across separate nodes of the cluster to make it more "data flow"-ish. Instead we have "little" end-to-end instances running on each cluster node.
- Assume we run some aggregate operator with time windows over a finance data stream. Then we can have only one partition and only one Streams application instance, as increasing the partitions and/or instances could break the logic of that particular use case.

Please correct me if I am wrong.

Thanks
Davood

On Tue, Jul 26, 2016 at 9:44 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> David's answer is correct. Just start the same application multiple
> times on different nodes and the library does the rest for you.
>
> Just one addition: as Kafka Streams is for standard application
> development, there is no need to run the application on the same nodes
> as your brokers are running (i.e., application instances could run on any
> machine outside of your broker cluster).
>
> Of course, it is possible to run the application on broker nodes. Just
> wanted to point out that there is no co-location required between
> brokers and app instances.
>
> -Matthias
>
> On 07/26/2016 08:08 PM, David Garcia wrote:
> > http://docs.confluent.io/3.0.0/streams/architecture.html#parallelism-model
> >
> > You shouldn't have to do anything. Simply starting a new thread will
> > "rebalance" your streaming job. The job coordinates with tasks through
> > kafka itself.
> > On 7/26/16, 12:42 PM, "Davood Rafiei" <rafieidavo...@gmail.com> wrote:
> >
> > Hi,
> >
> > I am newbie in Kafka and Kafka-Streams. I read documentation to get
> > information how it works in multi-node environment. As a result I want to
> > run streams library on cluster that consists of more than one node.
> > From what I understood, I try to resolve the following conflicts:
> > - Streams is a standalone java application. So it runs in a single node, of
> > n-node cluster of kafka.
> > - However, streams runs on top of kafka, and if we set a multi-broker kafka
> > cluster, and then run streams library from master node, then streams
> > library will run in entire cluster.
> >
> > So, streams library is standalone java application but to force it to run
> > in multiple nodes, do we need to do something extra (in configuration for
> > example) if we have already kafka running in multi-broker mode?
> >
> > Thanks
> > Davood
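The parallelism model described above (the same application started several times, with input partitions split among the instances) can be illustrated with a small round-robin sketch. The real assignment is performed by Kafka's group coordination protocol, so this is only a mental model, not the actual algorithm, and the instance names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TaskAssignmentSketch {

    // Illustrative round-robin spread of partition-backed tasks over the
    // running instances of one Streams application. Each instance runs the
    // full topology, but only over the partitions assigned to it.
    static Map<String, List<Integer>> assign(List<String> instances, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String instance : instances) {
            assignment.put(instance, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(instances.get(p % instances.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two identical app instances, a 4-partition input topic:
        // starting the second instance halves the load of the first.
        System.out.println(assign(Arrays.asList("app-instance-1", "app-instance-2"), 4));
    }
}
```

This also shows why the number of partitions caps the parallelism: with one partition, a second instance would simply sit idle.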
Kafka Streams multi-node
Hi,

I am a newbie in Kafka and Kafka Streams. I read the documentation to learn how it works in a multi-node environment, because I want to run the Streams library on a cluster that consists of more than one node. From what I understood, I am trying to resolve the following conflicts:

- Streams is a standalone Java application, so it runs on a single node of an n-node Kafka cluster.
- However, Streams runs on top of Kafka, so if we set up a multi-broker Kafka cluster and then run the Streams library from the master node, the Streams library will run on the entire cluster.

So, the Streams library is a standalone Java application, but to force it to run on multiple nodes, do we need to do something extra (in configuration, for example) if we already have Kafka running in multi-broker mode?

Thanks
Davood
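As the replies in the thread above explain, no special configuration is needed to scale out: you start the same application on several machines. The only requirement is that all instances share the same `application.id` so they join the same consumer group. A minimal sketch of such a configuration (using the standard Kafka Streams property names; the application id and broker host names are made-up examples):

```java
import java.util.Properties;

public class StreamsConfigSketch {

    // Minimal configuration for a scaled-out Streams app. Starting this
    // same app on N machines makes the N instances split the input
    // partitions among themselves automatically.
    static Properties streamsProperties() {
        Properties props = new Properties();
        // All instances must share this id so they join the same group.
        props.put("application.id", "my-streams-app");
        // Any reachable brokers; the app does NOT have to run on broker nodes.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties());
    }
}
```

Nothing in the config names the other instances: coordination happens through Kafka itself, which is why "just start another copy" is the whole scaling story.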
Re: Groupby Operator
Thank you for your thorough explanation Michael. It helped a lot.

Cheers
Davood

On Thu, Jun 16, 2016 at 5:01 PM, Michael Noll <mich...@confluent.io> wrote:
> Davood,
>
> you are reading the input topic into a KTable, which means that subsequent
> records for the same key (such as the key `1`, which appears twice in the
> input messages/records) will be considered as updates to any previous
> records for that key. So I think what you actually want to do is read the
> input as a KStream instead of a KTable?
>
> The following code works for me, it looks like what you're trying to do.
> Note that I am reading the input data into a KStream, not a KTable.
>
> Input:
>     new KeyValue<>(1, "message1"),
>     new KeyValue<>(1, "message1"),
>     new KeyValue<>(2, "message2"),
>     new KeyValue<>(3, "message3"),
>     new KeyValue<>(4, "message4")
>
> Streams topology:
>
>     KStream<Integer, String> input = builder.stream(Serdes.Integer(),
>         Serdes.String(), inputTopic);
>     KTable<String, Long> counted = input
>         .map((key, value) -> KeyValue.pair(value, value))
>         .countByKey(Serdes.String(), "counted");
>     counted.to(Serdes.String(), Serdes.Long(), outputTopic);
>
> Output:
>     new KeyValue<>("message1", 1L),
>     new KeyValue<>("message1", 2L),
>     new KeyValue<>("message2", 1L),
>     new KeyValue<>("message3", 1L),
>     new KeyValue<>("message4", 1L)
>
> Does that help?
> Michael
>
> On Thu, Jun 16, 2016 at 4:20 PM, Davood Rafiei <rafieidavo...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I am trying to use the groupby operator in a simple example. However, I
> > get strange results.
> >
> > I have inputs on the "test" topic like: (Long, String)
> >     1    Message_1
> >     1    Message_1
> >     2    Message_2
> >     3    Message_3
> >     4    Message_4
> >
> > I want to get counts of each value. So:
> >     Message_1    2
> >     Message_1    1
> >     Message_2    1
> >     Message_3    1
> >     Message_4    1
> >
> > Because there is not any operator like groupby(fieldIndex), I assume that
> > groupby always works on keys.
> > So, my program is:
> >
> >     KTable<Long, String> source = builder.table(longSerde, stringSerde,
> >         "test");
> >     KTable<String, Long> counts = source.groupBy(new KeyValueMapper<Long,
> >         String, KeyValue<String, String>>() {
> >
> >         @Override
> >         public KeyValue<String, String> apply(Long key, String value) {
> >             return KeyValue.pair(value, value);
> >         }
> >     }, Serdes.String(), Serdes.String()).count("count");
> >     counts.print();
> >
> > And I get this output as a result:
> >
> >     Message_1    1
> >     Message_1    0
> >     Message_1    1
> >     Message_1    0
> >     Message_2    1
> >     Message_2    0
> >     Message_3    1
> >     Message_3    0
> >     Message_4    1
> >     Message_4    0
> >
> > I couldn't understand this behavior.
> >
> > Cheers
> > Davood
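The `Message_X 0` records in the output come from KTable update semantics: when a key's value is replaced, the downstream aggregate first retracts the old value from its group and then adds the new one, so a single update can emit two count records. A plain-Java model of that behavior (a sketch of the semantics, not the Streams implementation) looks like this:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TableCountSketch {

    // Counting grouped by value over a changelog (KTable semantics): an
    // update for an existing key first retracts the old value from its
    // group, then adds the new one, emitting a count record for each step.
    static List<String> countChangelog(List<Map.Entry<Integer, String>> input) {
        Map<Integer, String> table = new HashMap<>(); // latest value per original key
        Map<String, Long> counts = new HashMap<>();   // count per grouped key (the value)
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<Integer, String> record : input) {
            String old = table.put(record.getKey(), record.getValue());
            if (old != null) {
                counts.merge(old, -1L, Long::sum);        // retract the replaced value
                emitted.add(old + " " + counts.get(old));
            }
            counts.merge(record.getValue(), 1L, Long::sum); // add the new value
            emitted.add(record.getValue() + " " + counts.get(record.getValue()));
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(countChangelog(Arrays.<Map.Entry<Integer, String>>asList(
            new AbstractMap.SimpleEntry<>(1, "message1"),
            new AbstractMap.SimpleEntry<>(1, "message1"),
            new AbstractMap.SimpleEntry<>(2, "message2"))));
        // The second (1, "message1") record produces both "message1 0"
        // (retraction) and "message1 1" (new count) -- the interleaved
        // 1/0 pattern seen in the question.
    }
}
```

Reading the topic as a KStream instead, as Michael suggests, treats each record as an independent event, so there is nothing to retract and the counts only ever go up.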
Groupby Operator
Hi,

I am trying to use the groupby operator in a simple example. However, I get strange results.

I have inputs on the "test" topic like: (Long, String)
    1    Message_1
    1    Message_1
    2    Message_2
    3    Message_3
    4    Message_4

I want to get counts of each value. So:
    Message_1    2
    Message_1    1
    Message_2    1
    Message_3    1
    Message_4    1

Because there is not any operator like groupby(fieldIndex), I assume that groupby always works on keys. So, my program is:

    KTable<Long, String> source = builder.table(longSerde, stringSerde, "test");
    KTable<String, Long> counts = source.groupBy(new KeyValueMapper<Long, String, KeyValue<String, String>>() {
        @Override
        public KeyValue<String, String> apply(Long key, String value) {
            return KeyValue.pair(value, value);
        }
    }, Serdes.String(), Serdes.String()).count("count");
    counts.print();

And I get this output as a result:

    Message_1    1
    Message_1    0
    Message_1    1
    Message_1    0
    Message_2    1
    Message_2    0
    Message_3    1
    Message_3    0
    Message_4    1
    Message_4    0

I couldn't understand this behavior.

Cheers
Davood