Kafka Streams windowed aggregation

2016-10-05 Thread Davood Rafiei
Hi,

I want to do windowed aggregation with the Streams library. However, I get
output from a particular operator immediately, independent of the window
size. This makes sense for unlimited windows, or sometimes for event-time
windows. However, for ingestion-time or processing-time windows, users may
want exact results (and at an exact time) from the windowed aggregation
operator.
For example, if I have a window of 4 minutes with a 2-minute slide, I would
expect to get one output per 2 minutes. Otherwise I cannot know which of
the tuples emitted by the aggregation operator is the "right" one that
contains the aggregation result of the whole window.
One solution for this is queryable state, but polling the state regularly
to get the latest answers is not practical for my use case.
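To make the window semantics concrete, here is a minimal plain-Java sketch (not the Streams API; time units are abstract placeholders) of which hopping windows a single record falls into, for a window of size 4 advancing by 2. Each record belongs to every overlapping window, and the current behavior updates all of them immediately rather than emitting one result when a window closes:

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {
    // All window start times (multiples of advance) whose window
    // [start, start + size) contains the given timestamp.
    static List<Long> windowsFor(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long latest = timestamp - (timestamp % advance); // last window starting at or before t
        for (long start = latest; start > timestamp - size; start -= advance) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record at t=5 with size 4, advance 2 belongs to the windows
        // starting at 4 and at 2.
        System.out.println(windowsFor(5, 4, 2)); // [4, 2]
    }
}
```

With emit-on-update, downstream consumers see an updated aggregate for each of these windows per record, which is why one cannot tell which emission is the final result for a window.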

So, is it on your roadmap to integrate a purge/trigger mechanism into
windowed aggregates?

Thanks
Davood


Re: Kafka Streams multi-node

2016-07-26 Thread Davood Rafiei
Thanks David and Matthias for the reply. To make sure that I understand
correctly:
- Each Streams application instance is limited to one node. On that node
the whole stream execution DAG is processed.
- If we want to parallelize our application, we start a new instance of the
Streams application, identical to the previous one (it will run all DAG
operators inside its node); after the new instance is added, it gets a
separate partition to process (one that no other instance is processing).
- There is no topology that breaks the DAG and operators (of the stream
application) across separate nodes (of the cluster) to make it look more
"data flow"-ish. Instead we have "little" end-to-end instances running on
each cluster node.
- Assume we run some aggregation operator with time windows over a
financial data stream. Then we can have only one partition and only one
Streams application instance, as increasing the number of partitions and/or
instances could break the logic of that particular use case.
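A toy sketch of the assignment idea in the bullets above (plain Java, not Kafka's actual partition assignor): identical instances of one application split the input partitions among themselves, so a single partition implies a single active instance consuming it:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignment {
    // Round-robin partitions over instances, roughly how identical Streams
    // instances of one application share the input topic's partitions.
    static Map<String, List<Integer>> assign(int partitions, List<String> instances) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        instances.forEach(i -> out.put(i, new ArrayList<>()));
        for (int p = 0; p < partitions; p++) {
            out.get(instances.get(p % instances.size())).add(p);
        }
        return out;
    }

    public static void main(String[] args) {
        // One instance owns everything; adding a second shifts work to it.
        System.out.println(assign(4, List.of("app-1")));          // {app-1=[0, 1, 2, 3]}
        System.out.println(assign(4, List.of("app-1", "app-2"))); // {app-1=[0, 2], app-2=[1, 3]}
    }
}
```

With only one partition, adding instances leaves the extras idle rather than breaking the logic, which is the flip side of the single-partition point above.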

Please correct me if I am wrong.

Thanks
Davood

On Tue, Jul 26, 2016 at 9:44 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> David's answer is correct. Just start the same application multiple
> times on different nodes and the library does the rest for you.
>
> Just one addition: as Kafka Streams is for standard application
> development, there is no need to run the application on the same nodes
> as your brokers are running (ie, applications instances could run on any
> machine outside of your broker cluster).
>
> Of course, it is possible to run the application on broker nodes. Just
> wanted to point out, that there is no co-location required between
> brokers and app instances.
>
> -Matthias
>
>
> On 07/26/2016 08:08 PM, David Garcia wrote:
> >
> http://docs.confluent.io/3.0.0/streams/architecture.html#parallelism-model
> >
> > you shouldn’t have to do anything.  Simply starting a new thread will
> “rebalance” your streaming job.  The job coordinates tasks through
> Kafka itself.
> >
> >
> >
> > On 7/26/16, 12:42 PM, "Davood Rafiei" <rafieidavo...@gmail.com> wrote:
> >
> > Hi,
> >
> > I am a newbie in Kafka and Kafka Streams. I read the documentation to
> > learn how it works in a multi-node environment. As a result, I want to
> > run the Streams library on a cluster that consists of more than one node.
> > From what I understood, I am trying to resolve the following conflicts:
> > - Streams is a standalone Java application, so it runs on a single node
> > of an n-node Kafka cluster.
> > - However, Streams runs on top of Kafka, so if we set up a multi-broker
> > Kafka cluster and then run the Streams library from the master node, the
> > Streams library will run on the entire cluster.
> >
> > So, the Streams library is a standalone Java application, but to force it
> > to run on multiple nodes, do we need to do anything extra (in the
> > configuration, for example) if we already have Kafka running in
> > multi-broker mode?
> >
> >
> > Thanks
> > Davood
> >
> >
>
>


Kafka Streams multi-node

2016-07-26 Thread Davood Rafiei
Hi,

I am a newbie in Kafka and Kafka Streams. I read the documentation to
learn how it works in a multi-node environment. As a result, I want to
run the Streams library on a cluster that consists of more than one node.
From what I understood, I am trying to resolve the following conflicts:
- Streams is a standalone Java application, so it runs on a single node of
an n-node Kafka cluster.
- However, Streams runs on top of Kafka, so if we set up a multi-broker
Kafka cluster and then run the Streams library from the master node, the
Streams library will run on the entire cluster.

So, the Streams library is a standalone Java application, but to force it
to run on multiple nodes, do we need to do anything extra (in the
configuration, for example) if we already have Kafka running in
multi-broker mode?
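For reference, the configuration side of the answer that emerged in this thread is small: no extra setup is needed beyond giving every instance the same `application.id`; Kafka's group coordination then splits the partitions among them. A hedged sketch (the app id and broker addresses are placeholders):

```java
import java.util.Properties;

public class StreamsProps {
    // Every instance of the "same" application uses the same application.id;
    // instances can run on any machines that can reach the brokers.
    static Properties config(String appId, String bootstrapServers) {
        Properties p = new Properties();
        p.put("application.id", appId);               // shared by all instances
        p.put("bootstrap.servers", bootstrapServers); // any reachable brokers
        return p;
    }

    public static void main(String[] args) {
        Properties p = config("wordcount-app", "broker1:9092,broker2:9092");
        System.out.println(p.getProperty("application.id")); // wordcount-app
    }
}
```

Starting the same jar with this same configuration on several machines is all the "multi-node" setup there is; no master node or cluster deployment step is involved.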


Thanks
Davood


Re: Groupby Operator

2016-06-17 Thread Davood Rafiei
Thank you for your thorough explanation Michael. It helped a lot.

Cheers
Davood

On Thu, Jun 16, 2016 at 5:01 PM, Michael Noll <mich...@confluent.io> wrote:

> Davood,
>
> you are reading the input topic into a KTable, which means that subsequent
> records for the same key (such as the key `1`, which appears twice in the
> input messages/records) will be considered as updates to any previous
> records for that key.  So I think what you actually want to do is read the
> input as a KStream instead of a KTable?
>
> The following code works for me, it looks like what you're trying to do.
> Note that I am reading the input data into a KStream, not a KTable.
>
> Input:
>   new KeyValue<>(1, "message1"),
>   new KeyValue<>(1, "message1"),
>   new KeyValue<>(2, "message2"),
>   new KeyValue<>(3, "message3"),
>   new KeyValue<>(4, "message4")
>
> Streams topology:
>
>   KStream<Integer, String> input = builder.stream(Serdes.Integer(),
> Serdes.String(), inputTopic);
>   KTable<String, Long> counted = input
>   .map((key, value) -> KeyValue.pair(value, value))
>   .countByKey(Serdes.String(), "counted");
>   counted.to(Serdes.String(), Serdes.Long(), outputTopic);
>
> Output:
>   new KeyValue<>("message1", 1L),
>   new KeyValue<>("message1", 2L),
>   new KeyValue<>("message2", 1L),
>   new KeyValue<>("message3", 1L),
>   new KeyValue<>("message4", 1L)
>
> Does that help?
> Michael
>
>
>
>
> On Thu, Jun 16, 2016 at 4:20 PM, Davood Rafiei <rafieidavo...@gmail.com>
> wrote:
>
> > Hi,
> >
> >
> > I am trying to use the groupby operator in a simple example. However, I
> > get strange results.
> >
> > I have inputs on the "test" topic like: (Long, String)
> > 1  Message_1
> > 1  Message_1
> > 2  Message_2
> > 3  Message_3
> > 4  Message_4
> >
> > I want to get counts of each value. So:
> > Message_1 2
> > Message_1 1
> > Message_2 1
> > Message_3 1
> > Message_4 1
> >
> > Because there is no operator like groupBy(fieldIndex), I assume that
> > groupby always works on keys.
> >
> > So, my program is:
> >
> >   KTable<Long, String> source = builder.table(longSerde, stringSerde,
> > "test");
> >   KTable<String,Long> counts =  source.groupBy(new
> KeyValueMapper<Long,
> > String, KeyValue<String, String>>() {
> >
> > @Override
> > public KeyValue<String, String> apply(Long key, String value) {
> > return KeyValue.pair(value, value);
> > }
> > }, Serdes.String(), Serdes.String()).count("count");
> >   counts.print();
> >
> > And I get this output as a result:
> >
> > Message_1  1
> > Message_1  0
> > Message_1  1
> > Message_1  0
> > Message_2  1
> > Message_2  0
> > Message_3  1
> > Message_3  0
> > Message_4  1
> > Message_4  0
> >
> > I couldn't  understand this behavior.
> >
> >
> > Cheers
> > Davood
> >
>


Groupby Operator

2016-06-16 Thread Davood Rafiei
Hi,


I am trying to use the groupby operator in a simple example. However, I get
strange results.

I have inputs on the "test" topic like: (Long, String)
1  Message_1
1  Message_1
2  Message_2
3  Message_3
4  Message_4

I want to get counts of each value. So:
Message_1 2
Message_1 1
Message_2 1
Message_3 1
Message_4 1

Because there is no operator like groupBy(fieldIndex), I assume that
groupby always works on keys.

So, my program is:

  KTable<Long, String> source = builder.table(longSerde, stringSerde,
"test");
  KTable<String, Long> counts = source.groupBy(new KeyValueMapper<Long,
String, KeyValue<String, String>>() {

    @Override
    public KeyValue<String, String> apply(Long key, String value) {
        return KeyValue.pair(value, value);
    }
}, Serdes.String(), Serdes.String()).count("count");
  counts.print();

And I get this output as a result:

Message_1  1
Message_1  0
Message_1  1
Message_1  0
Message_2  1
Message_2  0
Message_3  1
Message_3  0
Message_4  1
Message_4  0

I couldn't understand this behavior.
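For context, the alternating 1/0 pairs are consistent with KTable update semantics: when a key receives a new record, the grouped aggregate first retracts the previously grouped value and then adds the new one, and both changes are emitted. A rough plain-Java simulation of that changelog behavior (simplified ordering; not the actual Streams operator):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KTableCountSketch {
    // Simulate: a KTable keeps the latest value per key; grouping by value
    // retracts the old value's count before adding the new value's count,
    // and both changes are emitted downstream.
    static List<String> run(List<Map.Entry<Long, String>> records) {
        Map<Long, String> table = new HashMap<>();   // latest value per key
        Map<String, Long> counts = new HashMap<>();  // count per grouped value
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<Long, String> r : records) {
            String old = table.put(r.getKey(), r.getValue());
            if (old != null) {                       // update: retract old grouped value
                counts.merge(old, -1L, Long::sum);
                emitted.add(old + " " + counts.get(old));
            }
            counts.merge(r.getValue(), 1L, Long::sum);
            emitted.add(r.getValue() + " " + counts.get(r.getValue()));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Two records with the same key produce a retraction in between.
        run(List.of(Map.entry(1L, "Message_1"), Map.entry(1L, "Message_1")))
            .forEach(System.out::println);
        // Message_1 1
        // Message_1 0
        // Message_1 1
    }
}
```

Reading the topic as a KStream instead of a KTable (as suggested in the reply above) avoids the retractions, because a stream treats each record as an independent event rather than an update to a previous one.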


Cheers
Davood