Re: validate identity of producer in each record

2017-03-20 Thread Matt Magoffin
OK, thank you for that. I looked at
org.apache.kafka.connect.transforms.Transformation and
org.apache.kafka.connect.source.SourceRecord, but I cannot find where the
authenticated username (or Principal) would be available to the call to
Transformation.apply()… Or does Connect even support SSL and client-certificate-based
authentication in the first place? If not, I think Connect may not be
quite right for my use case.

— m@

> On 21/03/2017, at 4:59 PM, Hans Jespersen  wrote:
> 
> Nothing on the broker today but if you use Kafka Connect API in 0.10.2 and 
> above there is a pluggable interface called Transformations. 
> 
> See org.apache.kafka.connect.transforms in 
> https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/connect
> 
> Source Connector transformations happen before storage in the Kafka log and 
> Sink Connector transformations happen afterwards for consumers.
> 
> -hans



Re: validate identity of producer in each record

2017-03-20 Thread Hans Jespersen
Nothing on the broker today but if you use Kafka Connect API in 0.10.2 and 
above there is a pluggable interface called Transformations. 

See org.apache.kafka.connect.transforms in 
https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/connect

Source Connector transformations happen before storage in the Kafka log and 
Sink Connector transformations happen afterwards for consumers.
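
For illustration, a transform is just a class implementing that interface. The
sketch below is a minimal, hypothetical example (the class name and added field
are made up, and it assumes schemaless Map values); note that apply() only
receives the record itself, so no connection-level principal is passed in:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical transform that tags each schemaless (Map-valued) record with a
// static marker field. Only the record is available here; there is no
// authenticated principal passed to apply().
public class TagRecord<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    @SuppressWarnings("unchecked")
    public R apply(R record) {
        if (record.value() == null) {
            return record; // pass tombstones through untouched
        }
        Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
        value.put("source", "example-connector"); // a static tag, not an authenticated identity
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), null, value, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}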

-hans

> On Mar 20, 2017, at 6:52 PM, Matt Magoffin  wrote:
> 
> Thanks, Hans.
> 
> Signing messages is a good idea. Other than that, is there possibly an 
> extension point in Kafka itself on the receiving of records, before they are 
> stored/distributed? I was thinking along the lines of
> 
> org.apache.kafka.clients.producer.ProducerInterceptor
> 
> but on the server side?
> 
> — m@
> 
>> On 21/03/2017, at 12:22 PM, Hans Jespersen  wrote:
>> 
>> You can configure Kafka with ACLs that only allow certain users to
>> produce/consume to certain topics but if multiple producers are allowed to
>> produce to a shared topic then you cannot identify them without adding
>> something to the messages.
>> 
>> For example, you can have each producer digitally sign (or encrypt) each
>> message and include the signature as a separate field (ie. separate from
>> the original message body). Then the consumers can independently check that
>> the signature is valid and that he message comes from a known/valid
>> publisher. This pattern is similar to how signed email messages work.
>> 
>> -hans
> 


Re: validate identity of producer in each record

2017-03-20 Thread Matt Magoffin
Thanks, Hans.

Signing messages is a good idea. Other than that, is there possibly an 
extension point in Kafka itself on the receiving of records, before they are 
stored/distributed? I was thinking along the lines of

org.apache.kafka.clients.producer.ProducerInterceptor

but on the server side?
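
(For reference, the client-side hook named above looks roughly like the minimal
sketch below -- a hypothetical interceptor that stamps an identifier into each
value before send; the class name and the "producer.id" config key are made up.
Because it runs inside the producer's own process it cannot provide a
trustworthy identity, which is why a signature-based scheme is needed, and
there is no broker-side counterpart to this interface.)

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical client-side interceptor: prepends a producer id to each String value.
// Enabled via the producer config "interceptor.classes".
public class IdentityStampInterceptor implements ProducerInterceptor<String, String> {

    private String producerId;

    @Override
    public void configure(Map<String, ?> configs) {
        // "producer.id" is a made-up custom config key for this sketch
        producerId = String.valueOf(configs.get("producer.id"));
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String stamped = producerId + ":" + record.value();
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.timestamp(), record.key(), stamped);
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // no-op for this sketch
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}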

— m@

> On 21/03/2017, at 12:22 PM, Hans Jespersen  wrote:
> 
> You can configure Kafka with ACLs that only allow certain users to
> produce/consume to certain topics but if multiple producers are allowed to
> produce to a shared topic then you cannot identify them without adding
> something to the messages.
> 
> For example, you can have each producer digitally sign (or encrypt) each
> message and include the signature as a separate field (i.e. separate from
> the original message body). Then the consumers can independently check that
> the signature is valid and that the message comes from a known/valid
> publisher. This pattern is similar to how signed email messages work.
> 
> -hans



Re: Kafka Producer for Sending Log Data

2017-03-20 Thread Ram Vittal

Hi Tony,

We are using a custom logging API that wraps a Kafka producer as a singleton for
each app. The custom API takes structured log data, converts it to JSON, and
writes it to an app-specific Kafka topic. That topic is then bridged to a
Logstash consumer and the logs are ingested into Elasticsearch, where they can
be searched, filtered, or visualized.
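
A minimal sketch of that kind of wrapper is below; the class name, topic naming,
and log-record shape are illustrative, and the JSON conversion is shown as plain
string formatting to keep it self-contained:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative singleton wrapper around a Kafka producer for application logs.
public final class LogPublisher {

    private static final LogPublisher INSTANCE = new LogPublisher();

    private final Producer<String, String> producer;

    private LogPublisher() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public static LogPublisher getInstance() {
        return INSTANCE;
    }

    // Converts a structured log event to JSON (crudely, for the sketch) and sends it
    // to an app-specific topic, keyed by the app name.
    public void log(String app, String level, String message) {
        String json = String.format("{\"app\":\"%s\",\"level\":\"%s\",\"message\":\"%s\"}",
                app, level, message);
        producer.send(new ProducerRecord<>("logs-" + app, app, json));
    }
}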

Thanks,
Ram Vittal 

Sent from my iPhone

> On Mar 20, 2017, at 10:44 AM, Tony S. Wu  wrote:
> 
> Hi,
> 
> I am looking to send log files periodically to Kafka. Before I set out to
> write my own producer, I was wondering what everyone uses to send log files to
> Kafka through an HTTP API. Ideally it should also prune the log and have some
> sort of error handling.
> 
> Thanks very much.
> 
> Tony S. Wu



Re: Processing multiple topics

2017-03-20 Thread Matthias J. Sax
I would recommend trying out Kafka's Streams API instead of Spark Streaming.

http://docs.confluent.io/current/streams/index.html
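
As a rough illustration of that suggestion (a minimal sketch for the 0.10.2 DSL,
assuming the records are keyed by attributionId as a String; the application id
and store name are made up):

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;

public class AttributionApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "attribution-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();

        // All four topics are consumed as a single stream, keyed by attributionId.
        KStream<String, String> events = builder.stream(
                Serdes.String(), Serdes.String(), "topic1", "topic2", "topic3", "topic4");

        // Count (or aggregate) events per attributionId in 5-minute windows;
        // late-arriving records update the window they belong to.
        events.groupByKey(Serdes.String(), Serdes.String())
              .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), "per-attribution-counts");

        new KafkaStreams(builder, props).start();
    }
}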

-Matthias


On 3/20/17 11:32 AM, Ali Akhtar wrote:
> Are you saying, that it should process all messages from topic 1, then
> topic 2, then topic 3, then 4?
> 
> Or that they need to be processed exactly at the same time?
> 
> On Mon, Mar 20, 2017 at 10:05 PM, Manasa Danda 
> wrote:
> 
>> Hi,
>>
>> I am Manasa, currently working on a project that requires processing data
>> from multiple topics at the same time. I am looking for an advise on how to
>> approach this problem. Below is the use case.
>>
>>
>> We have 4 topics, with data coming in at a different rate in each topic,
>> but the messages in each topic share a common unique identifier (
>> attributionId). I need to process all the events in the 4 topics with same
>> attributionId at the same time. we are currently using spark streaming for
>> processing.
>>
>> Here's the steps for current logic.
>>
>> 1. Read and filter data in topic 1
>> 2. Read and filter data in topic 2
>> 3. Read and filter data in topic 3
>> 4. Read and filter data in topic 4
>> 5. Union of DStreams from steps 1-4, which were executed in parallel
>> 6. process unified DStream
>>
>> However, since the data is coming at a different rate, the associated data
>> ( topic 1 is generating 1000 times more than topic 2), is not coming in
>> same batch window.
>>
>> Any ideas on how it can implemented would help.
>>
>> Thank you!!
>>
>> -Manasa
>>
> 





Re: validate identity of producer in each record

2017-03-20 Thread Hans Jespersen
You can configure Kafka with ACLs that only allow certain users to
produce/consume to certain topics, but if multiple producers are allowed to
produce to a shared topic, then you cannot identify them without adding
something to the messages.

For example, you can have each producer digitally sign (or encrypt) each
message and include the signature as a separate field (i.e. separate from
the original message body). Then the consumers can independently check that
the signature is valid and that the message comes from a known/valid
publisher. This pattern is similar to how signed email messages work.
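
A minimal sketch of the producer side of that idea, assuming String messages and
one RSA private key per producer; the JSON envelope format (payload plus a
base64 signature field) is made up for illustration:

import java.security.PrivateKey;
import java.security.Signature;
import java.util.Base64;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SigningSender {

    // Signs the payload and sends an envelope carrying both the payload and the signature.
    public static void send(Producer<String, String> producer, String topic,
                            String key, String payload, PrivateKey privateKey) throws Exception {
        Signature sig = Signature.getInstance("SHA256withRSA");
        sig.initSign(privateKey);
        sig.update(payload.getBytes("UTF-8"));
        String signature = Base64.getEncoder().encodeToString(sig.sign());

        // Keep the signature separate from the original message body,
        // e.g. as a second field in a simple JSON envelope.
        String envelope = "{\"payload\":" + toJsonString(payload)
                + ",\"signature\":\"" + signature + "\"}";
        producer.send(new ProducerRecord<>(topic, key, envelope));
    }

    private static String toJsonString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}

Consumers would then verify the signature with the producer's public key
(Signature.initVerify(...) / verify(...)) before trusting the message.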

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Mon, Mar 20, 2017 at 3:54 PM, Matt Magoffin  wrote:

> Hello,
>
> I am new to Kafka and am looking for a way for consumers to be able to
> identify the producer of each message in a topic. There are a large number
> of producers (let's say on the order of millions), and each producer would
> be connecting via SSL and using a unique client certificate. Essentially
> I'd like consumers to know the certificate of the producer of any given
> message.
>
> The producer identity of the message must not be forgeable, i.e. producer
> A must not be allowed to generate records that appear to consumers to be
> from producer B.
>
> Is it possible for Kafka to be configured to perform this type of identity
> validation? For example:
>
> * reject records that contain a certificate identity that differs from the
> producer connection’s client certificate
> * inject the producer connection’s certificate identity into each record
>
> Or would a proxy application need to sit in front of Kafka to perform one
> of these functions?
>
> Thank you in advance for offering any advice,
> Matt
>
>


validate identity of producer in each record

2017-03-20 Thread Matt Magoffin
Hello,

I am new to Kafka and am looking for a way for consumers to be able to identify
the producer of each message in a topic. There are a large number of producers
(let's say on the order of millions), and each producer would be connecting via
SSL and using a unique client certificate. Essentially I'd like consumers to
know the certificate of the producer of any given message.

The producer identity of the message must not be forgeable, i.e. producer A
must not be allowed to generate records that appear to consumers to be from
producer B.

Is it possible for Kafka to be configured to perform this type of identity 
validation? For example:

* reject records that contain a certificate identity that differs from the 
producer connection’s client certificate
* inject the producer connection’s certificate identity into each record

Or would a proxy application need to sit in front of Kafka to perform one of 
these functions?

Thank you in advance for offering any advice,
Matt



Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Ali Akhtar
Yeah, windowing seems perfect, if only I could find out the current
window's start time (so I can log the current bucket's start & end times)
and process window messages individually rather than as aggregates.

It doesn't seem like I can get this metadata from ProcessorContext though,
from looking over the javadocs.

On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll  wrote:

> Ali,
>
> what you describe is (roughly!) how Kafka Streams implements the internal
> state stores to support windowing.
>
> Some users have been following a similar approach as you outlined, using
> the Processor API.
>
>
>
> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar  wrote:
>
> > It would be helpful to know the 'start' and 'end' of the current
> metadata,
> > so if an out of order message arrives late, and is being processed in
> > foreach(), you'd know which window / bucket it belongs to, and can handle
> > it accordingly.
> >
> > I'm guessing that's not possible at the moment.
> >
> > (My use case is, i receive a stream of messages. Messages need to be
> stored
> > and sorted into 'buckets', to indicate 'sessions'. Each time there's a
> gap
> > of 30 mins or more since the last message (under a key), a new 'session'
> > (bucket) should be started, and future messages should belong to that
> > 'session', until the next 30+ min gap).
> >
> > On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll 
> > wrote:
> >
> > > > Can windows only be used for aggregations, or can they also be used
> for
> > > foreach(),
> > > and such?
> > >
> > > As of today, you can use windows only in aggregations.
> > >
> > > > And is it possible to get metadata on the message, such as whether or
> > > not its
> > > late, its index/position within the other messages, etc?
> > >
> > > If you use the Processor API of Kafka Streams, you can have access to
> an
> > > incoming record's topic, partition, offset, etc. via the so-called
> > > ProcessorContext (which is updated for every new incoming record):
> > >
> > > http://docs.confluent.io/current/streams/javadocs/org/
> > > apache/kafka/streams/processor/Processor.html
> > > - You can get/store a reference to the ProcessorContext from
> > > `Processor#init()`.
> > >
> > > http://docs.confluent.io/current/streams/javadocs/org/
> > > apache/kafka/streams/processor/ProcessorContext.html
> > > - The context can then be used within `Processor#process()` when you
> > > process a new record.  As I said, the context is updated behind the
> > scenes
> > > to match the record that is currently being processed.
> > >
> > >
> > > Best,
> > > Michael
> > >
> > >
> > >
> > >
> > > On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar 
> > wrote:
> > >
> > > > Can windows only be used for aggregations, or can they also be used
> for
> > > > foreach(), and such?
> > > >
> > > > And is it possible to get metadata on the message, such as whether or
> > not
> > > > its late, its index/position within the other messages, etc?
> > > >
> > > > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll 
> > > > wrote:
> > > >
> > > > > And since you asked for a pointer, Ali:
> > > > > http://docs.confluent.io/current/streams/concepts.html#windowing
> > > > >
> > > > >
> > > > > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
> mich...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Late-arriving and out-of-order data is only treated specially for
> > > > > windowed
> > > > > > aggregations.
> > > > > >
> > > > > > For stateless operations such as `KStream#foreach()` or
> > > > `KStream#map()`,
> > > > > > records are processed in the order they arrive (per partition).
> > > > > >
> > > > > > -Michael
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <
> ali.rac...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > >> > later when message A arrives it will put that message back
> into
> > > > > >> > the right temporal context and publish an amended result for
> the
> > > > > proper
> > > > > >> > time/session window as if message B were consumed in the
> > timestamp
> > > > > order
> > > > > >> > before message A.
> > > > > >>
> > > > > >> Does this apply to the aggregation Kafka stream methods then,
> and
> > > not
> > > > to
> > > > > >> e.g foreach?
> > > > > >>
> > > > > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <
> > h...@confluent.io>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Yes stream processing and CEP are subtlety different things.
> > > > > >> >
> > > > > >> > Kafka Streams helps you write stateful apps and allows that
> > state
> > > to
> > > > > be
> > > > > >> > preserved on disk (a local State store) as well as distributed
> > for
> > > > HA
> > > > > or
> > > > > >> > for parallel partitioned processing (via Kafka topic
> partitions
> > > and
> > > > > >> > consumer groups) as well as in memory (as a performance
> > > > enhancement).
> > > > > >> >
> > > > > >> > 

Re: Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-20 Thread Michael Noll
Hmm, I must admit I don't like this last update all too much.

Basically we would have:

StreamsBuilder builder = new StreamsBuilder();

// And here you'd define your...well, what actually?
// Ah right, you are composing a topology here, though you are not
aware of it.

KafkaStreams streams = new KafkaStreams(builder.build(),
streamsConfiguration);

So what are you building here with StreamsBuilder?  Streams (hint: No)?
And what about tables -- is there a TableBuilder (hint: No)?

I also interpret Guozhang's last response as saying that he'd prefer to have
"Topology" in the class/interface names. I am aware that we shouldn't
necessarily use the status quo to make decisions about future changes, but
the very first concept we explain in the Kafka Streams documentation is
"Stream Processing Topology":
https://kafka.apache.org/0102/documentation/streams#streams_concepts

-Michael



On Mon, Mar 20, 2017 at 7:55 PM, Matthias J. Sax 
wrote:

> \cc users list
>
>
>  Forwarded Message 
> Subject: Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API
> Date: Mon, 20 Mar 2017 11:51:01 -0700
> From: Matthias J. Sax 
> Organization: Confluent Inc
> To: d...@kafka.apache.org
>
> I want to push this discussion further.
>
> Guozhang's argument about "exposing" the Topology class is valid. It's a
> public class anyway, so it's not an issue. However, I think the question
> is not so much about exposing but about "advertising" (i.e., putting it
> into focus) or not at the DSL level.
>
>
> If I interpret the last replies correctly, it seems that we could agree
> on "StreamsBuilder" as the name. I did update the KIP accordingly. Please
> correct me if I got this wrong.
>
>
> If there are no other objections -- this naming discussion was the last
> open point so far -- I would like to start the VOTE thread.
>
>
> -Matthias
>
>
> On 3/14/17 2:37 PM, Guozhang Wang wrote:
> > I'd like to keep the term "Topology" inside the builder class since, as
> > Matthias mentioned, this builder#build() function returns a "Topology"
> > object, whose type is a public class anyways. Although you can argue to
> let
> > users always call
> >
> > "new KafkaStreams(builder.build())"
> >
> > I think it is still more benefit to expose this concept.
> >
> >
> >
> > Guozhang
> >
> > On Tue, Mar 14, 2017 at 10:43 AM, Matthias J. Sax  >
> > wrote:
> >
> >> Thanks for your input Michael.
> >>
>  - KafkaStreams as the new name for the builder that creates the
> logical
>  plan, with e.g. `KafkaStreams.stream("intput-topic")` and
>  `KafkaStreams.table("input-topic")`.
> >>
> >> I don't thinks this is a good idea, for multiple reasons:
> >>
> >> (1) We would reuse a name for a completely different purpose. The same
> >> argument for not renaming KStreamBuilder to TopologyBuilder. The
> >> confusion would just be too large.
> >>
> >> So if we would start from scratch, it might be ok to do so, but now we
> >> cannot make this move, IMHO.
> >>
> >> Also a clarification question: do you suggest to have static methods
> >> #stream and #table -- I am not sure if this would work?
> >> (or was you code snippet just simplification?)
> >>
> >>
> >> (2) Kafka Streams is basically a "processing client" next to consumer
> >> and producer client. Thus, the name KafkaStreams aligns to the naming
> >> schema of KafkaConsumer and KafkaProducer. I am not sure if it would be
> >> a good choice to "break" this naming scheme.
> >>
> >> Btw: this is also the reason, why we have KafkaStreams#close() -- and
> >> not KafkaStreams#stop() -- because #close() aligns with consumer and
> >> producer client.
> >>
> >>
> >> (3) On more argument against using KafkaStreams as DSL entry class would
> >> be, that it would need to create a Topology that can be given to the
> >> "runner/processing-client". Thus the pattern would be
> >>
> >>> Topology topology = streams.build();
> >>> KafkaStramsRunner runner = new KafkaStreamsRunner(..., topology)
> >>
> >> (or of course as a one liner).
> >>
> >>
> >>
> >> On the other hand, there was the idea (that we intentionally excluded
> >> from the KIP), to change the "client instantiation" pattern.
> >>
> >> Right now, a new client in actively instantiated (ie, by calling "new")
> >> and the topology if provided as a constructor argument. However,
> >> especially for DSL (not sure if it would make sense for PAPI), the DSL
> >> builder could create the client for the user.
> >>
> >> Something like this:
> >>
> >>> KStreamBuilder builder = new KStreamBuilder();
> >>> builder.whatever() // use the builder
> >>>
> >>> StreamsConfig config = 
> >>> KafkaStreams streams = builder.getKafkaStreams(config);
> >>
> >> If we change the patter like this, the notion a the "DSL builder" would
> >> change, as it does not create a topology anymore, but it creates the
> >> "processing client". This would address Jay's concern about "not
> >> exposing concept 

Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Michael Noll
Ali,

what you describe is (roughly!) how Kafka Streams implements the internal
state stores to support windowing.

Some users have been following a similar approach as you outlined, using
the Processor API.
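
For concreteness, a very rough sketch of that Processor API approach: a
processor that keeps the last-seen timestamp per key in a key-value store and
treats a gap of more than 30 minutes as the start of a new session. The store
name and the forwarded marker are illustrative, and registering the "last-seen"
store with the topology is not shown:

import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class SessionBucketProcessor implements Processor<String, String> {

    private static final long GAP_MS = TimeUnit.MINUTES.toMillis(30);

    private ProcessorContext context;
    private KeyValueStore<String, Long> lastSeen; // must be added to the topology as "last-seen"

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.lastSeen = (KeyValueStore<String, Long>) context.getStateStore("last-seen");
    }

    @Override
    public void process(String key, String value) {
        long ts = context.timestamp(); // timestamp of the record being processed
        Long previous = lastSeen.get(key);
        boolean newSession = previous == null || ts - previous > GAP_MS;
        lastSeen.put(key, ts);
        if (newSession) {
            // start a new bucket for this key, e.g. forward a marker downstream
            context.forward(key, "SESSION_START");
        }
        // ...append the value to the current bucket here...
    }

    @Override
    public void punctuate(long timestamp) {
        // not used in this sketch
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}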



On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar  wrote:

> It would be helpful to know the 'start' and 'end' of the current metadata,
> so if an out of order message arrives late, and is being processed in
> foreach(), you'd know which window / bucket it belongs to, and can handle
> it accordingly.
>
> I'm guessing that's not possible at the moment.
>
> (My use case is, i receive a stream of messages. Messages need to be stored
> and sorted into 'buckets', to indicate 'sessions'. Each time there's a gap
> of 30 mins or more since the last message (under a key), a new 'session'
> (bucket) should be started, and future messages should belong to that
> 'session', until the next 30+ min gap).
>
> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll 
> wrote:
>
> > > Can windows only be used for aggregations, or can they also be used for
> > foreach(),
> > and such?
> >
> > As of today, you can use windows only in aggregations.
> >
> > > And is it possible to get metadata on the message, such as whether or
> > not its
> > late, its index/position within the other messages, etc?
> >
> > If you use the Processor API of Kafka Streams, you can have access to an
> > incoming record's topic, partition, offset, etc. via the so-called
> > ProcessorContext (which is updated for every new incoming record):
> >
> > http://docs.confluent.io/current/streams/javadocs/org/
> > apache/kafka/streams/processor/Processor.html
> > - You can get/store a reference to the ProcessorContext from
> > `Processor#init()`.
> >
> > http://docs.confluent.io/current/streams/javadocs/org/
> > apache/kafka/streams/processor/ProcessorContext.html
> > - The context can then be used within `Processor#process()` when you
> > process a new record.  As I said, the context is updated behind the
> scenes
> > to match the record that is currently being processed.
> >
> >
> > Best,
> > Michael
> >
> >
> >
> >
> > On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar 
> wrote:
> >
> > > Can windows only be used for aggregations, or can they also be used for
> > > foreach(), and such?
> > >
> > > And is it possible to get metadata on the message, such as whether or
> not
> > > its late, its index/position within the other messages, etc?
> > >
> > > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll 
> > > wrote:
> > >
> > > > And since you asked for a pointer, Ali:
> > > > http://docs.confluent.io/current/streams/concepts.html#windowing
> > > >
> > > >
> > > > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll 
> > > > wrote:
> > > >
> > > > > Late-arriving and out-of-order data is only treated specially for
> > > > windowed
> > > > > aggregations.
> > > > >
> > > > > For stateless operations such as `KStream#foreach()` or
> > > `KStream#map()`,
> > > > > records are processed in the order they arrive (per partition).
> > > > >
> > > > > -Michael
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar  >
> > > > wrote:
> > > > >
> > > > >> > later when message A arrives it will put that message back into
> > > > >> > the right temporal context and publish an amended result for the
> > > > proper
> > > > >> > time/session window as if message B were consumed in the
> timestamp
> > > > order
> > > > >> > before message A.
> > > > >>
> > > > >> Does this apply to the aggregation Kafka stream methods then, and
> > not
> > > to
> > > > >> e.g foreach?
> > > > >>
> > > > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <
> h...@confluent.io>
> > > > >> wrote:
> > > > >>
> > > > >> > Yes stream processing and CEP are subtlety different things.
> > > > >> >
> > > > >> > Kafka Streams helps you write stateful apps and allows that
> state
> > to
> > > > be
> > > > >> > preserved on disk (a local State store) as well as distributed
> for
> > > HA
> > > > or
> > > > >> > for parallel partitioned processing (via Kafka topic partitions
> > and
> > > > >> > consumer groups) as well as in memory (as a performance
> > > enhancement).
> > > > >> >
> > > > >> > However a classical CEP engine with a pre-modeled state machine
> > and
> > > > >> > pattern matching rules is something different from stream
> > > processing.
> > > > >> >
> > > > >> > It is on course possible to build a CEP system on top on Kafka
> > > Streams
> > > > >> and
> > > > >> > get the best of both worlds.
> > > > >> >
> > > > >> > -hans
> > > > >> >
> > > > >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > > > >> > sabarish@gmail.com> wrote:
> > > > >> > >
> > > > >> > > Hans
> > > > >> > >
> > > > >> > > What you state would work for aggregations, but not for state
> > > > machines
> > > > >> > and
> > > > >> > > CEP.
> > > > >> > >
> > 

Fwd: Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-20 Thread Matthias J. Sax
\cc users list


 Forwarded Message 
Subject: Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API
Date: Mon, 20 Mar 2017 11:51:01 -0700
From: Matthias J. Sax 
Organization: Confluent Inc
To: d...@kafka.apache.org

I want to push this discussion further.

Guozhang's argument about "exposing" the Topology class is valid. It's a
public class anyway, so it's not an issue. However, I think the question
is not so much about exposing but about "advertising" (i.e., putting it
into focus) or not at the DSL level.


If I interpret the last replies correctly, it seems that we could agree
on "StreamsBuilder" as the name. I did update the KIP accordingly. Please
correct me if I got this wrong.


If there are no other objections -- this naming discussion was the last
open point so far -- I would like to start the VOTE thread.


-Matthias


On 3/14/17 2:37 PM, Guozhang Wang wrote:
> I'd like to keep the term "Topology" inside the builder class since, as
> Matthias mentioned, this builder#build() function returns a "Topology"
> object, whose type is a public class anyways. Although you can argue to let
> users always call
> 
> "new KafkaStreams(builder.build())"
> 
> I think it is still more beneficial to expose this concept.
> 
> 
> 
> Guozhang
> 
> On Tue, Mar 14, 2017 at 10:43 AM, Matthias J. Sax 
> wrote:
> 
>> Thanks for your input Michael.
>>
 - KafkaStreams as the new name for the builder that creates the logical
 plan, with e.g. `KafkaStreams.stream("intput-topic")` and
 `KafkaStreams.table("input-topic")`.
>>
>> I don't thinks this is a good idea, for multiple reasons:
>>
>> (1) We would reuse a name for a completely different purpose. The same
>> argument for not renaming KStreamBuilder to TopologyBuilder. The
>> confusion would just be too large.
>>
>> So if we would start from scratch, it might be ok to do so, but now we
>> cannot make this move, IMHO.
>>
>> Also a clarification question: do you suggest to have static methods
>> #stream and #table -- I am not sure if this would work?
>> (or was you code snippet just simplification?)
>>
>>
>> (2) Kafka Streams is basically a "processing client" next to consumer
>> and producer client. Thus, the name KafkaStreams aligns to the naming
>> schema of KafkaConsumer and KafkaProducer. I am not sure if it would be
>> a good choice to "break" this naming scheme.
>>
>> Btw: this is also the reason, why we have KafkaStreams#close() -- and
>> not KafkaStreams#stop() -- because #close() aligns with consumer and
>> producer client.
>>
>>
>> (3) On more argument against using KafkaStreams as DSL entry class would
>> be, that it would need to create a Topology that can be given to the
>> "runner/processing-client". Thus the pattern would be
>>
>>> Topology topology = streams.build();
>>> KafkaStramsRunner runner = new KafkaStreamsRunner(..., topology)
>>
>> (or of course as a one liner).
>>
>>
>>
>> On the other hand, there was the idea (that we intentionally excluded
>> from the KIP), to change the "client instantiation" pattern.
>>
>> Right now, a new client in actively instantiated (ie, by calling "new")
>> and the topology if provided as a constructor argument. However,
>> especially for DSL (not sure if it would make sense for PAPI), the DSL
>> builder could create the client for the user.
>>
>> Something like this:
>>
>>> KStreamBuilder builder = new KStreamBuilder();
>>> builder.whatever() // use the builder
>>>
>>> StreamsConfig config = 
>>> KafkaStreams streams = builder.getKafkaStreams(config);
>>
>> If we change the patter like this, the notion a the "DSL builder" would
>> change, as it does not create a topology anymore, but it creates the
>> "processing client". This would address Jay's concern about "not
>> exposing concept users don't need the understand" and would not require
>> to include the word "Topology" in the DSL builder class name, because
>> the builder does not build a Topology anymore.
>>
>> I just put some names that came to my mind first hand -- did not think
>> about good names. It's just to discuss the pattern.
>>
>>
>>
>> -Matthias
>>
>>
>>
>>
>>
>> On 3/14/17 3:36 AM, Michael Noll wrote:
>>> I see Jay's point, and I agree with much of it -- notably about being
>>> careful which concepts we do and do not expose, depending on which user
>>> group / user type is affected.  That said, I'm not sure yet whether or
>> not
>>> we should get rid of "Topology" (or a similar term) in the DSL.
>>>
>>> For what it's worth, here's how related technologies define/name their
>>> "topologies" and "builders".  Note that, in all cases, it's about
>>> constructing a logical processing plan, which then is being executed/run.
>>>
>>> - `Pipeline` (Google Dataflow/Apache Beam)
>>> - To add a source you first instantiate the Source (e.g.
>>> `TextIO.Read.from("gs://some/inputData.txt")`),
>>>   then attach it to your processing plan via
>> `Pipeline#apply()`.
>>>   This setup is a 

Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Ali Akhtar
It would be helpful to know the 'start' and 'end' of the current metadata,
so if an out of order message arrives late, and is being processed in
foreach(), you'd know which window / bucket it belongs to, and can handle
it accordingly.

I'm guessing that's not possible at the moment.

(My use case is, I receive a stream of messages. Messages need to be stored
and sorted into 'buckets', to indicate 'sessions'. Each time there's a gap
of 30 mins or more since the last message (under a key), a new 'session'
(bucket) should be started, and future messages should belong to that
'session', until the next 30+ min gap).
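
(For what it's worth, the DSL's session windows express exactly that
30-minute-gap rule, though only for aggregations -- a rough sketch, with a
made-up store name, assuming a KStream<String, String> of messages:)

import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class SessionCountExample {

    // A session per key closes after a 30-minute gap of inactivity; late records
    // update the session they fall into. This yields per-session counts, not
    // per-message handling.
    static KTable<Windowed<String>, Long> sessionCounts(KStream<String, String> messages) {
        return messages
                .groupByKey(Serdes.String(), Serdes.String())
                .count(SessionWindows.with(TimeUnit.MINUTES.toMillis(30)), "session-counts");
    }
}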

On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll  wrote:

> > Can windows only be used for aggregations, or can they also be used for
> foreach(),
> and such?
>
> As of today, you can use windows only in aggregations.
>
> > And is it possible to get metadata on the message, such as whether or
> not its
> late, its index/position within the other messages, etc?
>
> If you use the Processor API of Kafka Streams, you can have access to an
> incoming record's topic, partition, offset, etc. via the so-called
> ProcessorContext (which is updated for every new incoming record):
>
> http://docs.confluent.io/current/streams/javadocs/org/
> apache/kafka/streams/processor/Processor.html
> - You can get/store a reference to the ProcessorContext from
> `Processor#init()`.
>
> http://docs.confluent.io/current/streams/javadocs/org/
> apache/kafka/streams/processor/ProcessorContext.html
> - The context can then be used within `Processor#process()` when you
> process a new record.  As I said, the context is updated behind the scenes
> to match the record that is currently being processed.
>
>
> Best,
> Michael
>
>
>
>
> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar  wrote:
>
> > Can windows only be used for aggregations, or can they also be used for
> > foreach(), and such?
> >
> > And is it possible to get metadata on the message, such as whether or not
> > its late, its index/position within the other messages, etc?
> >
> > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll 
> > wrote:
> >
> > > And since you asked for a pointer, Ali:
> > > http://docs.confluent.io/current/streams/concepts.html#windowing
> > >
> > >
> > > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll 
> > > wrote:
> > >
> > > > Late-arriving and out-of-order data is only treated specially for
> > > windowed
> > > > aggregations.
> > > >
> > > > For stateless operations such as `KStream#foreach()` or
> > `KStream#map()`,
> > > > records are processed in the order they arrive (per partition).
> > > >
> > > > -Michael
> > > >
> > > >
> > > >
> > > >
> > > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar 
> > > wrote:
> > > >
> > > >> > later when message A arrives it will put that message back into
> > > >> > the right temporal context and publish an amended result for the
> > > proper
> > > >> > time/session window as if message B were consumed in the timestamp
> > > order
> > > >> > before message A.
> > > >>
> > > >> Does this apply to the aggregation Kafka stream methods then, and
> not
> > to
> > > >> e.g foreach?
> > > >>
> > > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen 
> > > >> wrote:
> > > >>
> > > >> > Yes stream processing and CEP are subtlety different things.
> > > >> >
> > > >> > Kafka Streams helps you write stateful apps and allows that state
> to
> > > be
> > > >> > preserved on disk (a local State store) as well as distributed for
> > HA
> > > or
> > > >> > for parallel partitioned processing (via Kafka topic partitions
> and
> > > >> > consumer groups) as well as in memory (as a performance
> > enhancement).
> > > >> >
> > > >> > However a classical CEP engine with a pre-modeled state machine
> and
> > > >> > pattern matching rules is something different from stream
> > processing.
> > > >> >
> > > >> > It is on course possible to build a CEP system on top on Kafka
> > Streams
> > > >> and
> > > >> > get the best of both worlds.
> > > >> >
> > > >> > -hans
> > > >> >
> > > >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > > >> > sabarish@gmail.com> wrote:
> > > >> > >
> > > >> > > Hans
> > > >> > >
> > > >> > > What you state would work for aggregations, but not for state
> > > machines
> > > >> > and
> > > >> > > CEP.
> > > >> > >
> > > >> > > Regards
> > > >> > > Sab
> > > >> > >
> > > >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen"  >
> > > >> wrote:
> > > >> > >>
> > > >> > >> The only way to make sure A is consumed first would be to delay
> > the
> > > >> > >> consumption of message B for at least 15 minutes which would
> fly
> > in
> > > >> the
> > > >> > >> face of the principals of a true streaming platform so the
> short
> > > >> answer
> > > >> > to
> > > >> > >> your question is "no" because that would be batch processing
> not
> > > >> stream
> > > >> > 

Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Michael Noll
> Can windows only be used for aggregations, or can they also be used for 
> foreach(),
and such?

As of today, you can use windows only in aggregations.

> And is it possible to get metadata on the message, such as whether or not its
late, its index/position within the other messages, etc?

If you use the Processor API of Kafka Streams, you can have access to an
incoming record's topic, partition, offset, etc. via the so-called
ProcessorContext (which is updated for every new incoming record):

http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
- You can get/store a reference to the ProcessorContext from
`Processor#init()`.

http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
- The context can then be used within `Processor#process()` when you
process a new record.  As I said, the context is updated behind the scenes
to match the record that is currently being processed.
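
A bare-bones sketch of that (the class name is illustrative):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class MetadataLoggingProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // keep a reference; it is updated for every incoming record
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // topic/partition/offset/timestamp of the record currently being processed
        System.out.printf("record from %s[%d]@%d ts=%d key=%s%n",
                context.topic(), context.partition(), context.offset(),
                context.timestamp(), key);
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() { }
}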


Best,
Michael




On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar  wrote:

> Can windows only be used for aggregations, or can they also be used for
> foreach(), and such?
>
> And is it possible to get metadata on the message, such as whether or not
> its late, its index/position within the other messages, etc?
>
> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll 
> wrote:
>
> > And since you asked for a pointer, Ali:
> > http://docs.confluent.io/current/streams/concepts.html#windowing
> >
> >
> > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll 
> > wrote:
> >
> > > Late-arriving and out-of-order data is only treated specially for
> > windowed
> > > aggregations.
> > >
> > > For stateless operations such as `KStream#foreach()` or
> `KStream#map()`,
> > > records are processed in the order they arrive (per partition).
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar 
> > wrote:
> > >
> > >> > later when message A arrives it will put that message back into
> > >> > the right temporal context and publish an amended result for the
> > proper
> > >> > time/session window as if message B were consumed in the timestamp
> > order
> > >> > before message A.
> > >>
> > >> Does this apply to the aggregation Kafka stream methods then, and not
> to
> > >> e.g foreach?
> > >>
> > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen 
> > >> wrote:
> > >>
> > >> > Yes stream processing and CEP are subtlety different things.
> > >> >
> > >> > Kafka Streams helps you write stateful apps and allows that state to
> > be
> > >> > preserved on disk (a local State store) as well as distributed for
> HA
> > or
> > >> > for parallel partitioned processing (via Kafka topic partitions and
> > >> > consumer groups) as well as in memory (as a performance
> enhancement).
> > >> >
> > >> > However a classical CEP engine with a pre-modeled state machine and
> > >> > pattern matching rules is something different from stream
> processing.
> > >> >
> > >> > It is on course possible to build a CEP system on top on Kafka
> Streams
> > >> and
> > >> > get the best of both worlds.
> > >> >
> > >> > -hans
> > >> >
> > >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > >> > sabarish@gmail.com> wrote:
> > >> > >
> > >> > > Hans
> > >> > >
> > >> > > What you state would work for aggregations, but not for state
> > machines
> > >> > and
> > >> > > CEP.
> > >> > >
> > >> > > Regards
> > >> > > Sab
> > >> > >
> > >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" 
> > >> wrote:
> > >> > >>
> > >> > >> The only way to make sure A is consumed first would be to delay
> the
> > >> > >> consumption of message B for at least 15 minutes which would fly
> in
> > >> the
> > >> > >> face of the principals of a true streaming platform so the short
> > >> answer
> > >> > to
> > >> > >> your question is "no" because that would be batch processing not
> > >> stream
> > >> > >> processing.
> > >> > >>
> > >> > >> However, Kafka Streams does handle late arriving data. So if you
> > had
> > >> > some
> > >> > >> analytics that computes results on a time window or a session
> > window
> > >> > then
> > >> > >> Kafka streams will compute on the stream in real time (processing
> > >> > message
> > >> > >> B) and then later when message A arrives it will put that message
> > >> back
> > >> > into
> > >> > >> the right temporal context and publish an amended result for the
> > >> proper
> > >> > >> time/session window as if message B were consumed in the
> timestamp
> > >> order
> > >> > >> before message A. The end result of this flow is that you
> > eventually
> > >> get
> > >> > >> the same results you would get in a batch processing system but
> > with
> > >> the
> > >> > >> added benefit of getting intermediary result at much lower
> latency.
> > >> > >>
> > >> > >> -hans
> > >> > >>
> > >> > >> /**
> > >> > >> * Hans 

Re: Processing multiple topics

2017-03-20 Thread Ali Akhtar
Are you saying that it should process all messages from topic 1, then
topic 2, then topic 3, then 4?

Or that they need to be processed exactly at the same time?

On Mon, Mar 20, 2017 at 10:05 PM, Manasa Danda 
wrote:

> Hi,
>
> I am Manasa, currently working on a project that requires processing data
> from multiple topics at the same time. I am looking for advice on how to
> approach this problem. Below is the use case.
>
>
> We have 4 topics, with data coming in at a different rate in each topic,
> but the messages in each topic share a common unique identifier
> (attributionId). I need to process all the events in the 4 topics with the same
> attributionId at the same time. We are currently using Spark Streaming for
> processing.
>
> Here are the steps in the current logic.
>
> 1. Read and filter data in topic 1
> 2. Read and filter data in topic 2
> 3. Read and filter data in topic 3
> 4. Read and filter data in topic 4
> 5. Union of DStreams from steps 1-4, which were executed in parallel
> 6. Process the unified DStream
>
> However, since the data is coming in at different rates, the associated data
> (topic 1 is generating 1000 times more data than topic 2) does not arrive in
> the same batch window.
>
> Any ideas on how this could be implemented would help.
>
> Thank you!!
>
> -Manasa
>


Processing multiple topics

2017-03-20 Thread Manasa Danda
Hi,

I am Manasa, currently working on a project that requires processing data
from multiple topics at the same time. I am looking for advice on how to
approach this problem. Below is the use case.


We have 4 topics, with data coming in at a different rate in each topic,
but the messages in each topic share a common unique identifier
(attributionId). I need to process all the events in the 4 topics with the same
attributionId at the same time. We are currently using Spark Streaming for
processing.

Here are the steps in the current logic.

1. Read and filter data in topic 1
2. Read and filter data in topic 2
3. Read and filter data in topic 3
4. Read and filter data in topic 4
5. Union of DStreams from steps 1-4, which were executed in parallel
6. Process the unified DStream

However, since the data is coming in at different rates, the associated data
(topic 1 is generating 1000 times more data than topic 2) does not arrive in
the same batch window.

Any ideas on how this could be implemented would help.

Thank you!!

-Manasa


org.apache.kafka.common.errors.TimeoutException

2017-03-20 Thread Mina Aslani
Hi,

I get ERROR Error when sending message to topic my-topic with key: null,
value: ... bytes with error: (org.apache.kafka.clients.producer.internals.
ErrorLoggingCallback)

org.apache.kafka.common.errors.TimeoutException: Expiring 11 record(s) for
my-topic-0: 1732 ms has passed since last append

Any idea?

Best regards,
Mina


Kafka Producer for Sending Log Data

2017-03-20 Thread Tony S. Wu
Hi,

I am looking to send log files periodically to Kafka. Before I set out to
write my own producer, I was wondering what everyone uses to send log files to
Kafka through an HTTP API. Ideally it should also prune the log and have some
sort of error handling.

Thanks very much.

Tony S. Wu


Kafka timing to auto create topics

2017-03-20 Thread Matheus Gonçalves da Silva
Hello,
I have configured auto.create.topics.enable=true in my server.properties,
but I'm having issues with it.

When I launch Kafka for the first time and immediately send a message to a topic
that should be auto-created, the message doesn't arrive at the topic. But if I
launch Kafka and wait a few minutes before sending messages to those topics, it
works: the topic is created and the message gets there.

Is this "timing" configurable?


Re: ConsumerRebalanceListener

2017-03-20 Thread Robert Quinlivan
Does the consumer or producer log anything when you connect the new
consumer? Is there anything logged in the broker logs?

On Mon, Mar 20, 2017 at 10:58 AM, jeffrey venus 
wrote:

> Hi,
>   I am trying to use Kafka as a messaging platform for my microservices.
> There is a problem I am facing in real time: when I try to bring in a new
> consumer group to consume from a certain topic, I have to restart the
> producer, and only then does the new consumer group start consuming. Is there
> any other way to do this without disturbing the producer?
>
>
> Regards
> V G Sunjay Jeffrish
>



-- 
Robert Quinlivan
Software Engineer, Signal


Re: clearing an aggregation?

2017-03-20 Thread Michael Noll
Jon,

the windowing operation of Kafka's Streams API (in its DSL) aligns
time-based windows to the epoch [1]:

Quoting from e.g. hopping windows (sometimes called sliding windows in
other technologies):

> Hopping time windows are aligned to the epoch, with the lower interval
bound
> being inclusive and the upper bound being exclusive. “Aligned to the
epoch”
> means that the first window starts at timestamp zero.
> For example, hopping windows with a size of 5000ms and an advance interval
> (“hop”) of 3000ms have predictable window boundaries
`[0;5000),[3000;8000),...`
> — and not `[1000;6000),[4000;9000),...` or even something “random” like
> `[1452;6452),[4452;9452),...`.

Would that help you?
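
In code, that alignment means something like the sketch below -- hourly tumbling
windows whose boundaries fall on clock hours (UTC) regardless of when the
application itself was started (the store name is made up, assuming a
KStream<String, Long> of values):

import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class HourlyWindowExample {

    // Windows are aligned to the epoch, so these one-hour windows start at
    // [0;3600000), [3600000;7200000), ... -- i.e. on clock-hour boundaries.
    static KTable<Windowed<String>, Long> hourlySums(KStream<String, Long> values) {
        return values
                .groupByKey(Serdes.String(), Serdes.Long())
                .reduce((a, b) -> a + b, TimeWindows.of(TimeUnit.HOURS.toMillis(1)), "hourly-sums");
    }
}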

-Michael



[1] http://docs.confluent.io/current/streams/developer-guide.html


On Mon, Mar 20, 2017 at 12:51 PM, Jon Yeargers 
wrote:

> Is this possible? I'm wondering about gathering data from a stream into a
> series of windowed aggregators: minute, hour and day. A separate process
> would start at fixed intervals, query the appropriate state store for
> available values and then hopefully clear / zero / reset everything for the
> next interval.
>
> I could use the retention period setting but I would (somehow) need to
> guarantee that the windows would reset on clock boundaries and not based on
> start time for the app.
>


ConsumerRebalanceListener

2017-03-20 Thread jeffrey venus
Hi,
  I am trying to use Kafka as a messaging platform for my microservices.
There is a problem I am facing in real time: when I try to bring in a new
consumer group to consume from a certain topic, I have to restart the
producer, and only then does the new consumer group start consuming. Is there
any other way to do this without disturbing the producer?


Regards
V G Sunjay Jeffrish


Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Ali Akhtar
Can windows only be used for aggregations, or can they also be used for
foreach(), and such?

And is it possible to get metadata on the message, such as whether or not
it's late, its index/position within the other messages, etc?

On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll  wrote:

> And since you asked for a pointer, Ali:
> http://docs.confluent.io/current/streams/concepts.html#windowing
>
>
> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll 
> wrote:
>
> > Late-arriving and out-of-order data is only treated specially for
> windowed
> > aggregations.
> >
> > For stateless operations such as `KStream#foreach()` or `KStream#map()`,
> > records are processed in the order they arrive (per partition).
> >
> > -Michael
> >
> >
> >
> >
> > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar 
> wrote:
> >
> >> > later when message A arrives it will put that message back into
> >> > the right temporal context and publish an amended result for the
> proper
> >> > time/session window as if message B were consumed in the timestamp
> order
> >> > before message A.
> >>
> >> Does this apply to the aggregation Kafka stream methods then, and not to
> >> e.g foreach?
> >>
> >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen 
> >> wrote:
> >>
> >> > Yes stream processing and CEP are subtlety different things.
> >> >
> >> > Kafka Streams helps you write stateful apps and allows that state to
> be
> >> > preserved on disk (a local State store) as well as distributed for HA
> or
> >> > for parallel partitioned processing (via Kafka topic partitions and
> >> > consumer groups) as well as in memory (as a performance enhancement).
> >> >
> >> > However a classical CEP engine with a pre-modeled state machine and
> >> > pattern matching rules is something different from stream processing.
> >> >
> >> > It is on course possible to build a CEP system on top on Kafka Streams
> >> and
> >> > get the best of both worlds.
> >> >
> >> > -hans
> >> >
> >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> >> > sabarish@gmail.com> wrote:
> >> > >
> >> > > Hans
> >> > >
> >> > > What you state would work for aggregations, but not for state
> machines
> >> > and
> >> > > CEP.
> >> > >
> >> > > Regards
> >> > > Sab
> >> > >
> >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" 
> >> wrote:
> >> > >>
> >> > >> The only way to make sure A is consumed first would be to delay the
> >> > >> consumption of message B for at least 15 minutes which would fly in
> >> the
> >> > >> face of the principals of a true streaming platform so the short
> >> answer
> >> > to
> >> > >> your question is "no" because that would be batch processing not
> >> stream
> >> > >> processing.
> >> > >>
> >> > >> However, Kafka Streams does handle late arriving data. So if you
> had
> >> > some
> >> > >> analytics that computes results on a time window or a session
> window
> >> > then
> >> > >> Kafka streams will compute on the stream in real time (processing
> >> > message
> >> > >> B) and then later when message A arrives it will put that message
> >> back
> >> > into
> >> > >> the right temporal context and publish an amended result for the
> >> proper
> >> > >> time/session window as if message B were consumed in the timestamp
> >> order
> >> > >> before message A. The end result of this flow is that you
> eventually
> >> get
> >> > >> the same results you would get in a batch processing system but
> with
> >> the
> >> > >> added benefit of getting intermediary result at much lower latency.
> >> > >>
> >> > >> -hans
> >> > >>
> >> > >> /**
> >> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >> > >> * h...@confluent.io (650)924-2670
> >> > >> */
> >> > >>
> >> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
> ali.rac...@gmail.com>
> >> > wrote:
> >> > >>>
> >> > >>> Is it possible to have Kafka Streams order messages correctly by
> >> their
> >> > >>> timestamps, even if they arrived out of order?
> >> > >>>
> >> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B with
> a
> >> > >>> timestamp of 5:15 PM, are sent.
> >> > >>>
> >> > >>> Message B arrives sooner than Message A, due to network issues.
> >> > >>>
> >> > >>> Is it possible to make sure that, across all consumers of Kafka
> >> Streams
> >> > >>> (even if they are across different servers, but have the same
> >> consumer
> >> > >>> group), Message A is consumed first, before Message B?
> >> > >>>
> >> > >>> Thanks.
> >> > >>>
> >> > >>
> >> >
> >>
> >
> >
> >
>


Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Michael Noll
Late-arriving and out-of-order data is only treated specially for windowed
aggregations.

For stateless operations such as `KStream#foreach()` or `KStream#map()`,
records are processed in the order they arrive (per partition).

-Michael




On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar  wrote:

> > later when message A arrives it will put that message back into
> > the right temporal context and publish an amended result for the proper
> > time/session window as if message B were consumed in the timestamp order
> > before message A.
>
> Does this apply to the aggregation Kafka stream methods then, and not to
> e.g foreach?
>
> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen  wrote:
>
> > Yes stream processing and CEP are subtlety different things.
> >
> > Kafka Streams helps you write stateful apps and allows that state to be
> > preserved on disk (a local State store) as well as distributed for HA or
> > for parallel partitioned processing (via Kafka topic partitions and
> > consumer groups) as well as in memory (as a performance enhancement).
> >
> > However a classical CEP engine with a pre-modeled state machine and
> > pattern matching rules is something different from stream processing.
> >
> > It is on course possible to build a CEP system on top on Kafka Streams
> and
> > get the best of both worlds.
> >
> > -hans
> >
> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > sabarish@gmail.com> wrote:
> > >
> > > Hans
> > >
> > > What you state would work for aggregations, but not for state machines
> > and
> > > CEP.
> > >
> > > Regards
> > > Sab
> > >
> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" 
> wrote:
> > >>
> > >> The only way to make sure A is consumed first would be to delay the
> > >> consumption of message B for at least 15 minutes which would fly in
> the
> > >> face of the principals of a true streaming platform so the short
> answer
> > to
> > >> your question is "no" because that would be batch processing not
> stream
> > >> processing.
> > >>
> > >> However, Kafka Streams does handle late arriving data. So if you had
> > some
> > >> analytics that computes results on a time window or a session window
> > then
> > >> Kafka streams will compute on the stream in real time (processing
> > message
> > >> B) and then later when message A arrives it will put that message back
> > into
> > >> the right temporal context and publish an amended result for the
> proper
> > >> time/session window as if message B were consumed in the timestamp
> order
> > >> before message A. The end result of this flow is that you eventually
> get
> > >> the same results you would get in a batch processing system but with
> the
> > >> added benefit of getting intermediary result at much lower latency.
> > >>
> > >> -hans
> > >>
> > >> /**
> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> > >> * h...@confluent.io (650)924-2670
> > >> */
> > >>
> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar 
> > wrote:
> > >>>
> > >>> Is it possible to have Kafka Streams order messages correctly by
> their
> > >>> timestamps, even if they arrived out of order?
> > >>>
> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
> > >>> timestamp of 5:15 PM, are sent.
> > >>>
> > >>> Message B arrives sooner than Message A, due to network issues.
> > >>>
> > >>> Is it possible to make sure that, across all consumers of Kafka
> Streams
> > >>> (even if they are across different servers, but have the same
> consumer
> > >>> group), Message A is consumed first, before Message B?
> > >>>
> > >>> Thanks.
> > >>>
> > >>
> >
>


Re: Out of order message processing with Kafka Streams

2017-03-20 Thread Michael Noll
And since you asked for a pointer, Ali:
http://docs.confluent.io/current/streams/concepts.html#windowing


On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll  wrote:

> Late-arriving and out-of-order data is only treated specially for windowed
> aggregations.
>
> For stateless operations such as `KStream#foreach()` or `KStream#map()`,
> records are processed in the order they arrive (per partition).
>
> -Michael
>
>
>
>
> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar  wrote:
>
>> > later when message A arrives it will put that message back into
>> > the right temporal context and publish an amended result for the proper
>> > time/session window as if message B were consumed in the timestamp order
>> > before message A.
>>
>> Does this apply to the aggregation Kafka stream methods then, and not to
>> e.g foreach?
>>
>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen 
>> wrote:
>>
>> > Yes stream processing and CEP are subtlety different things.
>> >
>> > Kafka Streams helps you write stateful apps and allows that state to be
>> > preserved on disk (a local State store) as well as distributed for HA or
>> > for parallel partitioned processing (via Kafka topic partitions and
>> > consumer groups) as well as in memory (as a performance enhancement).
>> >
>> > However a classical CEP engine with a pre-modeled state machine and
>> > pattern matching rules is something different from stream processing.
>> >
>> > It is on course possible to build a CEP system on top on Kafka Streams
>> and
>> > get the best of both worlds.
>> >
>> > -hans
>> >
>> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
>> > sabarish@gmail.com> wrote:
>> > >
>> > > Hans
>> > >
>> > > What you state would work for aggregations, but not for state machines
>> > and
>> > > CEP.
>> > >
>> > > Regards
>> > > Sab
>> > >
>> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" 
>> wrote:
>> > >>
>> > >> The only way to make sure A is consumed first would be to delay the
>> > >> consumption of message B for at least 15 minutes which would fly in
>> the
>> > >> face of the principals of a true streaming platform so the short
>> answer
>> > to
>> > >> your question is "no" because that would be batch processing not
>> stream
>> > >> processing.
>> > >>
>> > >> However, Kafka Streams does handle late-arriving data. So if you had some
>> > >> analytics that computes results on a time window or a session window, then
>> > >> Kafka Streams will compute on the stream in real time (processing message
>> > >> B) and then later, when message A arrives, it will put that message back
>> > >> into the right temporal context and publish an amended result for the
>> > >> proper time/session window, as if message B were consumed in the timestamp
>> > >> order before message A. The end result of this flow is that you eventually
>> > >> get the same results you would get in a batch processing system, but with
>> > >> the added benefit of getting intermediary results at much lower latency.
>> > >>
>> > >> -hans
>> > >>
>> > >> /**
>> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>> > >> * h...@confluent.io (650)924-2670
>> > >> */
>> > >>
>> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar 
>> > wrote:
>> > >>>
>> > >>> Is it possible to have Kafka Streams order messages correctly by
>> their
>> > >>> timestamps, even if they arrived out of order?
>> > >>>
>> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
>> > >>> timestamp of 5:15 PM, are sent.
>> > >>>
>> > >>> Message B arrives sooner than Message A, due to network issues.
>> > >>>
>> > >>> Is it possible to make sure that, across all consumers of Kafka
>> Streams
>> > >>> (even if they are across different servers, but have the same
>> consumer
>> > >>> group), Message A is consumed first, before Message B?
>> > >>>
>> > >>> Thanks.
>> > >>>
>> > >>
>> >
>>
>
>
>


Re: Kafka Streams: lockException

2017-03-20 Thread Tianji Li
Hi Guys,

Great information again as usual, very helpful!

Much appreciated, thanks so much!

Tianji

PS: The Kafka Community is simply great!



On Fri, Mar 17, 2017 at 3:00 PM, Guozhang Wang  wrote:

> Tianji and Sachin (and also cc'ing people who I remember have reported
> similar RocksDB memory issues),
>
> Sharing my experience with RocksDB tuning and also chatting with the
> RocksDB community:
>
> 1. If you are frequently flushing the state stores (e.g. with a high commit
> frequency) then you will end up with a huge number of very small memtable
> files, and hence very high compaction pressure on RocksDB; if you use the
> default number of compaction threads (1) it will not be able to keep up with
> the write throughput and compaction rate, hence the gradual degradation of
> performance. We have changed the default num.compaction.threads in trunk,
> but if you are on released version 0.10.2 or older, check your store's flush
> rate metrics and consider increasing the compaction threads.
>
> 2. The most common memory leaks from the RocksDB JNI are iterator leaks.
> Make sure to close the iterators returned from your range queries / fetches
> on the stores when you are done. If not, the corresponding scanned memory
> will stay pinned and cannot be compacted.
>
>
> Guozhang
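To illustrate the second point, a small sketch of closing a store iterator
with try-with-resources; the store name and key range are hypothetical, and
"streams" is assumed to be an already-running KafkaStreams instance:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class IteratorCloseExample {
        static void dumpRange(KafkaStreams streams) {
            ReadOnlyKeyValueStore<String, Long> store =
                    streams.store("some-keyvalue-store",
                                  QueryableStoreTypes.<String, Long>keyValueStore());
            // KeyValueIterator is Closeable, so try-with-resources releases the
            // underlying RocksDB iterator even if the loop throws. Forgetting
            // close() is exactly the iterator leak described above.
            try (KeyValueIterator<String, Long> it = store.range("a", "z")) {
                while (it.hasNext()) {
                    KeyValue<String, Long> entry = it.next();
                    System.out.println(entry.key + " -> " + entry.value);
                }
            }
        }
    }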
>
>
> On Fri, Mar 17, 2017 at 8:56 AM, Eno Thereska 
> wrote:
>
> > Sachin, you also have a PR for this that could help, right?:
> > https://github.com/apache/kafka/pull/2642#issuecomment-287372367
> >
> > Thanks
> > Eno
> >
> >
> > > On 17 Mar 2017, at 15:19, Sachin Mittal  wrote:
> > >
> > > We also face the same issues.
> > > What we have found is that rocksdb is the issue. With many instances of
> > > rocksdb per machine, it slows down over time due to i/o operations,
> > > resulting in threads getting evicted because processing time exceeds the
> > > max.poll.interval limit.
> > >
> > > Try running rocksdb in memory:
> > > https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li  wrote:
> > >
> > >> Hi Eno,
> > >>
> > >> I used 150, 50, and 20 threads, and the probability of crashing
> > >> decreased with this number. When using 1 thread, no crash!
> > >>
> > >> My max.poll.interval is 5 minutes and none of the processing lasts that
> > >> long, so that parameter does not help.
> > >>
> > >>
> > >> Thanks
> > >> Tianji
> > >>
> > >> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska  >
> > >> wrote:
> > >>
> > >>> Hi Tianji,
> > >>>
> > >>> How many threads does your app use?
> > >>>
> > >>> One reason is explained here:
> > >>> https://groups.google.com/forum/#!topic/confluent-platform/wgCSuwIJo5g
> > >>> You might want to increase the max.poll.interval config value.
> > >>> If that doesn't work, could you revert to using one thread for now.
> > Also
> > >>> let us know either way since we might need to open a bug report.
> > >>>
> > >>> Thanks
> > >>> Eno
> > >>>
> >  On 16 Mar 2017, at 20:47, Tianji Li  wrote:
> > 
> >  Hi there,
> > 
> >  I always got this crashes and wonder if anyone knows why. Please let
> > me
> >  know what information I should provide to help with trouble
> shooting.
> > 
> >  I am using 0.10.2.0. My application is reading one topic and then calling
> >  groupBy().aggregate() 50 times on different keys.
> > 
> >  I use an in-memory store, without backing it up to Kafka.
> > 
> >  Thanks
> >  Tianji
> > 
> > 
> >  2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
> >  o.a.k.s.p.internals.StreamThread : Could not create task 0_4. Will retry.
> > 
> >  org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock
> >  the state directory: /tmp/kafka-streams/xxx-test28/0_4
> >    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> >    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> >    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> >    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> >    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> >    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> >    ...

clearing an aggregation?

2017-03-20 Thread Jon Yeargers
Is this possible? I'm wondering about gathering data from a stream into a
series of windowed aggregators: minute, hour and day. A separate process
would start at fixed intervals, query the appropriate state store for
available values and then hopefully clear / zero / reset everything for the
next interval.

I could use the retention period setting, but I would (somehow) need to
guarantee that the windows reset on clock boundaries and not relative to the
app's start time.
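Not an authoritative answer, but a sketch of one way this can work without
any explicit reset, assuming the 0.10.2 DSL; the topic, store names, and key
below are made up. Tumbling TimeWindows are aligned to the epoch, so
minute/hour/day windows fall on clock boundaries regardless of when the app
was started, and the "separate process" can simply read the previous,
now-complete window via interactive queries (it has to run inside, or talk
to, the application instance that hosts the store):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.kstream.KGroupedStream;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyWindowStore;
    import org.apache.kafka.streams.state.WindowStoreIterator;

    public class ClockAlignedWindows {
        static void buildTopology(KStreamBuilder builder) {
            KStream<String, String> events = builder.stream("events");
            KGroupedStream<String, String> grouped = events.groupByKey();
            // Tumbling windows start at multiples of the window size since the
            // epoch, i.e. on clock boundaries, not at application start time.
            grouped.count(TimeWindows.of(60 * 1000L), "counts-minute");
            grouped.count(TimeWindows.of(60 * 60 * 1000L), "counts-hour");
            grouped.count(TimeWindows.of(24 * 60 * 60 * 1000L), "counts-day");
        }

        // Instead of clearing the store, read the minute that just finished.
        static void readPreviousMinute(KafkaStreams streams, String key) {
            ReadOnlyWindowStore<String, Long> store =
                    streams.store("counts-minute",
                                  QueryableStoreTypes.<String, Long>windowStore());
            long windowStart = (System.currentTimeMillis() / 60000L - 1) * 60000L;
            try (WindowStoreIterator<Long> it = store.fetch(key, windowStart, windowStart)) {
                while (it.hasNext()) {
                    System.out.println("minute " + windowStart + " -> " + it.next().value);
                }
            }
        }
    }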


Re: Kafka Streams: lockException

2017-03-20 Thread Damian Guy
Mahendra,
The WAL is turned off in Kafka Streams. These LOG.old files are just the
RocksDB info log; you can probably just delete the old ones:
https://github.com/facebook/rocksdb/issues/849
In 0.10.0.1 there is no way of configuring RocksDB via KafkaStreams.

Thanks,
Damian
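For anyone on 0.10.1 or later, this is roughly what the rocksdb.config.setter
hook looks like. The option values below are illustrative guesses, not
recommendations: keepLogFileNum caps how many old info-log (LOG.old) files
RocksDB keeps around, and extra background compactions relate to Guozhang's
earlier point about compaction pressure.

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.Options;

    public class CustomRocksDBConfig implements RocksDBConfigSetter {
        @Override
        public void setConfig(String storeName, Options options, Map<String, Object> configs) {
            options.setMaxBackgroundCompactions(4); // more compaction threads
            options.setKeepLogFileNum(2);           // keep at most 2 old info-log files
        }

        // Wiring it in when building the Streams configuration:
        public static Properties withRocksDbSetter(Properties props) {
            props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
                      CustomRocksDBConfig.class);
            return props;
        }
    }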

On Mon, 20 Mar 2017 at 09:22 Mahendra Kariya 
wrote:

> We did some more analysis on why the disk utilisation is continuously
> increasing. Turns out it's the RocksDB WAL that's utilising most of the
> disk space. The LOG.old WAL files are not getting deleted. Ideally they
> should have been. RocksDB provides certain configuration for purging WAL
> files <https://github.com/facebook/rocksdb/wiki/basic-operations#purging-wal-files>.
> But I am not sure how to set these configs. Any help would be really
> appreciated. Just for reference, our Kafka brokers are on v0.10.0.1 and
> RocksDB version is 4.8.0.
>
>
>
>
> On Mon, Mar 20, 2017 at 12:29 PM, Mahendra Kariya <
> mahendra.kar...@go-jek.com> wrote:
>
> > Hey Guozhang,
> >
> > Thanks a lot for these insights. We are facing the exact same problem as
> > Tianji. Our commit frequency is also quite high. We flush almost around
> 16K
> > messages per minute to Kafka at the end of the topology.
> >
> > Another issue that we are facing is that rocksdb is not deleting old
> data.
> > We have set the time window retention duration to 1 hour, but the disk
> size
> > is constantly increasing. Ideally, the disk utilisation graph should
> > plateau after some time.
> >
> > We would like to apply the config change suggestions that you have given.
> > But we are on Kafka 0.10.0.1. And from the docs, it seems
> > rocksdb.config.setter is not available for this version. Is there any
> > other way for us to configure rocksdb?
> >
> > Does Kafka 0.10.0.1 emit any rocksdb related metrics that we can monitor
> > and set up alerts on?
> >
> >
> > Thanks!
> >
> >
> >
>


Re: Kafka Streams: lockException

2017-03-20 Thread Mahendra Kariya
We did some more analysis on why the disk utilisation is continuously
increasing. Turns out it's the RocksDB WAL that's utilising most of the
disk space. The LOG.old WAL files are not getting deleted. Ideally they
should have been. RocksDB provides certain configuration for purging WAL
files <https://github.com/facebook/rocksdb/wiki/basic-operations#purging-wal-files>.
But I am not sure how to set these configs. Any help would be really
appreciated. Just for reference, our Kafka brokers are on v0.10.0.1 and
RocksDB version is 4.8.0.




On Mon, Mar 20, 2017 at 12:29 PM, Mahendra Kariya <
mahendra.kar...@go-jek.com> wrote:

> Hey Guozhang,
>
> Thanks a lot for these insights. We are facing the exact same problem as
> Tianji. Our commit frequency is also quite high. We flush almost around 16K
> messages per minute to Kafka at the end of the topology.
>
> Another issue that we are facing is that rocksdb is not deleting old data.
> We have set the time window retention duration to 1 hour, but the disk size
> is constantly increasing. Ideally, the disk utilisation graph should
> plateau after some time.
>
> We would like to apply the config change suggestions that you have given.
> But we are on Kafka 0.10.0.1. And from the docs, it seems
> rocksdb.config.setter is not available for this version. Is there any
> other way for us to configure rocksdb?
>
> Does Kafka 0.10.0.1 emit any rocksdb related metrics that we can monitor
> and set up alerts on?
>
>
> Thanks!
>
>
>


Re: Kafka Streams: lockException

2017-03-20 Thread Mahendra Kariya
Hey Guozhang,

Thanks a lot for these insights. We are facing the exact same problem as
Tianji. Our commit frequency is also quite high. We flush around 16K
messages per minute to Kafka at the end of the topology.

Another issue that we are facing is that rocksdb is not deleting old data.
We have set the time window retention duration to 1 hour, but the disk size
is constantly increasing. Ideally, the disk utilisation graph should
plateau after some time.
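On the retention point, a hedged sketch of how that is usually expressed in
the DSL; the method names here are from the 0.10.1+/0.10.2 API, which differs
slightly from 0.10.0.x, and the topic and store name are made up. Note that
until() only bounds the logical retention; the disk space is reclaimed lazily
(RocksDB compaction, changelog cleanup), so the on-disk footprint typically
lags behind:

    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class RetentionExample {
        static void buildTopology(KStreamBuilder builder) {
            KStream<String, String> events = builder.stream("events");
            events.groupByKey()
                  .count(TimeWindows.of(60 * 1000L)          // 1-minute windows
                                    .until(60 * 60 * 1000L), // retain closed windows for 1 hour
                         "counts-minute");
        }
    }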

We would like to apply the config change suggestions that you have given.
But we are on Kafka 0.10.0.1. And from the docs, it seems
rocksdb.config.setter is not available for this version. Is there any other
way for us to configure rocksdb?

Does Kafka 0.10.0.1 emit any rocksdb related metrics that we can monitor
and set up alerts on?


Thanks!