Re: Out of order message processing with Kafka Streams

2017-03-21 Thread Hans Jespersen
While it's not exactly the same as the window start/stop time, you can store (in 
the state store) the earliest and latest timestamps of the messages in each 
window and use those as a good approximation of the window boundary times.  
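For anyone who wants to try this with the low-level Processor API, here is a
rough, untested sketch of the idea. The store name "window-bounds", the fixed
30-minute bucket size, and the String key/value types are assumptions for
illustration only; the store has to be created and connected to the processor
when you build the topology.

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Tracks the earliest/latest record timestamp per key and 30-minute bucket.
public class WindowBoundsProcessor implements Processor<String, String> {

    private static final long WINDOW_MS = 30 * 60 * 1000L; // assumed bucket size

    private ProcessorContext context;
    private KeyValueStore<String, Long> bounds;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.bounds = (KeyValueStore<String, Long>) context.getStateStore("window-bounds");
    }

    @Override
    public void process(final String key, final String value) {
        final long ts = context.timestamp();   // timestamp of the current record
        final long bucket = ts / WINDOW_MS;    // crude window/bucket id

        final String earliestKey = key + "@" + bucket + ":earliest";
        final String latestKey = key + "@" + bucket + ":latest";

        final Long earliest = bounds.get(earliestKey);
        if (earliest == null || ts < earliest) {
            bounds.put(earliestKey, ts);       // new approximate window start
        }
        final Long latest = bounds.get(latestKey);
        if (latest == null || ts > latest) {
            bounds.put(latestKey, ts);         // new approximate window end
        }

        context.forward(key, value);           // pass the record downstream unchanged
    }

    @Override
    public void punctuate(final long timestamp) { }

    @Override
    public void close() { }
}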

-hans

> On Mar 20, 2017, at 1:00 PM, Ali Akhtar  wrote:
> 
> Yeah, windowing seems perfect, if only I could find out the current
> window's start time (so I can log the current bucket's start & end times)
> and process window messages individually rather than as aggregates.
> 
> It doesn't seem like i can get this metadata from ProcessorContext though,
> from looking over the javadocs
> 
>> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll  wrote:
>> 
>> Ali,
>> 
>> what you describe is (roughly!) how Kafka Streams implements the internal
>> state stores to support windowing.
>> 
>> Some users have been following a similar approach as you outlined, using
>> the Processor API.
>> 
>> 
>> 
>>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar  wrote:
>>> 
>>> It would be helpful to know the 'start' and 'end' of the current
>> metadata,
>>> so if an out of order message arrives late, and is being processed in
>>> foreach(), you'd know which window / bucket it belongs to, and can handle
>>> it accordingly.
>>> 
>>> I'm guessing that's not possible at the moment.
>>> 
>>> (My use case is, i receive a stream of messages. Messages need to be
>> stored
>>> and sorted into 'buckets', to indicate 'sessions'. Each time there's a
>> gap
>>> of 30 mins or more since the last message (under a key), a new 'session'
>>> (bucket) should be started, and future messages should belong to that
>>> 'session', until the next 30+ min gap).
>>> 
>>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll 
>>> wrote:
>>> 
> Can windows only be used for aggregations, or can they also be used
>> for
 foreach(),
 and such?
 
 As of today, you can use windows only in aggregations.
 
> And is it possible to get metadata on the message, such as whether or
 not its
 late, its index/position within the other messages, etc?
 
 If you use the Processor API of Kafka Streams, you can have access to
>> an
 incoming record's topic, partition, offset, etc. via the so-called
 ProcessorContext (which is updated for every new incoming record):
 
 http://docs.confluent.io/current/streams/javadocs/org/
 apache/kafka/streams/processor/Processor.html
 - You can get/store a reference to the ProcessorContext from
 `Processor#init()`.
 
 http://docs.confluent.io/current/streams/javadocs/org/
 apache/kafka/streams/processor/ProcessorContext.html
 - The context can then be used within `Processor#process()` when you
 process a new record.  As I said, the context is updated behind the
>>> scenes
 to match the record that is currently being processed.
 
 
 Best,
 Michael
 
 
 
 
 On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar 
>>> wrote:
 
> Can windows only be used for aggregations, or can they also be used
>> for
> foreach(), and such?
> 
> And is it possible to get metadata on the message, such as whether or
>>> not
> its late, its index/position within the other messages, etc?
> 
> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll 
> wrote:
> 
>> And since you asked for a pointer, Ali:
>> http://docs.confluent.io/current/streams/concepts.html#windowing
>> 
>> 
>> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
>> mich...@confluent.io>
>> wrote:
>> 
>>> Late-arriving and out-of-order data is only treated specially for
>> windowed
>>> aggregations.
>>> 
>>> For stateless operations such as `KStream#foreach()` or
> `KStream#map()`,
>>> records are processed in the order they arrive (per partition).
>>> 
>>> -Michael
>>> 
>>> 
>>> 
>>> 
>>> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <
>> ali.rac...@gmail.com
 
>> wrote:
>>> 
> later when message A arrives it will put that message back
>> into
> the right temporal context and publish an amended result for
>> the
>> proper
> time/session window as if message B were consumed in the
>>> timestamp
>> order
> before message A.
 
 Does this apply to the aggregation Kafka stream methods then,
>> and
 not
> to
 e.g foreach?
 
 On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <
>>> h...@confluent.io>
 wrote:
 
> Yes stream processing and CEP are subtlety different things.
> 
> Kafka Streams helps you write stateful apps and allows that
>>> state
 to
>> be
> preserved on disk (a local State store) as well as distributed
>>> for
> HA
>> or
> for parallel partitioned processing (via Kafka topic
>> partitions
 and
> consumer groups) as well as in memory (as a performance
> enhance

Re: Out of order message processing with Kafka Streams

2017-03-21 Thread Ali Akhtar
That would require

- Knowing the current window's id (or some other identifier) to
differentiate it from other windows

- Being able to process individual messages in a window

Are those 2 things possible w/ kafka streams? (java)

On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen  wrote:

> While it's not exactly the same as the window start/stop time you can
> store (in the state store) the earliest and latest timestamps of any
> messages in each window and use that as a good approximation for the window
> boundary times.
>
> -hans
>
> > On Mar 20, 2017, at 1:00 PM, Ali Akhtar  wrote:
> >
> > Yeah, windowing seems perfect, if only I could find out the current
> > window's start time (so I can log the current bucket's start & end times)
> > and process window messages individually rather than as aggregates.
> >
> > It doesn't seem like i can get this metadata from ProcessorContext
> though,
> > from looking over the javadocs
> >
> >> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll 
> wrote:
> >>
> >> Ali,
> >>
> >> what you describe is (roughly!) how Kafka Streams implements the
> internal
> >> state stores to support windowing.
> >>
> >> Some users have been following a similar approach as you outlined, using
> >> the Processor API.
> >>
> >>
> >>
> >>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar 
> wrote:
> >>>
> >>> It would be helpful to know the 'start' and 'end' of the current
> >> metadata,
> >>> so if an out of order message arrives late, and is being processed in
> >>> foreach(), you'd know which window / bucket it belongs to, and can
> handle
> >>> it accordingly.
> >>>
> >>> I'm guessing that's not possible at the moment.
> >>>
> >>> (My use case is, i receive a stream of messages. Messages need to be
> >> stored
> >>> and sorted into 'buckets', to indicate 'sessions'. Each time there's a
> >> gap
> >>> of 30 mins or more since the last message (under a key), a new
> 'session'
> >>> (bucket) should be started, and future messages should belong to that
> >>> 'session', until the next 30+ min gap).
> >>>
> >>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll 
> >>> wrote:
> >>>
> > Can windows only be used for aggregations, or can they also be used
> >> for
>  foreach(),
>  and such?
> 
>  As of today, you can use windows only in aggregations.
> 
> > And is it possible to get metadata on the message, such as whether or
>  not its
>  late, its index/position within the other messages, etc?
> 
>  If you use the Processor API of Kafka Streams, you can have access to
> >> an
>  incoming record's topic, partition, offset, etc. via the so-called
>  ProcessorContext (which is updated for every new incoming record):
> 
>  http://docs.confluent.io/current/streams/javadocs/org/
>  apache/kafka/streams/processor/Processor.html
>  - You can get/store a reference to the ProcessorContext from
>  `Processor#init()`.
> 
>  http://docs.confluent.io/current/streams/javadocs/org/
>  apache/kafka/streams/processor/ProcessorContext.html
>  - The context can then be used within `Processor#process()` when you
>  process a new record.  As I said, the context is updated behind the
> >>> scenes
>  to match the record that is currently being processed.
> 
> 
>  Best,
>  Michael
> 
> 
> 
> 
>  On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar 
> >>> wrote:
> 
> > Can windows only be used for aggregations, or can they also be used
> >> for
> > foreach(), and such?
> >
> > And is it possible to get metadata on the message, such as whether or
> >>> not
> > its late, its index/position within the other messages, etc?
> >
> > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll 
> > wrote:
> >
> >> And since you asked for a pointer, Ali:
> >> http://docs.confluent.io/current/streams/concepts.html#windowing
> >>
> >>
> >> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
> >> mich...@confluent.io>
> >> wrote:
> >>
> >>> Late-arriving and out-of-order data is only treated specially for
> >> windowed
> >>> aggregations.
> >>>
> >>> For stateless operations such as `KStream#foreach()` or
> > `KStream#map()`,
> >>> records are processed in the order they arrive (per partition).
> >>>
> >>> -Michael
> >>>
> >>>
> >>>
> >>>
> >>> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <
> >> ali.rac...@gmail.com
> 
> >> wrote:
> >>>
> > later when message A arrives it will put that message back
> >> into
> > the right temporal context and publish an amended result for
> >> the
> >> proper
> > time/session window as if message B were consumed in the
> >>> timestamp
> >> order
> > before message A.
> 
>  Does this apply to the aggregation Kafka stream methods then,
> >> and
>  not
> > to
>  e.g foreach?
> 
>  On Sun, M

Kafka consumer offset reset

2017-03-21 Thread Jakub Stransky
Hello,

just recently migrated to using Kafka 0.10.1.0

I would like to reset the position for some consumers. I went through the
documentation and couldn't spot how to achieve that. All I got is
that v0.10 reduces the usage of zookeeper and clients have the possibility to
use different storage for maintaining the offsets.

Could someone more experienced elaborate a bit on this topic?

Thanks
jakub


Re: Kafka consumer offset reset

2017-03-21 Thread Robert Quinlivan
What offset do you want to reset them to? The easiest way to adjust offsets
in 0.10 is to attach a consumer to the target topic-partition, seek to
the position you desire, and commit that new offset.
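For example, something along these lines (an untested sketch; the broker
address, group id, topic, partition, and target offset below are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetReset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("group.id", "my-consumer-group"); // the group whose offset you want to change
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            consumer.assign(Collections.singletonList(tp)); // manual assignment, no group rebalance
            long targetOffset = 12345L; // the position the group should restart from
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(targetOffset)));
        }
        // Make sure the real consumers in the group are stopped while you do this,
        // otherwise they may overwrite the committed offset.
    }
}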

On Tue, Mar 21, 2017 at 9:56 AM, Jakub Stransky 
wrote:

> Hello,
>
> just recently migrated to using Kafka 0.10.1.0
>
> I would like to reset position for some consumers. I went through
> documentation and couldn't spot it how to achieve that. All what I got is
> that v 10 reduces usage of zookeeper and clients have possibility to use
> different storage for maintaining the offsets.
>
> Could someone more experienced elaborate a bit on this topic?
>
> Thanks
> jakub
>



-- 
Robert Quinlivan
Software Engineer, Signal


Re: Out of order message processing with Kafka Streams

2017-03-21 Thread Damian Guy
Hi Ali,

(My use case is, i receive a stream of messages. Messages need to be stored
> and sorted into 'buckets', to indicate 'sessions'. Each time there's a gap
> of 30 mins or more since the last message (under a key), a new 'session'
> (bucket) should be started, and future messages should belong to that
> 'session', until the next 30+ min gap).
>
>
This sounds like you might want to use the SessionWindows feature in
Kafka Streams:
https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/SessionWindows.html
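For what it's worth, a minimal, untested sketch of a 30-minute-gap session
count with the 0.10.2 DSL could look roughly like the fragment below (the
topic name, default serdes, and the "session-counts" store name are
assumptions):

// Uses KStreamBuilder, KStream, KTable, SessionWindows and Windowed from
// org.apache.kafka.streams.kstream, plus java.util.concurrent.TimeUnit.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> events = builder.stream("events");

// A new session per key whenever there is a gap of 30+ minutes between messages.
KTable<Windowed<String>, Long> sessionCounts = events
    .groupByKey()
    .count(SessionWindows.with(TimeUnit.MINUTES.toMillis(30)), "session-counts");

// The Windowed key carries the session boundaries.
sessionCounts.toStream().foreach((windowedKey, count) ->
    System.out.printf("key=%s start=%d end=%d count=%d%n",
        windowedKey.key(), windowedKey.window().start(), windowedKey.window().end(), count));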


Re: Streams RocksDBException with no message?

2017-03-21 Thread Mathieu Fenniak
Thanks Guozhang.

For my part, turns out I was hitting ulimit on my open file descriptors.
Phew, easy to fix... once I figured it out. :-)

Mathieu


On Fri, Mar 17, 2017 at 4:14 PM, Guozhang Wang  wrote:

> Hi Mathieu,
>
> We are aware of that since long time ago and I have been looking into this
> issue, turns out to be a known issue in RocksDB:
>
> https://github.com/facebook/rocksdb/issues/1688
>
> And the corresponding fix (https://github.com/facebook/rocksdb/pull/1714)
> has been merged in master but is marked for v5.1.4 only, while the latest
> release is 5.1.2.
>
>
> Guozhang
>
>
> On Fri, Mar 17, 2017 at 10:27 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Hey all,
> >
> > So... what does it mean to have a RocksDBException with a message that
> just
> > has a single character?  "e", "q", "]"... I've seen a few.  Has anyone
> seen
> > this before?
> >
> > Two example exceptions:
> > https://gist.github.com/mfenniak/c56beb6d5058e2b21df0309aea224f12
> >
> > Kafka Streams 0.10.2.0.  Both of these errors occurred during state store
> > initialization.  I'm running a single Kafka Streams thread per server,
> this
> > occurred on two servers about a half-hour apart.
> >
> > Mathieu
> >
>
>
>
> --
> -- Guozhang
>


Re: Out of order message processing with Kafka Streams

2017-03-21 Thread Hans Jespersen
Yes, and yes!

-hans



> On Mar 21, 2017, at 7:45 AM, Ali Akhtar  wrote:
> 
> That would require
> 
> - Knowing the current window's id (or some other identifier) to
> differentiate it from other windows
> 
> - Being able to process individual messages in a window
> 
> Are those 2 things possible w/ kafka streams? (java)
> 
> On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen  wrote:
> 
>> While it's not exactly the same as the window start/stop time you can
>> store (in the state store) the earliest and latest timestamps of any
>> messages in each window and use that as a good approximation for the window
>> boundary times.
>> 
>> -hans
>> 
>>> On Mar 20, 2017, at 1:00 PM, Ali Akhtar  wrote:
>>> 
>>> Yeah, windowing seems perfect, if only I could find out the current
>>> window's start time (so I can log the current bucket's start & end times)
>>> and process window messages individually rather than as aggregates.
>>> 
>>> It doesn't seem like i can get this metadata from ProcessorContext
>> though,
>>> from looking over the javadocs
>>> 
 On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll 
>> wrote:
 
 Ali,
 
 what you describe is (roughly!) how Kafka Streams implements the
>> internal
 state stores to support windowing.
 
 Some users have been following a similar approach as you outlined, using
 the Processor API.
 
 
 
> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar 
>> wrote:
> 
> It would be helpful to know the 'start' and 'end' of the current
 metadata,
> so if an out of order message arrives late, and is being processed in
> foreach(), you'd know which window / bucket it belongs to, and can
>> handle
> it accordingly.
> 
> I'm guessing that's not possible at the moment.
> 
> (My use case is, i receive a stream of messages. Messages need to be
 stored
> and sorted into 'buckets', to indicate 'sessions'. Each time there's a
 gap
> of 30 mins or more since the last message (under a key), a new
>> 'session'
> (bucket) should be started, and future messages should belong to that
> 'session', until the next 30+ min gap).
> 
> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll 
> wrote:
> 
>>> Can windows only be used for aggregations, or can they also be used
 for
>> foreach(),
>> and such?
>> 
>> As of today, you can use windows only in aggregations.
>> 
>>> And is it possible to get metadata on the message, such as whether or
>> not its
>> late, its index/position within the other messages, etc?
>> 
>> If you use the Processor API of Kafka Streams, you can have access to
 an
>> incoming record's topic, partition, offset, etc. via the so-called
>> ProcessorContext (which is updated for every new incoming record):
>> 
>> http://docs.confluent.io/current/streams/javadocs/org/
>> apache/kafka/streams/processor/Processor.html
>> - You can get/store a reference to the ProcessorContext from
>> `Processor#init()`.
>> 
>> http://docs.confluent.io/current/streams/javadocs/org/
>> apache/kafka/streams/processor/ProcessorContext.html
>> - The context can then be used within `Processor#process()` when you
>> process a new record.  As I said, the context is updated behind the
> scenes
>> to match the record that is currently being processed.
>> 
>> 
>> Best,
>> Michael
>> 
>> 
>> 
>> 
>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar 
> wrote:
>> 
>>> Can windows only be used for aggregations, or can they also be used
 for
>>> foreach(), and such?
>>> 
>>> And is it possible to get metadata on the message, such as whether or
> not
>>> its late, its index/position within the other messages, etc?
>>> 
>>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll 
>>> wrote:
>>> 
 And since you asked for a pointer, Ali:
 http://docs.confluent.io/current/streams/concepts.html#windowing
 
 
 On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
 mich...@confluent.io>
 wrote:
 
> Late-arriving and out-of-order data is only treated specially for
 windowed
> aggregations.
> 
> For stateless operations such as `KStream#foreach()` or
>>> `KStream#map()`,
> records are processed in the order they arrive (per partition).
> 
> -Michael
> 
> 
> 
> 
> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <
 ali.rac...@gmail.com
>> 
 wrote:
> 
>>> later when message A arrives it will put that message back
 into
>>> the right temporal context and publish an amended result for
 the
 proper
>>> time/session window as if message B were consumed in the
> timestamp
 order
>>> before message A.
>> 

Re: Out of order message processing with Kafka Streams

2017-03-21 Thread Ali Akhtar
Hans,

Which class's javadocs should I look at? From my initial look at the
javadocs and the discussion with Michael, it doesn't seem possible.

On Tue, Mar 21, 2017 at 10:44 PM, Hans Jespersen  wrote:

> Yes, and yes!
>
> -hans
>
>
>
> > On Mar 21, 2017, at 7:45 AM, Ali Akhtar  wrote:
> >
> > That would require
> >
> > - Knowing the current window's id (or some other identifier) to
> > differentiate it from other windows
> >
> > - Being able to process individual messages in a window
> >
> > Are those 2 things possible w/ kafka streams? (java)
> >
> > On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen 
> wrote:
> >
> >> While it's not exactly the same as the window start/stop time you can
> >> store (in the state store) the earliest and latest timestamps of any
> >> messages in each window and use that as a good approximation for the
> window
> >> boundary times.
> >>
> >> -hans
> >>
> >>> On Mar 20, 2017, at 1:00 PM, Ali Akhtar  wrote:
> >>>
> >>> Yeah, windowing seems perfect, if only I could find out the current
> >>> window's start time (so I can log the current bucket's start & end
> times)
> >>> and process window messages individually rather than as aggregates.
> >>>
> >>> It doesn't seem like i can get this metadata from ProcessorContext
> >> though,
> >>> from looking over the javadocs
> >>>
>  On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll 
> >> wrote:
> 
>  Ali,
> 
>  what you describe is (roughly!) how Kafka Streams implements the
> >> internal
>  state stores to support windowing.
> 
>  Some users have been following a similar approach as you outlined,
> using
>  the Processor API.
> 
> 
> 
> > On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar 
> >> wrote:
> >
> > It would be helpful to know the 'start' and 'end' of the current
>  metadata,
> > so if an out of order message arrives late, and is being processed in
> > foreach(), you'd know which window / bucket it belongs to, and can
> >> handle
> > it accordingly.
> >
> > I'm guessing that's not possible at the moment.
> >
> > (My use case is, i receive a stream of messages. Messages need to be
>  stored
> > and sorted into 'buckets', to indicate 'sessions'. Each time there's
> a
>  gap
> > of 30 mins or more since the last message (under a key), a new
> >> 'session'
> > (bucket) should be started, and future messages should belong to that
> > 'session', until the next 30+ min gap).
> >
> > On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll  >
> > wrote:
> >
> >>> Can windows only be used for aggregations, or can they also be used
>  for
> >> foreach(),
> >> and such?
> >>
> >> As of today, you can use windows only in aggregations.
> >>
> >>> And is it possible to get metadata on the message, such as whether
> or
> >> not its
> >> late, its index/position within the other messages, etc?
> >>
> >> If you use the Processor API of Kafka Streams, you can have access
> to
>  an
> >> incoming record's topic, partition, offset, etc. via the so-called
> >> ProcessorContext (which is updated for every new incoming record):
> >>
> >> http://docs.confluent.io/current/streams/javadocs/org/
> >> apache/kafka/streams/processor/Processor.html
> >> - You can get/store a reference to the ProcessorContext from
> >> `Processor#init()`.
> >>
> >> http://docs.confluent.io/current/streams/javadocs/org/
> >> apache/kafka/streams/processor/ProcessorContext.html
> >> - The context can then be used within `Processor#process()` when you
> >> process a new record.  As I said, the context is updated behind the
> > scenes
> >> to match the record that is currently being processed.
> >>
> >>
> >> Best,
> >> Michael
> >>
> >>
> >>
> >>
> >> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar 
> > wrote:
> >>
> >>> Can windows only be used for aggregations, or can they also be used
>  for
> >>> foreach(), and such?
> >>>
> >>> And is it possible to get metadata on the message, such as whether
> or
> > not
> >>> its late, its index/position within the other messages, etc?
> >>>
> >>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <
> mich...@confluent.io>
> >>> wrote:
> >>>
>  And since you asked for a pointer, Ali:
>  http://docs.confluent.io/current/streams/concepts.html#windowing
> 
> 
>  On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
>  mich...@confluent.io>
>  wrote:
> 
> > Late-arriving and out-of-order data is only treated specially for
>  windowed
> > aggregations.
> >
> > For stateless operations such as `KStream#foreach()` or
> >>> `KStream#map()`,
> > records are processed in the order they arrive (per partition).
> >
> > -Michael
> >

Commits of slow moving topics in Kafka Streams time out

2017-03-21 Thread Frank Lyaruu
Hi Kafka people,

We have a Kafka Streams application that replicates a database and
transforms it to a different data model. Some tables/topics move fast, with
many changes a second; some might be dormant for months. For those slow-moving
topics, we have some trouble with 'offsets.retention.minutes'.
After that timeout, their offsets are effectively invalid. As long as the
application keeps running, everything is ok, but when we restart it, it will
fall back to 'earliest' and redo the entire topic, which is obviously wrong.

How can we fix this or work around it? Our first idea was to enable a
very slow auto commit, so that offsets.retention.minutes never expires, but
Kafka Streams seems to explicitly forbid that. The StreamsConfig javadoc
says: "Furthermore, it is not allowed to enable "enable.auto.commit" that
is disabled by Kafka Streams by default."

So how do we keep those commits alive? Why doesn't Kafka Streams allow a
slow auto commit to prevent this? Any other ideas on how to circumvent this
problem?

regards, Frank

We're running the trunk version of last week.


Re: Commits of slow moving topics in Kafka Streams time out

2017-03-21 Thread Eno Thereska
Hi Frank,

There is a similar discussion, with a JIRA, here:
https://www.mail-archive.com/users@kafka.apache.org/msg25089.html

Have you tried to increase the retention minutes to something large?
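For example, on the brokers (a hedged example; pick whatever comfortably exceeds
your longest expected dormancy):

# server.properties (broker config); the default is 1440 minutes (24 hours) in 0.10.x.
# Keep committed offsets for roughly one year:
offsets.retention.minutes=525600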

Thanks
Eno
> On 21 Mar 2017, at 19:13, Frank Lyaruu  wrote:
> 
> Hi Kafka people,
> 
> We have a Kafka Streams application that replicates a database, and
> transforms it to a different data model. Some tables/topics move fast, with
> many changes a second, some might be dormant for months. For those slow
> moving topics, we have some trouble with the 'offsets.retention.minutes'.
> After that timeout it's offset is effectively invalid. As long as it keeps
> running, everything is ok, but when we restart the application it will fall
> back to 'earliest' and redo the entire topic, which is obviously wrong.
> 
> How can we fix this or work around this? Our first idea was to enable a
> very slow auto commit, so the offset.retention.minutes never times out, but
> Kafka Streams seems to explicitly forbid that. The StreamConfig javadoc
> says: ""Furthermore, it is not allowed to enable "enable.auto.commit" that
> is disabled by Kafka Streams by default."
> 
> So how do we keep those commits alive? Why doesn't Kafka Streams allow a
> slow auto commit to prevent this? Any other ideas on how to circumvent this
> problem?
> 
> regards, Frank
> 
> We're running the trunk version of last week.



Using Kafka Stream in the cluster of kafka on one or multiple docker-machine/s

2017-03-21 Thread Mina Aslani
Hi,

I am trying to understand how I can use a Kafka Streams app (jar file) in a
cluster of Kafka containers.

Kafka does not have a master/slave concept (unlike Spark), so how should I run
my app in the cluster of Kafkas (e.g. on one or multiple docker machines)?

I use the command line shown below when having one VM/node with one Kafka container:
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62

Best regards,
Mina


Re: Using Kafka Stream in the cluster of kafka on one or multiple docker-machine/s

2017-03-21 Thread Michael Noll
Typically you'd containerize your app and then launch e.g. 10 containers if
you need to run 10 instances of your app.

Also, what do you mean by "in a cluster of Kafka containers" and "in the
cluster of Kafkas"?

On Tue, Mar 21, 2017 at 9:08 PM, Mina Aslani  wrote:

> Hi,
>
> I am trying to understand how I can use a kafka stream app(jar file) in a
> cluster of kafka containers.
>
> Kafka does not have master/slave concept (unlike spark), how I should run
> my app in the cluster of kafkas (e.g. on one or multiple docker-machine/s)?
>
> I use below command line when having one VM/node with one kafka container
> https://github.com/confluentinc/examples/blob/3.
> 2.x/kafka-streams/src/main/
> java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62
>
> Best regards,
> Mina
>


Re: Using Kafka Stream in the cluster of kafka on one or multiple docker-machine/s

2017-03-21 Thread Mina Aslani
Hi Michael,

Thank you very much for the prompt response, really appreciate it!

From https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62
and https://github.com/confluentinc/examples/tree/3.2.x/kafka-streams#packaging-and-running
I missed the fact that the jar should be run in a separate container.

Best regards,
Mina

On Tue, Mar 21, 2017 at 4:34 PM, Michael Noll  wrote:

> Typically you'd containerize your app and then launch e.g. 10 containers if
> you need to run 10 instances of your app.
>
> Also, what do you mean by "in a cluster of Kafka containers" and "in the
> cluster of Kafkas"?
>
> On Tue, Mar 21, 2017 at 9:08 PM, Mina Aslani  wrote:
>
> > Hi,
> >
> > I am trying to understand how I can use a kafka stream app(jar file) in a
> > cluster of kafka containers.
> >
> > Kafka does not have master/slave concept (unlike spark), how I should run
> > my app in the cluster of kafkas (e.g. on one or multiple
> docker-machine/s)?
> >
> > I use below command line when having one VM/node with one kafka container
> > https://github.com/confluentinc/examples/blob/3.
> > 2.x/kafka-streams/src/main/
> > java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62
> >
> > Best regards,
> > Mina
> >
>


Are there Connector artifacts in Confluent or any other Maven repository?

2017-03-21 Thread Phillip Mann
I am trying to migrate from StreamX (https://github.com/qubole/streamx) to use 
the official Confluent S3 connector 
(https://github.com/confluentinc/kafka-connect-storage-cloud).  Part of my 
implementation of Kafka Connect requires a custom partitioner.  This 
partitioner originally extended the Partitioner defined here 
(https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/partitioner/Partitioner.java).
  This was possible because I would build StreamX and add it to my company's 
artifact repository.  However, before I fork a bunch of different Confluent 
projects and then add them to my company's repository, I would like to know if 
it would be possible to import different Confluent projects such as the HDFS 
connector and the S3 connector through Maven so that I can use code from these 
projects.  If this doesn't exist, why doesn't Confluent add these artifacts to 
the Confluent repository?  Thanks for your help!

Phillip


Re: Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-21 Thread Guozhang Wang
Just to clarify, I did want to have the term `Topology` as part of the
class name, for the reasons above. I'm not too worried about being
consistent with the previous names, but I feel that `XXTopologyBuilder` is
better than `XXStreamsBuilder` since its build() function returns a
Topology object.


Guozhang


On Mon, Mar 20, 2017 at 12:53 PM, Michael Noll  wrote:

> Hmm, I must admit I don't like this last update all too much.
>
> Basically we would have:
>
> StreamsBuilder builder = new StreamsBuilder();
>
> // And here you'd define your...well, what actually?
> // Ah right, you are composing a topology here, though you are not
> aware of it.
>
> KafkaStreams streams = new KafkaStreams(builder.build(),
> streamsConfiguration);
>
> So what are you building here with StreamsBuilder?  Streams (hint: No)?
> And what about tables -- is there a TableBuilder (hint: No)?
>
> I also interpret Guozhang's last response as that he'd prefer to have
> "Topology" in the class/interface names.  I am aware that we shouldn't
> necessarily use the status quo to make decisions about future changes, but
> the very first concept we explain in the Kafka Streams documentation is
> "Stream Processing Topology":
> https://kafka.apache.org/0102/documentation/streams#streams_concepts
>
> -Michael
>
>
>
> On Mon, Mar 20, 2017 at 7:55 PM, Matthias J. Sax 
> wrote:
>
> > \cc users list
> >
> >
> >  Forwarded Message 
> > Subject: Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API
> > Date: Mon, 20 Mar 2017 11:51:01 -0700
> > From: Matthias J. Sax 
> > Organization: Confluent Inc
> > To: d...@kafka.apache.org
> >
> > I want to push this discussion further.
> >
> > Guozhang's argument about "exposing" the Topology class is valid. It's a
> > public class anyway, so it's not as issue. However, I think the question
> > is not too much about exposing but about "advertising" (ie, putting it
> > into the focus) or not at DSL level.
> >
> >
> > If I interpret the last replies correctly, it seems that we could agree
> > on "StreamsBuilder" as name. I did update the KIP accordingly. Please
> > correct me, if I got this wrong.
> >
> >
> > If there are not other objects -- this naming discussion was the last
> > open point to far -- I would like the start the VOTE thread.
> >
> >
> > -Matthias
> >
> >
> > On 3/14/17 2:37 PM, Guozhang Wang wrote:
> > > I'd like to keep the term "Topology" inside the builder class since, as
> > > Matthias mentioned, this builder#build() function returns a "Topology"
> > > object, whose type is a public class anyways. Although you can argue to
> > let
> > > users always call
> > >
> > > "new KafkaStreams(builder.build())"
> > >
> > > I think it is still more benefit to expose this concept.
> > >
> > >
> > >
> > > Guozhang
> > >
> > > On Tue, Mar 14, 2017 at 10:43 AM, Matthias J. Sax <
> matth...@confluent.io
> > >
> > > wrote:
> > >
> > >> Thanks for your input Michael.
> > >>
> >  - KafkaStreams as the new name for the builder that creates the
> > logical
> >  plan, with e.g. `KafkaStreams.stream("intput-topic")` and
> >  `KafkaStreams.table("input-topic")`.
> > >>
> > >> I don't thinks this is a good idea, for multiple reasons:
> > >>
> > >> (1) We would reuse a name for a completely different purpose. The same
> > >> argument for not renaming KStreamBuilder to TopologyBuilder. The
> > >> confusion would just be too large.
> > >>
> > >> So if we would start from scratch, it might be ok to do so, but now we
> > >> cannot make this move, IMHO.
> > >>
> > >> Also a clarification question: do you suggest to have static methods
> > >> #stream and #table -- I am not sure if this would work?
> > >> (or was you code snippet just simplification?)
> > >>
> > >>
> > >> (2) Kafka Streams is basically a "processing client" next to consumer
> > >> and producer client. Thus, the name KafkaStreams aligns to the naming
> > >> schema of KafkaConsumer and KafkaProducer. I am not sure if it would
> be
> > >> a good choice to "break" this naming scheme.
> > >>
> > >> Btw: this is also the reason, why we have KafkaStreams#close() -- and
> > >> not KafkaStreams#stop() -- because #close() aligns with consumer and
> > >> producer client.
> > >>
> > >>
> > >> (3) On more argument against using KafkaStreams as DSL entry class
> would
> > >> be, that it would need to create a Topology that can be given to the
> > >> "runner/processing-client". Thus the pattern would be
> > >>
> > >>> Topology topology = streams.build();
> > >>> KafkaStramsRunner runner = new KafkaStreamsRunner(..., topology)
> > >>
> > >> (or of course as a one liner).
> > >>
> > >>
> > >>
> > >> On the other hand, there was the idea (that we intentionally excluded
> > >> from the KIP), to change the "client instantiation" pattern.
> > >>
> > >> Right now, a new client in actively instantiated (ie, by calling
> "new")
> > >> and the topology if provided as a constructor argument. However,
> > >> especially for DSL (not sure if 

Question about kafka-streams task load balancing

2017-03-21 Thread Prasad, Karthik
Hey,

I have a typical scenario of a kafka-streams application in a production 
environment.

We have a kafka-cluster with multiple topics. Messages from one topic are being 
consumed by the kafka-streams application. The topic currently has 9 
partitions. We have configured the consumer thread count to 14. We are running 2 
instances of this stream application on 2 different machines, giving 28 threads 
across both machines. The group id for the consumers is the same. But what I 
observe is that all partitions are being assigned to threads on a single machine. 
Now, I do understand that if the tasks on the active machine fail, then the 
threads on the other machine would take over. My question is: is there a way that 
kafka-streams can auto-balance across instances of the same stream application? 
If yes, how do I go about doing that? Please let me know. Thanks,

Best,
Karthik Prasad
Senior Software Engineer
Sony Interactive Entertainment




Re: Are there Connector artifacts in Confluent or any other Maven repository?

2017-03-21 Thread Ewen Cheslack-Postava
Yes, these get published to Confluent's maven repository. Follow the
instructions here
http://docs.confluent.io/current/installation.html#installation-maven for
adding the Confluent maven repository to your project and then add a
dependency for the connector to your project (e.g. for that partitioner you
need io.confluent.kafka-connect-hdfs). Be sure to add it as a provided
dependency so you don't actually get an extra copy of the connector and its
dependencies.
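As a hedged example (check the documentation link above for the current
repository URL, and pick the version that matches your Confluent Platform), the
pom.xml fragment might look roughly like:

<repositories>
  <repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-connect-hdfs</artifactId>
    <version>3.2.0</version>
    <!-- provided: don't bundle another copy of the connector and its dependencies -->
    <scope>provided</scope>
  </dependency>
</dependencies>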

-Ewen

On Tue, Mar 21, 2017 at 1:57 PM, Phillip Mann  wrote:

> I am trying to migrate from StreamX (https://github.com/qubole/streamx)
> to use the official Confluent S3 connector (https://github.com/
> confluentinc/kafka-connect-storage-cloud).  Part of my implementation of
> Kafka Connect requires a custom partitioner.  This partitioner originally
> extended the Partitioner defined here (https://github.com/
> confluentinc/kafka-connect-hdfs/blob/master/src/main/
> java/io/confluent/connect/hdfs/partitioner/Partitioner.java).  This was
> possible because I would build StreamX and add it to my companie’s artifact
> repository.  However, before I fork a bunch of different Confluent projects
> and then add them to my companies repository, I would like to know if it
> would be possible to import different Confluent projects such as HDFS
> connector and S3 connector through Maven so that I can use code from these
> projects.  If this doesn’t exist, why doesn’t Confluent add these artifacts
> to the Confluent repository?  Thanks for your help!
>
> Phillip
>


Re: Question about kafka-streams task load balancing

2017-03-21 Thread Matthias J. Sax
Hi,

I guess it's currently not possible to load balance between different
machines. It might be a nice optimization to add to Streams though.

Right now, you should reduce the number of threads. Load balancing is
based on threads, and thus, if Streams places tasks on all threads of one
machine, it will automatically assign the remaining tasks to the threads of
the second machine.

Btw: If you have only 9 input partitions, you will most likely get 9
tasks (might be more, depending on your topology structure), and thus
you cannot utilize more than 9 threads anyway. Running with 28
threads will most likely result in many idle threads.
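As a rough sketch (the application id and bootstrap servers are placeholders,
and the exact thread count is your call):

// Assumes java.util.Properties and org.apache.kafka.streams.StreamsConfig.
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
// With 9 input partitions and 2 instances, ~4-5 threads per instance is plenty.
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);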

See the docs for more details:

 -
http://docs.confluent.io/current/streams/architecture.html#parallelism-model
 -
http://docs.confluent.io/current/streams/architecture.html#threading-model



-Matthias

On 3/21/17 3:40 PM, Prasad, Karthik wrote:
> Hey,
> 
> I have a typical scenario of a kafka-streams application in a production 
> environment.
> 
> We have a kafka-cluster with multiple topics. Messages from one topic is 
> being consumed by a the kafka-streams application. The topic, currently, has 
> 9 partitions. We have configured consumer thread count to 14. We are running 
> 2 instances of this stream application on 2 different machines, thereby 
> consisting of 28 threads across both machines. The group id for the consumers 
> are the same. But, what I observe is that all partitions are being assigned 
> to threads on a single machine. Now, I do understand that if the task on the 
> active machine fails, then the threads in the other machine would take over. 
> My question is that is there a way that kafka-streams can auto-balance across 
> instances of the same stream application ? If yes, how do I go about doing 
> that ? Please let me know. Thanks,
> 
> Best,
> Karthik Prasad
> Senior Software Engineer
> Sony Interactive Entertainment
> 
> 
> 





Consumers not rebalancing after replication

2017-03-21 Thread Mahendra Kariya
Hey All,

We have six consumers in a consumer group. At times, some of the partitions
are under-replicated for a while (maybe 2 mins). During this time, the
consumers subscribed to such partitions stop getting data from Kafka and
they become inactive for a while. But when the partitions are fully
replicated again, only the active consumers get rebalanced. The inactive
ones continue to remain inactive.

What could be causing this? How to fix this issue?


Error while starting kafka server in Windows

2017-03-21 Thread vani reddy
Hi, I am getting the below error. I even restarted my system, but still get the
same error.
I am using the version kafka_2.11-0.10.1.0.

java.io.IOException: The requested operation cannot be performed on a file
with a user-mapped section o
at java.io.RandomAccessFile.setLength(Native Method)
at
kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:115)
at
kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
at
kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160)
at
kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
at
kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159)
at kafka.log.LogSegment.recover(LogSegment.scala:236)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:7
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.log.Log.loadSegments(Log.scala:179)
at kafka.log.Log.(Log.scala:108)
at kafka.log.LogManager.createLog(LogManager.scala:362)
at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
at
kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
at
kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
at kafka.cluster.Partition.makeLeader(Partition.scala:168)
at
kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:740)
at
kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:739)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:739)
at
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:685)
at
kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
at
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)

-- 
 Regards,
Vani Reddy


Kafka Connect behaves weird in case of zombie Kafka brokers. Also, zombie brokers?

2017-03-21 Thread Anish Mashankar
Hello everyone,
We are running a 5-broker Kafka v0.10.0.0 cluster on AWS. The Connect
API is also at v0.10.0.0.
It was observed that the distributed Kafka connector went into an infinite
loop of the log message

(Re-)joining group connect-connect-elasticsearch-indexer.

And after a little more digging, there was another infinite loop of a set of
log messages:

*Discovered coordinator 1.2.3.4:9092 (id:  rack: null) for group x*

*Marking the coordinator 1.2.3.4:9092  (id:  rack:
null) dead for group x*

Restarting Kafka connect did not help.

Looking at zookeeper, we realized that broker 1.2.3.4 had left the
zookeeper cluster. This had happened due to a timeout when interacting with
zookeeper. The broker was also the controller. Controller failover
happened, and the applications were fine, but a few days later we started
facing the above-mentioned issue. To add to the surprise, the kafka daemon
was still running on the host but was no longer trying to contact zookeeper
at all. Hence, a zombie broker.

Also, a connect cluster spread across multiple hosts was not working;
however, a single connector worked.

After replacing the EC2 instance for the broker 1.2.3.4, kafka connect
cluster started working fine. (same broker ID)

I am not sure if this is a bug. Kafka Connect shouldn't keep trying the same
broker if it is not able to establish a connection. We use consul for service
discovery. As the broker was running and port 9092 was active, consul reported
it as working and redirected DNS queries to that broker when the connector
tried to connect. We have disabled DNS caching in the Java options, so
Kafka Connect should've tried to connect to some other host each time it
queried, but instead it only ever tried the 1.2.3.4 broker.

Does kafka connect internally cache DNS? Is there a debugging step I am
missing here?
-- 
Anish Samir Mashankar


Re: Consumers not rebalancing after replication

2017-03-21 Thread Mahendra Kariya
I would like to add one more thing. The logs on the consumers look fine.
We see no errors whatsoever. The log level is set to INFO. What are we missing
here? Is there some other config that we need to set so that we at least
see some errors for this in the logs?

On Wed, Mar 22, 2017 at 9:14 AM, Mahendra Kariya  wrote:

> Hey All,
>
> We have six consumers in a consumer group. At times, some of the
> partitions are under replicated for a while (maybe, 2 mis). During this
> time, the consumers subscribed to such partitions stops getting data from
> Kafka and they become inactive for a while. But when the partitions are
> fully replicated again, only the active consumers gets rebalanced. The
> inactive ones continue to remain inactive.
>
> What could be causing this? How to fix this issue?
>
>
>
>


Re: Error while starting kafka server in Windows

2017-03-21 Thread Manikumar
It looks like you are running Kafka on Windows. There may be some issues
running Kafka on Windows, not sure.
You can try on Linux.

On Wed, Mar 22, 2017 at 9:24 AM, vani reddy  wrote:

> Hi,  I am getting below error:I even restarted my system, but still the
> same error.
> Iam using the version kafka_2.11-0.10.1.0
>
> java.io.IOException: The requested operation cannot be performed on a file
> with a user-mapped section o
> at java.io.RandomAccessFile.setLength(Native Method)
> at
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:115)
> at
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
> at
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.
> apply$mcV$sp(AbstractIndex.scala:160)
> at
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.
> apply(AbstractIndex.scala:160)
> at
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.
> apply(AbstractIndex.scala:160)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.
> scala:159)
> at kafka.log.LogSegment.recover(LogSegment.scala:236)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(
> TraversableLike.scala:7
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.
> scala:33)
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:
> 732)
> at kafka.log.Log.loadSegments(Log.scala:179)
> at kafka.log.Log.(Log.scala:108)
> at kafka.log.LogManager.createLog(LogManager.scala:362)
> at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
> at
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.
> apply(Partition.scala:174)
> at
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.
> apply(Partition.scala:174)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
> at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
> at kafka.cluster.Partition.makeLeader(Partition.scala:168)
> at
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(
> ReplicaManager.scala:740)
> at
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(
> ReplicaManager.scala:739)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.
> apply(HashMap.scala:99)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.
> apply(HashMap.scala:99)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.
> scala:739)
> at
> kafka.server.ReplicaManager.becomeLeaderOrFollower(
> ReplicaManager.scala:685)
> at
> kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
> at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
>
> --
>  Regards,
> Vani Reddy
>