Re: Is this a decent use case for Kafka Streams?

2017-07-13 Thread Jon Yeargers
Unfortunately, this notion isn't applicable: "...At the end of a time window..."

If you comb through the archives of this group you'll see many questions
about notifications for the 'end of an aggregation window' and a similar
number of replies from the Kafka group stating that such a notion doesn't
really exist. Each window is kept open so that late arriving records can be
incorporated. You can specify the lifetime of a given window but you don't
get any sort of signal when it expires. A record that arrives after said
expiration will trigger a new window to be created.
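
For the batching use case quoted below, one workaround is to stop relying on window expiry and flush on a timer instead (in Kafka Streams that would probably mean the low-level Processor API and its punctuate callback). A minimal plain-Java sketch of the idea follows. TenantBatcher and its methods are hypothetical names, not Kafka APIs, and the sketch does nothing about exactly-once delivery or failed API requests:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: buffers records per tenant
// and hands back the batches once a fixed interval has elapsed.
class TenantBatcher {
    private final long intervalMs;
    private long intervalStartMs;
    private final Map<String, List<String>> buffers = new HashMap<>();

    TenantBatcher(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.intervalStartMs = startMs;
    }

    // Add one record to its tenant's buffer.
    void add(String tenant, String record) {
        buffers.computeIfAbsent(tenant, t -> new ArrayList<>()).add(record);
    }

    // Call periodically (for example from a timer). Returns the batches to
    // send if the interval has elapsed, otherwise an empty map.
    Map<String, List<String>> flushIfDue(long nowMs) {
        if (nowMs - intervalStartMs < intervalMs) {
            return new HashMap<>();
        }
        Map<String, List<String>> out = new HashMap<>(buffers);
        buffers.clear();
        intervalStartMs = nowMs;
        return out;
    }

    public static void main(String[] args) {
        TenantBatcher batcher = new TenantBatcher(60_000L, 0L);
        batcher.add("tenant-a", "r1");
        batcher.add("tenant-b", "r2");
        System.out.println(batcher.flushIfDue(60_000L));
    }
}
```

The caller decides what "now" means, so the same sketch works with wall-clock time or with record timestamps.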


On Wed, Jul 12, 2017 at 5:06 PM, Stephen Powis 
wrote:

> Hey! I was hoping I could get some input from people more experienced with
> Kafka Streams to determine if they'd be a good use case/solution for me.
>
> I have multi-tenant clients submitting data to a Kafka topic that they want
> ETL'd to a third party service.  I'd like to batch and group these by
> tenant over a time window, somewhere between 1 and 5 minutes.  At the end
> of a time window then issue an API request to the third party service for
> each tenant sending the batch of data over.
>
> Other points of note:
> - Ideally we'd have exactly-once semantics, sending data multiple times
> would typically be bad.  But we'd need to gracefully handle things like API
> request errors / service outages.
>
> - We currently use Storm for doing stream processing, but the long running
> time-windows and potentially large amount of data stored in memory make me
> a bit nervous to use it for this.
>
> Thoughts?  Thanks in Advance!
> Stephen
>


network errors?

2017-06-08 Thread Jon Yeargers
What do these messages mean:

WARN  kafka.network.Processor  - Attempting to send response via channel
for which there is no open connection, connection id 2


"... since it is no longer fetchable"

2017-05-25 Thread Jon Yeargers
Attempting to run a KStream app and seeing lots of this sort of error
message:


> Resuming partition -#
> Pausing partition -#
> Not returning fetched records for assigned partition -# since it
is no longer fetchable

This cycles through all the partitions. It seems to get _some_ data from
the topic but clearly it's struggling. I've tried restarting each broker in
sequence and the logs aren't showing anything abnormal.

Using **kafkacat** I can see that there is lots of data available.


joining two windowed aggregations

2017-05-03 Thread Jon Yeargers
I want to collect data in two windowed groups - 4 hours with a one hour
overlap, and 5 minutes with a one minute overlap. I want to compare the
values in the _oldest_ window for each group.

Seems like this would be a standard join operation but I'm not clear on how
to limit which window the join operates on. I could keep a timestamp in
each aggregate and if it isn't what I want (i.e. < 4 hours old) then ignore
the join but this seems very inefficient.

I'm likely missing the big picture here again with regard to KStreams. I
keep running into situations where it seems like Kafka Streams would be a
great tool but it just doesn't quite fit. Kind of like having a drawer with
mixed metric/std wrenches.


kafka consumers as kubernetes pods - best practice

2017-05-01 Thread Jon Yeargers
I'm looking for suggestions as to how to manage Kafka consumers when they are
run as kubernetes pods - especially in an auto-scaling environment. Looking
at the output of our logging it seems like we're dropping data when a pod
is moved between hosts despite doing (what I believe is) an orderly
shutdown.

I'm seeing values for consumer lag drop precipitously when a pod is
restarted - as if it were committing an offset at the end of the queue
instead of where it should have left off / picked up.

This is an investigation-in-progress. Not entirely sure what's happening
here - it just looks wrong.


Joining on non-keyed values - how to lookup fields

2017-04-20 Thread Jon Yeargers
I'd like to further my immersion in kafka-as-database by doing more
extensive key/val joins. Specifically there are many instances in the DB
world where one is given a numeric field and needs to look up the
appropriate string translation / value. Imagine a record of student/class
data where all the courses are numbered and one must determine class /
instructor names for a hard copy.

Something akin to

select * from schedules
   left join classes on schedules.classid = classes.id
   left join teachers on schedules.teacherid = teachers.id
   left join textbooks on schedules.textbookid = textbooks.id

... and so on.

In the KTable world (AFAICT) this is only possible for the key the source
record uses. So-called "internal" values can't be looked up. I could
imagine running each record through a 'map' cycle to rearrange the key for
each lookup column, remap and repeat but this seems a bit onerous. Perhaps
using a Process step one could use additional streams? Dunno.

Using log-compaction these 'matching/lookup' topics could be kept available.

Was reading this missive (
https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins).
Seems like the right direction but misses this point.

Any thoughts on this? Am I missing an obvious solution? (I hope so - this
would be a cool use case)
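
To make the desired lookup concrete, here is a minimal plain-Java sketch in which in-memory maps stand in for the compacted lookup topics. Schedule, describe and the sample data are hypothetical. In Kafka Streams itself the GlobalKTable added in 0.10.2 may be worth a look, since a stream joined against it can pick the lookup key from the record's value, but treat that as a pointer rather than a tested recipe:

```java
import java.util.HashMap;
import java.util.Map;

class LookupJoinSketch {
    // Hypothetical record shape; field names mirror the SQL above.
    static final class Schedule {
        final int classId;
        final int teacherId;

        Schedule(int classId, int teacherId) {
            this.classId = classId;
            this.teacherId = teacherId;
        }
    }

    // Resolve both numeric ids against the lookup tables.
    // A missing entry yields "?" to mimic LEFT JOIN semantics.
    static String describe(Schedule s,
                           Map<Integer, String> classes,
                           Map<Integer, String> teachers) {
        return classes.getOrDefault(s.classId, "?")
                + " / " + teachers.getOrDefault(s.teacherId, "?");
    }

    public static void main(String[] args) {
        Map<Integer, String> classes = new HashMap<>();
        classes.put(101, "Algebra");
        Map<Integer, String> teachers = new HashMap<>();
        teachers.put(7, "Ms. Lee");
        System.out.println(describe(new Schedule(101, 7), classes, teachers));
    }
}
```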


Re: Understanding ReadOnlyWindowStore.fetch

2017-03-29 Thread Jon Yeargers
I remain more than mystified about the workings of the StateStore. I tried
making aggregations with a 1 minute window, 10 second advance and a _12
hour_ retention (which is longer than the retention.ms of the topic).  I
still couldn't get more than a 15% hit rate on the StateStore.

Are there configuration settings? Some properties file to set up RocksDB? I'm
not getting any errors - just not getting any data.

On Wed, Mar 29, 2017 at 12:52 PM, Jon Yeargers 
wrote:

> So '.until()' is based on clock time / elapsed time (IE record age) /
> something else?
>
> The fact that Im seeing lots of records come through that can't be found
> in the Store - these are 'old' and already expired?
>
> Going forward - it would be useful to have different forms of '.until()'
> so one could consume old records (EG if one was catching up from lag)
> without having to worry about them immediately disappearing.
>
> On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy  wrote:
>
>> Jon,
>>
>> You should be able to query anything that has not expired, i.e., based on
>> TimeWindows.until(..).
>>
>> Thanks,
>> Damian
>>
>> On Wed, 29 Mar 2017 at 17:24 Jon Yeargers 
>> wrote:
>>
>> > To be a bit more specific:
>> >
>> > If I call this: KTable kt =
>> > sourceStream.groupByKey().reduce(..., "somekeystore");
>> >
>> > and then call this:
>> >
>> > kt.forEach()-> ...
>> >
>> > Can I assume that everything that comes out will be available in
>> > "somekeystore"? If not, what subset should I expect to find there?
>> >
>> > On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers > >
>> > wrote:
>> >
>> > > But if a key shows up in a KTable->forEach should it be available in
>> the
>> > > StateStore (from the KTable)?
>> > >
>> > > On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll 
>> > > wrote:
>> > >
>> > >> Jon,
>> > >>
>> > >> there's a related example, using a window store and a key-value
>> store,
>> > at
>> > >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-
>> > >> streams/src/test/java/io/confluent/examples/streams/Val
>> > >> idateStateWithInteractiveQueriesLambdaIntegrationTest.java
>> > >> (this is for Confluent 3.2 / Kafka 0.10.2).
>> > >>
>> > >> -Michael
>> > >>
>> > >>
>> > >>
>> > >> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <
>> jon.yearg...@cedexis.com
>> > >
>> > >> wrote:
>> > >>
>> > >> > Im only running one instance (locally) to keep things simple.
>> > >> >
>> > >> > Reduction:
>> > >> >
>> > >> > KTable, String> hourAggStore =
>> > >> > sourceStream.groupByKey().reduce(rowReducer,
>> > >> > TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 *
>> > >> > 1000).until(70 * 60 * 1000L),
>> > >> > "HourAggStore");
>> > >> >
>> > >> > then I get values to look for via:
>> > >> >
>> > >> > hourAggStore.foreach((k, v) -> {
>> > >> > LogLine logLine = objectMapper.readValue(v,
>> > >> logLine.class);
>> > >> > LOGGER.debug("{}", k.key());
>> > >> > });
>> > >> >
>> > >> > Ive kept it easy by requesting everything from 0 to
>> > >> > 'System.currentTimeMillis()'. Retrieval is done using a snip from
>> your
>> > >> > sample code "windowedByKey".
>> > >> >
>> > >> > Requests are sent in via curl and output through the same channel.
>> I
>> > >> pass
>> > >> > in the key and ask for any values.
>> > >> >
>> > >> > Ive looked at the values passed in / out of the reduction function
>> and
>> > >> they
>> > >> > look sane.
>> > >> >
>> > >> > My assumption is that if a value shows up in the 'forEach' loop
>> this
>> > >> > implies it exists in the StateStore. Accurate?
>> > >> >
>> > >> > In fact, only about one in 10 requests actual

Re: Understanding ReadOnlyWindowStore.fetch

2017-03-29 Thread Jon Yeargers
So '.until()' is based on clock time / elapsed time (i.e. record age) /
something else?

The fact that I'm seeing lots of records come through that can't be found in
the Store - these are 'old' and already expired?

Going forward - it would be useful to have different forms of '.until()' so
one could consume old records (e.g. if one was catching up from lag) without
having to worry about them immediately disappearing.
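
One way to reason about the question above, offered as an assumption rather than a description of the real store: retention is judged against record timestamps (the highest timestamp seen so far), not against the wall clock. Under that model a window disappears once newer records push it out of the retention range, and a late record for an already expired window is dropped on arrival. RetentionModel below is a hypothetical toy, not the Kafka Streams implementation:

```java
import java.util.TreeMap;

// Toy model of retention driven by record timestamps ("stream time").
// Not a copy of the real window store.
class RetentionModel {
    private final long retentionMs;
    private long maxSeenMs = Long.MIN_VALUE;
    private final TreeMap<Long, String> windowsByStart = new TreeMap<>();

    RetentionModel(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Store a window; returns false if it was already outside retention.
    boolean put(long windowStartMs, String value) {
        maxSeenMs = Math.max(maxSeenMs, windowStartMs);
        long cutoff = maxSeenMs - retentionMs;
        // Drop every window that starts before the cutoff.
        windowsByStart.headMap(cutoff, false).clear();
        if (windowStartMs < cutoff) {
            return false;
        }
        windowsByStart.put(windowStartMs, value);
        return true;
    }

    boolean contains(long windowStartMs) {
        return windowsByStart.containsKey(windowStartMs);
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        RetentionModel store = new RetentionModel(70 * minute);
        store.put(0L, "a");
        store.put(80 * minute, "b");
        System.out.println(store.contains(0L)); // window 0 is past the cutoff now
    }
}
```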

On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy  wrote:

> Jon,
>
> You should be able to query anything that has not expired, i.e., based on
> TimeWindows.until(..).
>
> Thanks,
> Damian
>
> On Wed, 29 Mar 2017 at 17:24 Jon Yeargers 
> wrote:
>
> > To be a bit more specific:
> >
> > If I call this: KTable kt =
> > sourceStream.groupByKey().reduce(..., "somekeystore");
> >
> > and then call this:
> >
> > kt.forEach()-> ...
> >
> > Can I assume that everything that comes out will be available in
> > "somekeystore"? If not, what subset should I expect to find there?
> >
> > On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers 
> > wrote:
> >
> > > But if a key shows up in a KTable->forEach should it be available in
> the
> > > StateStore (from the KTable)?
> > >
> > > On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll 
> > > wrote:
> > >
> > >> Jon,
> > >>
> > >> there's a related example, using a window store and a key-value store,
> > at
> > >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-
> > >> streams/src/test/java/io/confluent/examples/streams/Val
> > >> idateStateWithInteractiveQueriesLambdaIntegrationTest.java
> > >> (this is for Confluent 3.2 / Kafka 0.10.2).
> > >>
> > >> -Michael
> > >>
> > >>
> > >>
> > >> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <
> jon.yearg...@cedexis.com
> > >
> > >> wrote:
> > >>
> > >> > Im only running one instance (locally) to keep things simple.
> > >> >
> > >> > Reduction:
> > >> >
> > >> > KTable, String> hourAggStore =
> > >> > sourceStream.groupByKey().reduce(rowReducer,
> > >> > TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 *
> > >> > 1000).until(70 * 60 * 1000L),
> > >> > "HourAggStore");
> > >> >
> > >> > then I get values to look for via:
> > >> >
> > >> > hourAggStore.foreach((k, v) -> {
> > >> > LogLine logLine = objectMapper.readValue(v,
> > >> logLine.class);
> > >> > LOGGER.debug("{}", k.key());
> > >> > });
> > >> >
> > >> > Ive kept it easy by requesting everything from 0 to
> > >> > 'System.currentTimeMillis()'. Retrieval is done using a snip from
> your
> > >> > sample code "windowedByKey".
> > >> >
> > >> > Requests are sent in via curl and output through the same channel. I
> > >> pass
> > >> > in the key and ask for any values.
> > >> >
> > >> > Ive looked at the values passed in / out of the reduction function
> and
> > >> they
> > >> > look sane.
> > >> >
> > >> > My assumption is that if a value shows up in the 'forEach' loop this
> > >> > implies it exists in the StateStore. Accurate?
> > >> >
> > >> > In fact, only about one in 10 requests actually return any values.
> No
> > >> > errors - just no data.
> > >> >
> > >> >
> > >> >
> > >> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy 
> > >> wrote:
> > >> >
> > >> > > Hi Jon,
> > >> > >
> > >> > > If you are able to get a handle on the store, i.e., via
> > >> > > KafkaStreams.store(...) and call fetch without any exceptions,
> then
> > >> the
> > >> > > store is available.
> > >> > > The time params to fetch are the boundaries to search for windows
> > for
> > >> the
> > >> > > given key. They relate to the start time of the window, so if you
> > did
> > >> > > fetch(key, t1, t2) - it will find all the windows for 

Re: Understanding ReadOnlyWindowStore.fetch

2017-03-29 Thread Jon Yeargers
To be a bit more specific:

If I call this: KTable kt =
sourceStream.groupByKey().reduce(..., "somekeystore");

and then call this:

kt.forEach()-> ...

Can I assume that everything that comes out will be available in
"somekeystore"? If not, what subset should I expect to find there?

On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers 
wrote:

> But if a key shows up in a KTable->forEach should it be available in the
> StateStore (from the KTable)?
>
> On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll 
> wrote:
>
>> Jon,
>>
>> there's a related example, using a window store and a key-value store, at
>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-
>> streams/src/test/java/io/confluent/examples/streams/Val
>> idateStateWithInteractiveQueriesLambdaIntegrationTest.java
>> (this is for Confluent 3.2 / Kafka 0.10.2).
>>
>> -Michael
>>
>>
>>
>> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers 
>> wrote:
>>
>> > Im only running one instance (locally) to keep things simple.
>> >
>> > Reduction:
>> >
>> > KTable, String> hourAggStore =
>> > sourceStream.groupByKey().reduce(rowReducer,
>> > TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 *
>> > 1000).until(70 * 60 * 1000L),
>> > "HourAggStore");
>> >
>> > then I get values to look for via:
>> >
>> > hourAggStore.foreach((k, v) -> {
>> > LogLine logLine = objectMapper.readValue(v,
>> logLine.class);
>> > LOGGER.debug("{}", k.key());
>> > });
>> >
>> > Ive kept it easy by requesting everything from 0 to
>> > 'System.currentTimeMillis()'. Retrieval is done using a snip from your
>> > sample code "windowedByKey".
>> >
>> > Requests are sent in via curl and output through the same channel. I
>> pass
>> > in the key and ask for any values.
>> >
>> > Ive looked at the values passed in / out of the reduction function and
>> they
>> > look sane.
>> >
>> > My assumption is that if a value shows up in the 'forEach' loop this
>> > implies it exists in the StateStore. Accurate?
>> >
>> > In fact, only about one in 10 requests actually return any values. No
>> > errors - just no data.
>> >
>> >
>> >
>> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy 
>> wrote:
>> >
>> > > Hi Jon,
>> > >
>> > > If you are able to get a handle on the store, i.e., via
>> > > KafkaStreams.store(...) and call fetch without any exceptions, then
>> the
>> > > store is available.
>> > > The time params to fetch are the boundaries to search for windows for
>> the
>> > > given key. They relate to the start time of the window, so if you did
>> > > fetch(key, t1, t2) - it will find all the windows for key that start
>> in
>> > the
>> > > inclusive time range t1 - t2.
>> > >
>> > > Are you running more than one instance? If yes, then you want to make
>> > sure
>> > > that you are querying the correct instance. For that you can use:
>> > > KafkaStreams.metadataForKey(...) to find the instance that has the
>> key
>> > you
>> > > are looking for.
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > >
>> > >
>> > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers 
>> > > wrote:
>> > >
>> > > > Im probing about trying to find a way to solve my aggregation -> db
>> > > issue.
>> > > > Looking at the '.fetch()'  function Im wondering about the
>> 'timeFrom'
>> > and
>> > > > 'timeTo' params as not a lot is mentioned about 'proper' usage.
>> > > >
>> > > > The test in
>> > > >
>> > > > https://github.com/confluentinc/examples/blob/
>> > > master/kafka-streams/src/test/java/io/confluent/examples/
>> > > streams/interactivequeries/WordCountInteractiveQueriesExa
>> > > mpleTest.java#L200-L212
>> > > > makes it appear that the params are boundaries and that it will
>> return
>> > an
>> > > > inclusive list of every key/window combination. Truth?
>> > > >
>> > > > My tests to this end haven't returned anything.
>> > > >
>> > > > Im watching the values coming out of the KTable so I
>> > can
>> > > > send them back as request params. What Ive tried:
>> > > >
>> > > > - Window.key(), Window.key().start() and Window.key().end()
>> > > > - Window.key(), (Window.key().start() - 1) and (Window.key().end()
>> + 1)
>> > > > - Window.key(), 0 and Window.key().end()
>> > > > - Window.key(), 0 and (Window.key().end() + 1)
>> > > >
>> > > > None of these seem to hit anything in the StateStore.
>> > > >
>> > > > Is there a delay before Store values become available for
>> '.fetch()'?
>> > > >
>> > >
>> >
>>
>
>


Re: Understanding ReadOnlyWindowStore.fetch

2017-03-29 Thread Jon Yeargers
But if a key shows up in a KTable->forEach should it be available in the
StateStore (from the KTable)?

On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll  wrote:

> Jon,
>
> there's a related example, using a window store and a key-value store, at
> https://github.com/confluentinc/examples/blob/3.
> 2.x/kafka-streams/src/test/java/io/confluent/examples/streams/
> ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
> (this is for Confluent 3.2 / Kafka 0.10.2).
>
> -Michael
>
>
>
> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers 
> wrote:
>
> > Im only running one instance (locally) to keep things simple.
> >
> > Reduction:
> >
> > KTable, String> hourAggStore =
> > sourceStream.groupByKey().reduce(rowReducer,
> > TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 *
> > 1000).until(70 * 60 * 1000L),
> > "HourAggStore");
> >
> > then I get values to look for via:
> >
> > hourAggStore.foreach((k, v) -> {
> > LogLine logLine = objectMapper.readValue(v,
> logLine.class);
> > LOGGER.debug("{}", k.key());
> > });
> >
> > Ive kept it easy by requesting everything from 0 to
> > 'System.currentTimeMillis()'. Retrieval is done using a snip from your
> > sample code "windowedByKey".
> >
> > Requests are sent in via curl and output through the same channel. I pass
> > in the key and ask for any values.
> >
> > Ive looked at the values passed in / out of the reduction function and
> they
> > look sane.
> >
> > My assumption is that if a value shows up in the 'forEach' loop this
> > implies it exists in the StateStore. Accurate?
> >
> > In fact, only about one in 10 requests actually return any values. No
> > errors - just no data.
> >
> >
> >
> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy 
> wrote:
> >
> > > Hi Jon,
> > >
> > > If you are able to get a handle on the store, i.e., via
> > > KafkaStreams.store(...) and call fetch without any exceptions, then the
> > > store is available.
> > > The time params to fetch are the boundaries to search for windows for
> the
> > > given key. They relate to the start time of the window, so if you did
> > > fetch(key, t1, t2) - it will find all the windows for key that start in
> > the
> > > inclusive time range t1 - t2.
> > >
> > > Are you running more than one instance? If yes, then you want to make
> > sure
> > > that you are querying the correct instance. For that you can use:
> > > KafkaStreams.metadataForKey(...) to find the instance that has the key
> > you
> > > are looking for.
> > >
> > > Thanks,
> > > Damian
> > >
> > >
> > >
> > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers 
> > > wrote:
> > >
> > > > Im probing about trying to find a way to solve my aggregation -> db
> > > issue.
> > > > Looking at the '.fetch()'  function Im wondering about the 'timeFrom'
> > and
> > > > 'timeTo' params as not a lot is mentioned about 'proper' usage.
> > > >
> > > > The test in
> > > >
> > > > https://github.com/confluentinc/examples/blob/
> > > master/kafka-streams/src/test/java/io/confluent/examples/
> > > streams/interactivequeries/WordCountInteractiveQueriesExa
> > > mpleTest.java#L200-L212
> > > > makes it appear that the params are boundaries and that it will
> return
> > an
> > > > inclusive list of every key/window combination. Truth?
> > > >
> > > > My tests to this end haven't returned anything.
> > > >
> > > > Im watching the values coming out of the KTable so I
> > can
> > > > send them back as request params. What Ive tried:
> > > >
> > > > - Window.key(), Window.key().start() and Window.key().end()
> > > > - Window.key(), (Window.key().start() - 1) and (Window.key().end() +
> 1)
> > > > - Window.key(), 0 and Window.key().end()
> > > > - Window.key(), 0 and (Window.key().end() + 1)
> > > >
> > > > None of these seem to hit anything in the StateStore.
> > > >
> > > > Is there a delay before Store values become available for '.fetch()'?
> > > >
> > >
> >
>


Re: Understanding ReadOnlyWindowStore.fetch

2017-03-29 Thread Jon Yeargers
I'm only running one instance (locally) to keep things simple.

Reduction:

// Key type String is an assumption; the generic parameters were lost in the archive.
KTable<Windowed<String>, String> hourAggStore =
    sourceStream.groupByKey().reduce(rowReducer,
        TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000).until(70 * 60 * 1000L),
        "HourAggStore");

then I get values to look for via:

hourAggStore.foreach((k, v) -> {
    LogLine logLine = objectMapper.readValue(v, LogLine.class);
    LOGGER.debug("{}", k.key());
});

I've kept it easy by requesting everything from 0 to
'System.currentTimeMillis()'. Retrieval is done using a snippet from your
sample code "windowedByKey".

Requests are sent in via curl and output through the same channel. I pass
in the key and ask for any values.

I've looked at the values passed in / out of the reduction function and they
look sane.

My assumption is that if a value shows up in the 'forEach' loop this
implies it exists in the StateStore. Accurate?

In fact, only about one in 10 requests actually return any values. No
errors - just no data.
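
As a sanity check on the window definition above: with a 65 minute size and a 5 minute advance, every record falls into 13 overlapping windows, so each incoming record updates 13 entries for its key. The sketch below enumerates the window start times for one timestamp, assuming windows are [start, start + size) with starts aligned to multiples of the advance counted from epoch 0. HoppingWindows is an illustrative class, not part of Kafka Streams:

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindows {
    // Start times of every window [start, start + sizeMs) that contains
    // timestampMs, with starts aligned to multiples of advanceMs.
    static List<Long> startsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest aligned start that is strictly greater than timestamp - size.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 65 * 60 * 1000L;
        long advance = 5 * 60 * 1000L;
        System.out.println(startsFor(10_000_000L, size, advance).size()); // prints 13
    }
}
```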



On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy  wrote:

> Hi Jon,
>
> If you are able to get a handle on the store, i.e., via
> KafkaStreams.store(...) and call fetch without any exceptions, then the
> store is available.
> The time params to fetch are the boundaries to search for windows for the
> given key. They relate to the start time of the window, so if you did
> fetch(key, t1, t2) - it will find all the windows for key that start in the
> inclusive time range t1 - t2.
>
> Are you running more than one instance? If yes, then you want to make sure
> that you are querying the correct instance. For that you can use:
> KafkaStreams.metadataForKey(...) to find the instance that has the key you
> are looking for.
>
> Thanks,
> Damian
>
>
>
> On Tue, 28 Mar 2017 at 22:37 Jon Yeargers 
> wrote:
>
> > Im probing about trying to find a way to solve my aggregation -> db
> issue.
> > Looking at the '.fetch()'  function Im wondering about the 'timeFrom' and
> > 'timeTo' params as not a lot is mentioned about 'proper' usage.
> >
> > The test in
> >
> > https://github.com/confluentinc/examples/blob/
> master/kafka-streams/src/test/java/io/confluent/examples/
> streams/interactivequeries/WordCountInteractiveQueriesExa
> mpleTest.java#L200-L212
> > makes it appear that the params are boundaries and that it will return an
> > inclusive list of every key/window combination. Truth?
> >
> > My tests to this end haven't returned anything.
> >
> > Im watching the values coming out of the KTable so I can
> > send them back as request params. What Ive tried:
> >
> > - Window.key(), Window.key().start() and Window.key().end()
> > - Window.key(), (Window.key().start() - 1) and (Window.key().end() + 1)
> > - Window.key(), 0 and Window.key().end()
> > - Window.key(), 0 and (Window.key().end() + 1)
> >
> > None of these seem to hit anything in the StateStore.
> >
> > Is there a delay before Store values become available for '.fetch()'?
> >
>


Understanding ReadOnlyWindowStore.fetch

2017-03-28 Thread Jon Yeargers
I'm probing about, trying to find a way to solve my aggregation -> db issue.
Looking at the '.fetch()' function I'm wondering about the 'timeFrom' and
'timeTo' params as not a lot is mentioned about 'proper' usage.

The test in
https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212
makes it appear that the params are boundaries and that it will return an
inclusive list of every key/window combination. Truth?

My tests to this end haven't returned anything.

I'm watching the values coming out of the KTable so I can
send them back as request params. What I've tried:

- Window.key(), Window.key().start() and Window.key().end()
- Window.key(), (Window.key().start() - 1) and (Window.key().end() + 1)
- Window.key(), 0 and Window.key().end()
- Window.key(), 0 and (Window.key().end() + 1)

None of these seem to hit anything in the StateStore.

Is there a delay before Store values become available for '.fetch()'?
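
Per Damian's reply elsewhere in this thread, timeFrom and timeTo bound the window start times, inclusive at both ends. A toy model of that lookup for a single key, with a sorted map standing in for the store (FetchRangeModel is hypothetical and says nothing about retention or caching):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

class FetchRangeModel {
    // Windows for ONE key, indexed by window start time in milliseconds.
    private final TreeMap<Long, String> byStart = new TreeMap<>();

    void put(long windowStartMs, String value) {
        byStart.put(windowStartMs, value);
    }

    // Every window whose start lies in [timeFrom, timeTo], both inclusive.
    NavigableMap<Long, String> fetch(long timeFrom, long timeTo) {
        return byStart.subMap(timeFrom, true, timeTo, true);
    }

    public static void main(String[] args) {
        FetchRangeModel store = new FetchRangeModel();
        store.put(0L, "w0");
        store.put(60_000L, "w1");
        store.put(120_000L, "w2");
        System.out.println(store.fetch(60_000L, 60_000L)); // only the window starting at 60000
    }
}
```

Under this reading, passing a window's own start() as both bounds should be enough to hit that window, and using 0 as the lower bound should return every retained window for the key.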


WindowStore and retention

2017-03-28 Thread Jon Yeargers
How long does a given value persist in a WindowStore? Does it obey the
'.until()' param of a windowed aggregation/ reduction?

Please say yes.


using a state store for deduplication

2017-03-27 Thread Jon Yeargers
I've been (re)reading this document (
http://docs.confluent.io/3.2.0/streams/developer-guide.html#state-stores)
hoping to better understand StateStores. At the top of the section there is
a tantalizing note implying that one could do deduplication using a store.

At present we are using Redis for this as it gives us a shared location. I've
been of the mind that a given store was local to a streams instance. To
truly support deduplication I would think one would need access to _all_
the data for a topic and not just on a per-partition basis.

Am I completely misunderstanding this?
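
A possible way around the per-partition concern, sketched under the assumption that records are keyed by whatever id identifies a duplicate: key-based partitioning sends every copy of an id to the same partition, so a store local to that partition sees all of them. PartitionedDedup is a hypothetical plain-Java model; its hash is a stand-in and not the hash Kafka's partitioner uses:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class PartitionedDedup {
    // One "local store" of seen ids per partition.
    private final List<Set<String>> seenPerPartition = new ArrayList<>();

    PartitionedDedup(int partitions) {
        for (int i = 0; i < partitions; i++) {
            seenPerPartition.add(new HashSet<>());
        }
    }

    // Stand-in for a key-based partitioner: same id, same partition.
    int partitionFor(String id) {
        return Math.floorMod(id.hashCode(), seenPerPartition.size());
    }

    // True the first time an id is seen, false for every repeat.
    boolean firstSeen(String id) {
        return seenPerPartition.get(partitionFor(id)).add(id);
    }

    public static void main(String[] args) {
        PartitionedDedup dedup = new PartitionedDedup(4);
        System.out.println(dedup.firstSeen("evt-1")); // first sighting
        System.out.println(dedup.firstSeen("evt-1")); // duplicate
    }
}
```

If the topic is keyed by something else, the records would need to be re-keyed (and repartitioned) by the dedup id first.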


Iterating stream windows

2017-03-27 Thread Jon Yeargers
I'm hoping to support external queries into a windowed state store
aggregator. Thanks to a previous question here I see where to use a
ReadOnlyWindowStore but I'm not clear on how to define the boundaries for
the call.

Assume I have a one hour window with a 5 minute 'slide' between new
windows. If an arbitrary request for the 'latest' values comes in I want to
return the window that's closest to - but not outside - its 60 minute
boundary. To me this implies I need to iterate over available windows
(those that haven't hit their 'retention' value).

Does such a function exist? I can envision a solution using a guava timed
cache but I'm trying very hard not to break out of the full-kafka world.
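
If window starts are aligned to multiples of the slide (an assumption here), the window closest to its 60 minute boundary can be computed directly rather than found by iterating: it is the oldest window that still contains "now". A minimal sketch with a hypothetical helper name:

```java
class OldestOpenWindow {
    // Start of the oldest window [start, start + sizeMs) that still contains
    // nowMs, assuming starts are aligned to multiples of advanceMs.
    static long startFor(long nowMs, long sizeMs, long advanceMs) {
        // Smallest aligned start strictly greater than now - size, floored at 0.
        return (Math.max(0L, nowMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        long slide = 5 * 60 * 1000L;
        System.out.println(startFor(System.currentTimeMillis(), hour, slide));
    }
}
```

The result could then be used as both time bounds of a ReadOnlyWindowStore fetch for the key in question.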


Re: YASSQ (yet another state store question)

2017-03-26 Thread Jon Yeargers
Also - if I run this on two hosts - what does it imply if the response to
'streams.allMetadata()' from one host includes both instances but the other
host only knows about itself?

On Sun, Mar 26, 2017 at 5:58 AM, Jon Yeargers 
wrote:

> If the '.state()' function returns "RUNNING" and I still get this
> exception?
>
> On Fri, Mar 24, 2017 at 1:56 PM, Eno Thereska 
> wrote:
>
>> Hi Jon,
>>
>> This is expected, see this: https://groups.google.com/foru
>> m/?pli=1#!searchin/confluent-platform/migrated$20to$
>> 20another$20instance%7Csort:relevance/confluent-platform/
>> LglWC_dZDKw/qsPuCRT_DQAJ <https://groups.google.com/for
>> um/?pli=1#!searchin/confluent-platform/migrated$20to$
>> 20another$20instance|sort:relevance/confluent-platform/
>> LglWC_dZDKw/qsPuCRT_DQAJ>.
>>
>> Thanks
>> Eno
>> > On 24 Mar 2017, at 20:51, Jon Yeargers 
>> wrote:
>> >
>> > I've setup a KTable as follows:
>> >
>> > KTable, String> outTable = sourceStream.groupByKey().
>> > reduce(rowReducer,
>> >TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 *
>> > 1000).until(10 * 60 * 1000L),
>> >"AggStore");
>> >
>> > I can confirm its presence via 'streams.allMetadata()' (accessible
>> through
>> > a simple httpserver).
>> >
>> > When I call 'ReadOnlyKeyValueStore store =
>> > kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());'
>> >
>> > I get this exception:
>> >
>> > org.apache.kafka.streams.errors.InvalidStateStoreException: the state
>> > store, AggStore, may have migrated to another instance.
>> >at
>> > org.apache.kafka.streams.state.internals.QueryableStoreProvi
>> der.getStore(QueryableStoreProvider.java:49)
>> >at
>> > org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
>> >at
>> > com.cedexis.videokafka.videohouraggregator.RequestHandler.
>> handle(RequestHandler.java:97)
>> >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>> >at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
>> >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
>> >at
>> > sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(Se
>> rverImpl.java:675)
>> >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>> >at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:6
>> 47)
>> >at
>> > sun.net.httpserver.ServerImpl$DefaultExecutor.execute(Server
>> Impl.java:158)
>> >at
>> > sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431)
>> >at sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl.java
>> :396)
>> >at java.lang.Thread.run(Thread.java:745)
>> >
>> >
>> > ... except.. there is only one instance.. running locally.
>>
>>
>


Re: YASSQ (yet another state store question)

2017-03-26 Thread Jon Yeargers
What if the '.state()' function returns "RUNNING" and I still get this exception?

On Fri, Mar 24, 2017 at 1:56 PM, Eno Thereska 
wrote:

> Hi Jon,
>
> This is expected, see this: https://groups.google.com/
> forum/?pli=1#!searchin/confluent-platform/migrated$
> 20to$20another$20instance%7Csort:relevance/confluent-
> platform/LglWC_dZDKw/qsPuCRT_DQAJ <https://groups.google.com/
> forum/?pli=1#!searchin/confluent-platform/migrated$
> 20to$20another$20instance|sort:relevance/confluent-
> platform/LglWC_dZDKw/qsPuCRT_DQAJ>.
>
> Thanks
> Eno
> > On 24 Mar 2017, at 20:51, Jon Yeargers  wrote:
> >
> > I've setup a KTable as follows:
> >
> > KTable, String> outTable = sourceStream.groupByKey().
> > reduce(rowReducer,
> >TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 *
> > 1000).until(10 * 60 * 1000L),
> >"AggStore");
> >
> > I can confirm its presence via 'streams.allMetadata()' (accessible
> through
> > a simple httpserver).
> >
> > When I call 'ReadOnlyKeyValueStore store =
> > kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());'
> >
> > I get this exception:
> >
> > org.apache.kafka.streams.errors.InvalidStateStoreException: the state
> > store, AggStore, may have migrated to another instance.
> >at
> > org.apache.kafka.streams.state.internals.QueryableStoreProvider.
> getStore(QueryableStoreProvider.java:49)
> >at
> > org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
> >at
> > com.cedexis.videokafka.videohouraggregator.RequestHandler.handle(
> RequestHandler.java:97)
> >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
> >at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
> >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
> >at
> > sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(
> ServerImpl.java:675)
> >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
> >at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:
> 647)
> >at
> > sun.net.httpserver.ServerImpl$DefaultExecutor.execute(
> ServerImpl.java:158)
> >at
> > sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431)
> >at sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl.
> java:396)
> >at java.lang.Thread.run(Thread.java:745)
> >
> >
> > ... except.. there is only one instance.. running locally.
>
>


YASSQ (yet another state store question)

2017-03-24 Thread Jon Yeargers
I've setup a KTable as follows:

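// Key type String is an assumption; the generic parameters were lost in the archive.
KTable<Windowed<String>, String> outTable = sourceStream.groupByKey().
    reduce(rowReducer,
        TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 * 1000).until(10 * 60 * 1000L),
        "AggStore");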

I can confirm its presence via 'streams.allMetadata()' (accessible through
a simple httpserver).

When I call 'ReadOnlyKeyValueStore store =
kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());'

I get this exception:

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, AggStore, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:49)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
    at com.cedexis.videokafka.videohouraggregator.RequestHandler.handle(RequestHandler.java:97)
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
    at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
    at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
    at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
    at sun.net.httpserver.ServerImpl$DefaultExecutor.execute(ServerImpl.java:158)
    at sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431)
    at sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl.java:396)
    at java.lang.Thread.run(Thread.java:745)


... except.. there is only one instance.. running locally.


Re: APPLICATION_SERVER_CONFIG ?

2017-03-24 Thread Jon Yeargers
You make some great cases for your architecture. To be clear - I've been
proselytizing for Kafka since I joined this company last year. I think my
largest issue is rethinking some preexisting notions about streaming to
make them work in the kstream universe.

On Fri, Mar 24, 2017 at 6:07 AM, Michael Noll  wrote:

> > If I understand this correctly: assuming I have a simple aggregator
> > distributed across n-docker instances each instance will _also_ need to
> > support some sort of communications process for allowing access to its
> > statestore (last param from KStream.groupby.aggregate).
>
> Yes.
>
> See
> http://docs.confluent.io/current/streams/developer-guide.html#your-application-and-interactive-queries
> .
>
> > - The tombstoning facilities of redis or C* would lend themselves well to
> > implementing a 'true' rolling aggregation
>
> What is a 'true' rolling aggregation, and how could Redis or C* help with
> that in a way that Kafka can't?  (Honest question.)
>
>
> > I get that RocksDB has a small footprint but given the choice of
> > implementing my own RPC / gossip-like process for data sharing and using
> a
> > well tested one (ala C* or redis) I would almost always opt for the
> latter.
> > [...]
> > Just my $0.02. I would love to hear why Im missing the 'big picture'. The
> > kstreams architecture seems rife with potential.
>
> One question is, for example:  can the remote/central DB of your choice
> (Redis, C*) handle as many qps as Kafka/Kafka Streams/RocksDB can handle?
> Over the network?  At the same low latency?  Also, what happens if the
> remote DB is unavailable?  Do you wait and retry?  Discard?  Accept the
> fact that your app's processing latency will now go through the roof?  I
> wrote about some such scenarios at
> https://www.confluent.io/blog/distributed-real-time-joins-and-aggregations-on-user-activity-events-using-kafka-streams/
> .
>
> One big advantage (for many use cases, not for all) with Kafka/Kafka Streams is
> that you can leverage fault-tolerant *local* state that may also be
> distributed across app instances.  Local state is much more efficient and
> faster when doing stateful processing such as joins or aggregations.  You
> don't need to worry about an external system, whether it's up and running,
> whether its version is still compatible with your app, whether it can scale
> as much as your app/Kafka Streams/Kafka/the volume of your input data.
>
> Also, note that some users have actually opted to run hybrid setups:  Some
> processing output is sent to a remote data store like Cassandra (e.g. via
> Kafka Connect), some processing output is exposed directly through
> interactive queries.  It's not like you're forced to pick only one approach.
>
>
> > - Typical microservices would separate storing / retrieving data
>
> I'd rather argue that for microservices you'd oftentimes prefer to *not*
> use a remote DB, and rather do everything inside your microservice whatever
> the microservice needs to do (perhaps we could relax this to "do everything
> in a way that your microservice is in full, exclusive control", i.e. it
> doesn't necessarily need to be *inside*, but arguably it would be better if
> it actually is).
> See e.g. the article
> https://www.confluent.io/blog/data-dichotomy-rethinking-the-way-we-treat-data-and-services/
> that lists some of the reasoning behind this school of thinking.  Again,
> YMMV.
>
> Personally, I think there's no simple true/false here.  The decisions
> depend on what you need, what your context is, etc.  Anyways, since you
> already have some opinions for the one side, I wanted to share some food
> for thought for the other side of the argument. :-)
>
> Best,
> Michael
>
>
>
>
>
> On Fri, Mar 24, 2017 at 1:25 PM, Jon Yeargers 
> wrote:
>
> > If I understand this correctly: assuming I have a simple aggregator
> > distributed across n-docker instances each instance will _also_ need to
> > support some sort of communications process for allowing access to its
> > statestore (last param from KStream.groupby.aggregate).
> >
> > How would one go about substituting a separated db (EG redis) for the
> > statestore?
> >
> > Some advantages to decoupling:
> > - It would seem like having a centralized process like this would
> alleviate
> > the need to execute multiple requests for a given kv pair (IE "who has
> this
> > data?" and subsequent requests to retrieve it).
> > - it would take some pressure off of each node to maintain a large disk
> > store
> > - Typical microservices

Re: APPLICATION_SERVER_CONFIG ?

2017-03-24 Thread Jon Yeargers
If I understand this correctly: assuming I have a simple aggregator
distributed across n-docker instances each instance will _also_ need to
support some sort of communications process for allowing access to its
statestore (last param from KStream.groupby.aggregate).

How would one go about substituting a separated db (EG redis) for the
statestore?

Some advantages to decoupling:
- It would seem like having a centralized process like this would alleviate
the need to execute multiple requests for a given kv pair (IE "who has this
data?" and subsequent requests to retrieve it).
- it would take some pressure off of each node to maintain a large disk
store
- Typical microservices would separate storing / retrieving data
- It would raise some eyebrows if a spec called for a mysql/nosql instance
to be installed with every docker container
- The tombstoning facilities of redis or C* would lend themselves well to
implementing a 'true' rolling aggregation

I get that RocksDB has a small footprint but given the choice of
implementing my own RPC / gossip-like process for data sharing and using a
well tested one (à la C* or redis) I would almost always opt for the latter.
(Footnote: Our implementations already heavily use redis/memcached for
deduplication of kafka messages so it would seem a small step to use the
same to store aggregation results.)

Just my $0.02. I would love to hear why I'm missing the 'big picture'. The
kstreams architecture seems rife with potential.

On Thu, Mar 23, 2017 at 3:17 PM, Matthias J. Sax 
wrote:

> The config does not "do" anything. It's metadata that gets broadcast
> to other Streams instances for the IQ feature.
>
> See this blog post for more details:
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>
> Happy to answer any follow up question.
>
>
> -Matthias
>
> On 3/23/17 11:51 AM, Jon Yeargers wrote:
> > What does this config param do?
> >
> > I see it referenced / used in some samples and here (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> > )
> >
>
>


APPLICATION_SERVER_CONFIG ?

2017-03-23 Thread Jon Yeargers
What does this config param do?

I see it referenced / used in some samples and here (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
)
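
For readers landing on this question: per the replies in this thread, the setting only advertises a host:port endpoint to the other Streams instances. Below is a minimal sketch, assuming plain `java.util.Properties` and a made-up endpoint. `AppServerConfig`, `withApplicationServer` and `parseEndpoint` are hypothetical helper names, not Kafka APIs; `application.server` is the property key behind `StreamsConfig.APPLICATION_SERVER_CONFIG`.

```java
import java.util.Properties;

final class AppServerConfig {

    /** Builds a config fragment that advertises this instance's own endpoint. */
    static Properties withApplicationServer(String host, int port) {
        Properties props = new Properties();
        // Key behind StreamsConfig.APPLICATION_SERVER_CONFIG; the value is "host:port".
        props.put("application.server", host + ":" + port);
        return props;
    }

    /** Splits an advertised "host:port" value back into its two parts. */
    static String[] parseEndpoint(String hostPort) {
        int idx = hostPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got " + hostPort);
        }
        return new String[] { hostPort.substring(0, idx), hostPort.substring(idx + 1) };
    }

    public static void main(String[] args) {
        // Hypothetical endpoint, purely for illustration.
        Properties props = withApplicationServer("10.0.0.5", 7070);
        String[] parts = parseEndpoint(props.getProperty("application.server"));
        System.out.println("host=" + parts[0] + " port=" + parts[1]);
    }
}
```

The application still has to run its own server (HTTP, RPC, ...) on that endpoint; the config itself does not start one.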


Getting current value of aggregated key

2017-03-23 Thread Jon Yeargers
If I have an aggregation:

KTable, VideoLogLine> outTable =
sourceStream.groupByKey().reduce(rowReducer,
TimeWindows.of(60 * 60 * 1000L).until(10 * 60 * 1000L),
"HourAggStore");

How would I go about getting some value from this with a separate process?
I have the "HourAggStore" but I'm not clear how to retrieve anything from it.


Re: clearing an aggregation?

2017-03-22 Thread Jon Yeargers
I get that the windows are aligned along seconds but this doesn't really
help with true clock alignment (IE top of the hour, midnight, etc).

I can imagine a strategy using overlapping windows. One would
(hypothetically) walk through the list until a window that spanned the
desired time was found.

Since apparently there isn't a way to iterate through Windowed KTables I'm
guessing that this sort of 'aggregate and clear' approach still requires an
external datastore (like Redis). Please correct me if I'm wrong.
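
To make the epoch alignment quoted below concrete, here is a minimal plain-Java sketch (no Kafka dependency) of how epoch-aligned window start times can be derived from a record timestamp. The names `EpochAlignedWindows` and `windowStartsFor` are hypothetical, and this only mirrors the documented behaviour; it is not the Streams implementation. One consequence worth noting: with a size and advance of one hour, the boundaries fall on the top of each UTC hour no matter when the app was started.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

final class EpochAlignedWindows {

    /** Start times (ms since epoch) of every window [start, start + sizeMs) that contains timestampMs. */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        if (timestampMs < 0 || sizeMs <= 0 || advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException("need timestamp >= 0 and 0 < advance <= size");
        }
        List<Long> starts = new ArrayList<>();
        // Smallest multiple of advanceMs that is strictly greater than (timestamp - size),
        // clamped at zero because the first window starts at timestamp zero.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Hopping windows of 5000ms advancing by 3000ms, as in the quoted docs.
        System.out.println(windowStartsFor(4000L, 5000L, 3000L));

        // Hourly tumbling window for a record timestamped 12:51 UTC.
        long hour = 60 * 60 * 1000L;
        long ts = Instant.parse("2017-03-20T12:51:00Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStartsFor(ts, hour, hour).get(0)));
    }
}
```

Daily windows computed this way align to UTC midnight, so a local-time "midnight" boundary would still need an offset handled by the application.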

On Mon, Mar 20, 2017 at 9:29 AM, Michael Noll  wrote:

> Jon,
>
> the windowing operation of Kafka's Streams API (in its DSL) aligns
> time-based windows to the epoch [1]:
>
> Quoting from e.g. hopping windows (sometimes called sliding windows in
> other technologies):
>
> > Hopping time windows are aligned to the epoch, with the lower interval
> bound
> > being inclusive and the upper bound being exclusive. “Aligned to the
> epoch”
> > means that the first window starts at timestamp zero.
> > For example, hopping windows with a size of 5000ms and an advance
> interval
> > (“hop”) of 3000ms have predictable window boundaries
> `[0;5000),[3000;8000),...`
> > — and not `[1000;6000),[4000;9000),...` or even something “random” like
> > `[1452;6452),[4452;9452),...`.
>
> Would that help you?
>
> -Michael
>
>
>
> [1] http://docs.confluent.io/current/streams/developer-guide.html
>
>
> On Mon, Mar 20, 2017 at 12:51 PM, Jon Yeargers 
> wrote:
>
> > Is this possible? Im wondering about gathering data from a stream into a
> > series of windowed aggregators: minute, hour and day. A separate process
> > would start at fixed intervals, query the appropriate state store for
> > available values and then hopefully clear / zero / reset everything for
> the
> > next interval.
> >
> > I could use the retention period setting but I would (somehow) need to
> > guarantee that the windows would reset on clock boundaries and not based
> on
> > start time for the app.
> >
>


clearing an aggregation?

2017-03-20 Thread Jon Yeargers
Is this possible? I'm wondering about gathering data from a stream into a
series of windowed aggregators: minute, hour and day. A separate process
would start at fixed intervals, query the appropriate state store for
available values and then hopefully clear / zero / reset everything for the
next interval.

I could use the retention period setting but I would (somehow) need to
guarantee that the windows would reset on clock boundaries and not based on
start time for the app.


understanding consumer rebalance trigger(s)

2017-03-02 Thread Jon Yeargers
I'm wondering what conditions trigger a consumer rebalance. I
have a topic that handles roughly 50K / minute across 6 partitions. Each is
serviced by a separate dockerized consumer.

Roughly every 8-12 min this goes into a rebalance that may take up to a
minute. When it returns it often puts some or all partitions on to a single
consumer (leaving others idle). This may persist for a minute while it
tries another arrangement. Eventually after 2-3 tries it will evenly
distribute the partitions.. for a few minutes until it does another
misguided attempt. As a result we have lag increasing from 0 to ~450K and
back to 0 on a cycle.

The data rate is assumed to be roughly consistent through these cycles.

Resultant graph of lag is a sawtooth shape.

Using 0.10.0.1
3 brokers


Also - is there some way to set / control consumer 'assignment'? Or to
'suggest' a setting?


Re: hitting the throughput limit on a cluster?

2017-02-21 Thread Jon Yeargers
Thanks for looking at this issue. I checked the max IOPS for this disk and
we're only at about 10%. I can add more disks to spread out the work.

What IOWait values should I be aiming for?

Also - what do you set openfiles to? I have it at 65535 but I just read a
doc that suggested > 100K is better.


On Tue, Feb 21, 2017 at 10:45 AM, Todd Palino  wrote:

> So I think the important thing to look at here is the IO wait on your
> system. You’re hitting disk throughput issues, and that’s what you most
> likely need to resolve. So just from what you’ve described, I think the
> only thing that is going to get you more performance is more spindles (or
> faster spindles). This is either more disks or more brokers, but at the end
> of it you need to eliminate the disk IO bottleneck.
>
> -Todd
>
>
> On Tue, Feb 21, 2017 at 7:29 AM, Jon Yeargers 
> wrote:
>
> > Running 3x 8core on google compute.
> >
> > Topic has 16 partitions (replication factor 2) and is consumed by 16
> docker
> > containers on individual hosts.
> >
> > System seems to max out at around 4 messages / minute. Each message
> is
> > ~12K - compressed (snappy) JSON.
> >
> > Recently moved from 12 to the above 16 partitions with no change in
> > throughput.
> >
> > Also tried increased the consumption capacity on each container by 50%.
> No
> > effect.
> >
> > Network is running at ~6Gb/sec (measured using iperf3). Broker load is
> > ~1.5. IOWait % is 5-10 (via sar).
> >
> > What are my options for adding throughput?
> >
> > - more brokers?
> > - avro/protobuf messaging?
> > - more disks / broker? (1 / host presently)
> > - jumbo frames?
> >
> > (transparent huge pages is disabled)
> >
> >
> > Looking at this article (
> > https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines)
> > it would appear that for our message size we are at the max. This would
> > argue that we need to shrink the message size - so perhaps switching to
> > avro is the next step?
> >
>
>
>
> --
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>


hitting the throughput limit on a cluster?

2017-02-21 Thread Jon Yeargers
Running 3x 8core on google compute.

Topic has 16 partitions (replication factor 2) and is consumed by 16 docker
containers on individual hosts.

System seems to max out at around 4 messages / minute. Each message is
~12K - compressed (snappy) JSON.

Recently moved from 12 to the above 16 partitions with no change in
throughput.

Also tried increasing the consumption capacity on each container by 50%. No
effect.

Network is running at ~6Gb/sec (measured using iperf3). Broker load is
~1.5. IOWait % is 5-10 (via sar).

What are my options for adding throughput?

- more brokers?
- avro/protobuf messaging?
- more disks / broker? (1 / host presently)
- jumbo frames?

(transparent huge pages is disabled)


Looking at this article (
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines)
it would appear that for our message size we are at the max. This would
argue that we need to shrink the message size - so perhaps switching to
avro is the next step?
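
A quick back-of-the-envelope check can help decide whether message size or the network is the limit here. The sketch below is plain Java with hypothetical names (`ThroughputEstimate`, `bytesPerSecond`, `linkUtilisation`), and the message rate in `main` is a placeholder, not the figure from this thread; only the ~12K message size and the ~6Gb/sec link speed come from the post.

```java
final class ThroughputEstimate {

    /** Average payload rate in bytes per second for a given message rate and size. */
    static double bytesPerSecond(long messagesPerMinute, long bytesPerMessage) {
        return messagesPerMinute * (double) bytesPerMessage / 60.0;
    }

    /** Fraction of a link (capacity given in bits per second) that the payload rate would occupy. */
    static double linkUtilisation(double bytesPerSecond, double linkBitsPerSecond) {
        return bytesPerSecond * 8.0 / linkBitsPerSecond;
    }

    public static void main(String[] args) {
        long placeholderRate = 60_000L;   // hypothetical messages per minute, NOT from the thread
        long messageSize = 12L * 1024L;   // ~12K per message, as stated in the post
        double bps = bytesPerSecond(placeholderRate, messageSize);
        System.out.printf("%.1f MB/s, %.4f of a 6 Gb/s link%n",
                bps / 1e6, linkUtilisation(bps, 6e9));
    }
}
```

With a replication factor of 2 the brokers write each message twice, so the disk-side volume is roughly double the producer-side payload rate.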


conflicts between consumer groups? seeing many duplicate records

2017-02-17 Thread Jon Yeargers
Is it possible that using the same group name for two topics could cause a
conflict?

I have a situation where I'm seeing vast numbers of records (more than 2x)
get duplicated in a topic. I was looking at consumer lag using
'kafka-consumer-groups ... --new-consumer' and noticed that I had another
app using the same group name for a different (much smaller) topic.

Would increasing partitions have an effect like this?

Added 33% more partitions to a topic yesterday and all sorts of havoc
occurred.

using 0.10.1.0


Re: KTable and cleanup.policy=compact

2017-02-13 Thread Jon Yeargers
If I'm doing a KStream.leftJoin(KTable) how would I set this configuration
for just the KTable portion?

IE I have

KStream = KStreamBuilder.stream()
KTable = KStreamBuilder.table()

...
(join occurs.. data flows.. ppl are brought closer together.. there is
peace in the valley.. for me... )
...

KafkaStreams = new KafkaStreams(KStreamBuilder,
config_with_cleanup_policy_or_not?)
KafkaStreams.start()

On Wed, Feb 8, 2017 at 12:30 PM, Eno Thereska 
wrote:

> Yeah makes sense. I was looking at it from the point of view of keeping
> all data forever.
>
> Eno
>
> > On 8 Feb 2017, at 20:27, Matthias J. Sax  wrote:
> >
> > Yes, that could happen if a key was not updated for a longer period than
> > topic retention time.
> >
> > If you want to force a changelog creation, you can do a dummy aggregate
> > instead of using KStreamBuilder#table()
> >
> >
> >> KTable table = KStreamBuilder.stream("topic").groupByKey().reduce(new
> Reducer() {
> >>@Override
> >>public Object apply(Object oldValue, Object newValue) {
> >>return newValue;
> >>}
> >> }, "someStoreName");
> >
> >
> > -Matthias
> >
> >
> > On 2/8/17 11:39 AM, Mathieu Fenniak wrote:
> >> I think there could be correctness implications... the default
> >> cleanup.policy of delete would mean that topic entries past the
> retention
> >> policy might have been removed.  If you scale up the application, new
> >> application instances won't be able to restore a complete table into its
> >> local state store.  An operation like a join against that KTable would
> find
> >> no records where there should be record.
> >>
> >> Mathieu
> >>
> >>
> >> On Wed, Feb 8, 2017 at 12:15 PM, Eno Thereska 
> >> wrote:
> >>
> >>> If you fail to set the policy to compact, there shouldn't be any
> >>> correctness implications, however your topics will grow larger than
> >>> necessary.
> >>>
> >>> Eno
> >>>
> >>>> On 8 Feb 2017, at 18:56, Jon Yeargers 
> wrote:
> >>>>
> >>>> What are the ramifications of failing to do this?
> >>>>
> >>>> On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax <
> matth...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Yes, that is correct.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 2/7/17 6:39 PM, Mathieu Fenniak wrote:
> >>>>>> Hey kafka users,
> >>>>>>
> >>>>>> Is it correct that a Kafka topic that is used for a KTable should be
> >>> set
> >>>>> to
> >>>>>> cleanup.policy=compact?
> >>>>>>
> >>>>>> I've never noticed until today that the KStreamBuilder#table()
> >>>>>> documentation says: "However, no internal changelog topic is created
> >>>>> since
> >>>>>> the original input topic can be used for recovery"... [1], which
> seems
> >>>>> like
> >>>>>> it is only true if the topic is configured for compaction.
> Otherwise
> >>> the
> >>>>>> original input topic won't necessarily contain the data necessary
> for
> >>>>>> recovery of the state store.
> >>>>>>
> >>>>>> [1]
> >>>>>> https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf079517523c83060/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java#L355
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Mathieu
> >>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
> >>
> >
>
>


Re: KTable and cleanup.policy=compact

2017-02-08 Thread Jon Yeargers
What are the ramifications of failing to do this?

On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax 
wrote:

> Yes, that is correct.
>
>
> -Matthias
>
>
> On 2/7/17 6:39 PM, Mathieu Fenniak wrote:
> > Hey kafka users,
> >
> > Is it correct that a Kafka topic that is used for a KTable should be set
> to
> > cleanup.policy=compact?
> >
> > I've never noticed until today that the KStreamBuilder#table()
> > documentation says: "However, no internal changelog topic is created
> since
> > the original input topic can be used for recovery"... [1], which seems
> like
> > it is only true if the topic is configured for compaction.  Otherwise the
> > original input topic won't necessarily contain the data necessary for
> > recovery of the state store.
> >
> > [1]
> > https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf079517523c83060/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java#L355
> >
> > Thanks,
> >
> > Mathieu
> >
>
>


"auto offset commit failed"

2017-02-06 Thread Jon Yeargers
This message seems to come and go for various consumers:

WARN   o.a.k.c.c.i.ConsumerCoordinator - Auto offset commit failed for
group : Commit offsets failed with retriable exception. You should
retry committing offsets.

Since I'm not tracking offsets - how would I go about retrying them?


Re: Understanding output of KTable->KTable join

2017-01-31 Thread Jon Yeargers
If I do a KStream.leftJoin(KTable), this article (
https://www.confluent.io/blog/distributed-real-time-joins-and-aggregations-on-user-activity-events-using-kafka-streams/)
seems to suggest that I could have one:many.   (ktable:kstream)

Accurate?
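
To illustrate why the stream side can carry the "many" while a table cannot, here is a plain-Java model (deliberately not the Kafka Streams API). A table keeps only the latest value per key, so repeated keys collapse; a stream-table left join emits one result per stream record and looks up the table by key. `JoinModel`, `toTable` and `leftJoin` are hypothetical names, and the sample data is made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class JoinModel {

    /** Table semantics: a later record with the same key overwrites the earlier one. */
    static Map<String, String> toTable(List<String[]> records) {
        Map<String, String> table = new HashMap<>();
        for (String[] kv : records) {
            table.put(kv[0], kv[1]);
        }
        return table;
    }

    /** Stream-table left join: one output per stream record; the table side may be null. */
    static List<String> leftJoin(List<String[]> stream, Map<String, String> table) {
        List<String> out = new ArrayList<>();
        for (String[] kv : stream) {
            out.add(kv[0] + ": " + kv[1] + " + " + table.get(kv[0]));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> many = new ArrayList<>();
        many.add(new String[] {"a", "event1"});
        many.add(new String[] {"a", "event2"});
        many.add(new String[] {"b", "event3"});

        Map<String, String> one = new HashMap<>();
        one.put("a", "profileA");

        // Read as a table, the three records collapse to one entry per key.
        System.out.println("table entries: " + toTable(many).size());
        // Read as a stream, every record is joined against the table individually.
        leftJoin(many, one).forEach(System.out::println);
    }
}
```

In this model the unmatched key "b" shows up with a null table side, which is the left-join analogue of the ", null" rows discussed in this thread.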

On Mon, Jan 30, 2017 at 4:35 PM, Matthias J. Sax 
wrote:

> If you join two KTables, one-to-many join is currently not supported
> (only one-to-one, ie, primary key join).
>
> In upcoming 0.10.2 there will be global-KTables that allow something
> similar to one-to many joins -- however, only for KStream-GlobalKTable
> joins, so not sure if this can help you.
>
> About : yes, it indicates that there was no join computed,
> because no matching key was found. Cf.
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics
>
> Not sure what your keys are, the output you shared is hard to read...
> (eg., 20bebc12136be4226b29c5d1b6183d8ed2b117c5)
>
>
> We might add one-to-many KTable-GlobalKTable joins in 0.10.3 though. For
> now, you would need to build a custom Processor and implement the join
> by yourself.
>
> There is another JIRA for foreign-key join feature (unrelated to
> GlobalKTable): https://issues.apache.org/jira/browse/KAFKA-3705
>
> Maybe the discussion helps you to implement your own join.
>
>
> -Matthias
>
> On 1/30/17 11:05 AM, Jon Yeargers wrote:
> > I want to do a one:many join between two streams. There should be ~ 1:100
> > with < 1% having no match.
> >
> > My topology is relatively simple:
> >
> > KTable1.join(KTable2)->to("other topic")
> >\
> > \---> toStream().print()
> >
> > In the join it takes both Value1 and Value2 as JSON, converts them back
> to
> > Java Objects and combines them. This is returned as the JSON
> representation
> > of a new Object.
> >
> > If either value was NULL or unable to convert back to its source Object
> an
> > exception would be thrown.
> >
> > The output sent to the debugger looks like this for many thousands of
> rows
> >
> > [KTABLE-TOSTREAM-09]: 20bebc12136be4226b29c5d1b6183d8ed2b117c5 ,
> > null
> > [KTABLE-TOSTREAM-09]: c6f038b5182b8a2409a52be71f171d54e3b4 ,
> > null
> > [KTABLE-TOSTREAM-09]: f4b0aa0516c37c2725ce409cc5766df9a942950f ,
> > null
> > [KTABLE-TOSTREAM-09]: e7d8912ac1b660d21d1dd94955386fb9561abbab ,
> > null
> >
> > Then I will get many more that are matched.
> >
> > Questions:
> >
> > 1. Im assuming the ",null" indicates no match was found. This is a
> problem.
> > The source of the data is well understood and is < 1% unmatched. If
> either
> > object is null it throws an exception - which is doesn't.
> > 2. Is this the appropriate way to do a one:many join?
> >
>
>


Understanding output of KTable->KTable join

2017-01-30 Thread Jon Yeargers
I want to do a one:many join between two streams. There should be ~ 1:100
with < 1% having no match.

My topology is relatively simple:

KTable1.join(KTable2)->to("other topic")
       \
        \---> toStream().print()

In the join it takes both Value1 and Value2 as JSON, converts them back to
Java Objects and combines them. This is returned as the JSON representation
of a new Object.

If either value was NULL or unable to convert back to its source Object an
exception would be thrown.

The output sent to the debugger looks like this for many thousands of rows

[KTABLE-TOSTREAM-09]: 20bebc12136be4226b29c5d1b6183d8ed2b117c5 , null
[KTABLE-TOSTREAM-09]: c6f038b5182b8a2409a52be71f171d54e3b4 , null
[KTABLE-TOSTREAM-09]: f4b0aa0516c37c2725ce409cc5766df9a942950f , null
[KTABLE-TOSTREAM-09]: e7d8912ac1b660d21d1dd94955386fb9561abbab , null

Then I will get many more that are matched.

Questions:

1. I'm assuming the ",null" indicates no match was found. This is a problem.
The source of the data is well understood and is < 1% unmatched. If either
object is null it throws an exception - which it doesn't.
2. Is this the appropriate way to do a one:many join?


Re: Strategy for true random producer keying

2017-01-24 Thread Jon Yeargers
(cont'd) I meant to say System.currentTimeMillis() % partition count.

Having said that - is there any disadvantage to true random distribution of
traffic for a topic?
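
For comparison with the currentTimeMillis idea, here is a minimal plain-Java sketch of two ways to pick a partition number explicitly. `PartitionPicker` and its methods are hypothetical names. The assumption is that the chosen number would then be handed to the `ProducerRecord` constructor that accepts a partition ID, with the partition count taken from the producer's `partitionsFor` method, as Avi describes in the quoted reply.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

final class PartitionPicker {
    private final int partitionCount;
    private final AtomicInteger counter = new AtomicInteger();

    PartitionPicker(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        this.partitionCount = partitionCount;
    }

    /** Uniformly random partition in [0, partitionCount). */
    int random() {
        return ThreadLocalRandom.current().nextInt(partitionCount);
    }

    /** Round-robin partition; floorMod keeps the result non-negative after int overflow. */
    int roundRobin() {
        return Math.floorMod(counter.getAndIncrement(), partitionCount);
    }

    public static void main(String[] args) {
        PartitionPicker picker = new PartitionPicker(6);
        int[] hits = new int[6];
        for (int i = 0; i < 60_000; i++) {
            hits[picker.random()]++;
        }
        // Counts per partition; they should be roughly equal.
        System.out.println(Arrays.toString(hits));
    }
}
```

Compared with a timestamp modulo, both variants avoid sending every record produced within the same millisecond to the same partition. Round-robin gives the most even spread; the main cost of either approach is losing per-key ordering, which does not matter if the consumers ignore keys.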

On Tue, Jan 24, 2017 at 11:17 AM, Jon Yeargers 
wrote:

> It may be picking a random partition but it sticks with it indefinitely
> despite there being a significant disparity in traffic. I need to break it
> up in some different fashion. Maybe just a hash of
> System.currentTimeMillis()?
>
>
>
> On Tue, Jan 24, 2017 at 10:52 AM, Avi Flax 
> wrote:
>
>>
>> > On Jan 24, 2017, at 11:18, Jon Yeargers 
>> wrote:
>> >
>> > If I don't specify a key when I call send a value to kafka (something
>> akin
>> > to 'kafkaProducer.send(new ProducerRecord<>(TOPIC_PRODUCE,
>> jsonView))') how
>> > is it keyed?
>>
>> IIRC, in this case the key is null; i.e. there is no key.
>>
>> > I am producing to a topic from an external feed. It appears to be
>> heavily
>> > biased towards certain values and as a result I have 2-3 partitions that
>> > are lagging heavily where the rest are staying current.
>>
>> Hmm, according to the docs this shouldn’t matter:
>>
>> > If the key is null, then a random broker partition is picked.
>>
>> https://kafka.apache.org/documentation/#impl_producer
>>
>> You might want to double-check your code and confirm that it is indeed
>> sending no keys… i.e. maybe it’s actually using an empty string as a key,
>> or something like that.
>>
>> > Since I don't use
>> > the keys in my consumers Im wondering if I could randomize these values
>> > somehow to better distribute the load.
>>
>> As per the above docs, this _should_ already be the case, based on what
>> you’ve described.
>>
>> That said, if you continue to have trouble, then you can introduce your
>> own implementation of kafka.producer.Partitioner, and again as per the docs:
>>
>> > A custom partitioning strategy can also be plugged in using the
>> partitioner.class config parameter.
>>
>> Also, it so happens that I have implemented a custom random partitioning
>> strategy through an alternate approach by using the overloaded
>> ProducerRecord constructor that accepts a partition ID. You can easily get
>> the set of partition IDs from the Producer with the partitionsFor method.
>>
>> HTH!
>> Avi
>>
>> 
>> Software Architect @ Park Assist » http://tech.parkassist.com/
>
>
>


Re: Strategy for true random producer keying

2017-01-24 Thread Jon Yeargers
It may be picking a random partition but it sticks with it indefinitely
despite there being a significant disparity in traffic. I need to break it
up in some different fashion. Maybe just a hash of
System.currentTimeMillis()?



On Tue, Jan 24, 2017 at 10:52 AM, Avi Flax  wrote:

>
> > On Jan 24, 2017, at 11:18, Jon Yeargers 
> wrote:
> >
> > If I don't specify a key when I call send a value to kafka (something
> akin
> > to 'kafkaProducer.send(new ProducerRecord<>(TOPIC_PRODUCE, jsonView))')
> how
> > is it keyed?
>
> IIRC, in this case the key is null; i.e. there is no key.
>
> > I am producing to a topic from an external feed. It appears to be heavily
> > biased towards certain values and as a result I have 2-3 partitions that
> > are lagging heavily where the rest are staying current.
>
> Hmm, according to the docs this shouldn’t matter:
>
> > If the key is null, then a random broker partition is picked.
>
> https://kafka.apache.org/documentation/#impl_producer
>
> You might want to double-check your code and confirm that it is indeed
> sending no keys… i.e. maybe it’s actually using an empty string as a key,
> or something like that.
>
> > Since I don't use
> > the keys in my consumers Im wondering if I could randomize these values
> > somehow to better distribute the load.
>
> As per the above docs, this _should_ already be the case, based on what
> you’ve described.
>
> That said, if you continue to have trouble, then you can introduce your
> own implementation of kafka.producer.Partitioner, and again as per the docs:
>
> > A custom partitioning strategy can also be plugged in using the
> partitioner.class config parameter.
>
> Also, it so happens that I have implemented a custom random partitioning
> strategy through an alternate approach by using the overloaded
> ProducerRecord constructor that accepts a partition ID. You can easily get
> the set of partition IDs from the Producer with the partitionsFor method.
>
> HTH!
> Avi
>
> 
> Software Architect @ Park Assist » http://tech.parkassist.com/


Strategy for true random producer keying

2017-01-24 Thread Jon Yeargers
If I don't specify a key when I send a value to Kafka (something akin
to 'kafkaProducer.send(new ProducerRecord<>(TOPIC_PRODUCE, jsonView))') how
is it keyed?

I am producing to a topic from an external feed. It appears to be heavily
biased towards certain values and as a result I have 2-3 partitions that
are lagging heavily where the rest are staying current. Since I don't use
the keys in my consumers I'm wondering if I could randomize these values
somehow to better distribute the load.


Re: Messages are lost

2017-01-24 Thread Jon Yeargers
Make sure you don't have an orphaned process holding onto the various
kafka/zk folders. If it won't respond and you can't kill it then this might
have happened.

On Tue, Jan 24, 2017 at 6:46 AM, Ghosh, Achintya (Contractor) <
achintya_gh...@comcast.com> wrote:

> Can anyone please answer this?
>
> Thanks
> Achintya
>
> -Original Message-
> From: Ghosh, Achintya (Contractor) [mailto:achintya_gh...@comcast.com]
> Sent: Monday, January 23, 2017 1:51 PM
> To: users@kafka.apache.org
> Subject: RE: Messages are lost
>
> Version 0.10 and I don’t have the thread dump but have the KafkaServer log
> where the error is there.
>
> Thanks
> Achintya
>
> -Original Message-
> From: Apurva Mehta [mailto:apu...@confluent.io]
> Sent: Monday, January 23, 2017 12:49 PM
> To: users@kafka.apache.org
> Subject: Re: Messages are lost
>
> What version of kafka have you deployed? Can you post a thread dump of the
> hung broker?
>
> On Fri, Jan 20, 2017 at 12:14 PM, Ghosh, Achintya (Contractor) <
> achintya_gh...@comcast.com> wrote:
>
> > Hi there,
> >
> > I see the exception below in the log of one of my nodes (a cluster with 3
> > nodes), and then the node stopped responding (it's in a hung state; I
> > mean if I run
> > ps -ef | grep kafka, I see the Kafka process but it is not responding)
> > and we lost around 100 messages:
> >
> >
> > 1.   What could be the reason for this exception ? My broker ID is
> > unique so what is the solution for this issue?
> >
> > [2017-01-19 15:56:23,644] ERROR Error handling event ZkEvent[New
> > session event sent to
> > kafka.server.KafkaHealthcheck$SessionExpireListener@2d74e7af]
> > (org.I0Itec.zkclient.ZkEventThread)
> > java.lang.RuntimeException: A broker is already registered on the path
> > /brokers/ids/2. This probably indicates that you either have
> > configured a brokerid that is already in use, or else you have
> > shutdown this broker and restarted it faster than the zookeeper
> > timeout so it appears to be re-registering.
> > at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.
> > scala:305)
> > at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.
> > scala:291)
> > at kafka.server.KafkaHealthcheck.
> > register(KafkaHealthcheck.scala:70)
> > at kafka.server.KafkaHealthcheck$SessionExpireListener.
> > handleNewSession(KafkaHealthcheck.scala:104)
> > at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> > at org.I0Itec.zkclient.ZkEventThread.run(
> > ZkEventThread.java:71)
> >
> >
> >
> > 2.   We lost 100 messages for each topic, and I don't see any
> > exception in our application log, so how can we track down the exception
> > and make sure we don't lose any data (consumer end)?
> >
> > Thanks
> > Achintya
> >
>


Re: Consumer not associating? Perpetual rebalance

2017-01-10 Thread Jon Yeargers
FWIW - (for some distant observer):

I think my topic / consumer was too slow for the default commit interval. I
added these lines to the above config and it seems to be working ok:

// These are likely the defaults, but I'm adding them anyway...
   consumerProperties.put("enable.auto.commit", "true");
   consumerProperties.put("auto.commit.interval.ms", "1000");

// this is the critical line (I think)
   consumerProperties.put("max.poll.records", "10");
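
For anyone who wants the settings above in one place, here is a minimal sketch using only java.util.Properties. The class name, the build() helper, and the "localhost:9092" / "example-group" values are placeholders of mine, not from the original app; only the three config keys and their values come from the lines above.

```java
import java.util.Properties;

class ConsumerConfigSketch {
    // Hypothetical helper: collects the consumer settings discussed above.
    // bootstrapServers and groupId are supplied by the caller (placeholders here).
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Commit offsets automatically, once per second.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Cap the number of records returned by a single poll() so a slow
        // consumer gets back to poll() sooner.
        props.put("max.poll.records", "10");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "example-group");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object would be passed to the KafkaConsumer constructor together with the deserializer settings from the original config.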


On Tue, Jan 10, 2017 at 8:55 AM, Jon Yeargers 
wrote:

> Single app with single consumer. Pulling ~30 records / min.
>
> When I enter 'kafka-topics ... --new-consumer --group 
> --describe' it always tells me "Consumer group  is rebalancing".
>
> If I enter "kafka-consumer-offset-checker ...--topic  --group
> "it responds with appropriate consumer position(s) but tells
> me "owner" is "none".
>
> I know my app is consuming records and if I stop/start it it picks up
> where it left off.
>
> Why is it marked as 'rebalancing'?
>
>
>
> Consumer setup:
>
>
> props.put("bootstrap.servers", Main.KAFKA_IP);
> props.put("group.id", groupId);
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("value.deserializer", StringDeserializer.class.
> getName());
> props.put("session.timeout.ms", "6");
> props.put("request.timeout.ms", "60001");
>
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>
> Using 0.10.1.0 brokers
> 0.10.2.0-SNAPSHOT (current trunk) as client library
>


Consumer not associating? Perpetual rebalance

2017-01-10 Thread Jon Yeargers
Single app with single consumer. Pulling ~30 records / min.

When I enter 'kafka-topics ... --new-consumer --group 
--describe' it always tells me "Consumer group  is rebalancing".

If I enter "kafka-consumer-offset-checker ...--topic  --group "it responds with appropriate consumer position(s) but tells me
"owner" is "none".

I know my app is consuming records and if I stop/start it it picks up where
it left off.

Why is it marked as 'rebalancing'?



Consumer setup:


props.put("bootstrap.servers", Main.KAFKA_IP);
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("session.timeout.ms", "6");
props.put("request.timeout.ms", "60001");

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Using 0.10.1.0 brokers
0.10.2.0-SNAPSHOT (current trunk) as client library


Re: What to do when it won't rebalance "properly"

2017-01-07 Thread Jon Yeargers
Along these same lines - I have a topic with a single consumer. When I try
to look at lag ("kafka-topics .. --group ... --describe") I get the message
"Consumer group  is rebalancing"

This continues in perpetuity despite stopping and restarting the consumer
and cycling all (3) brokers.

On Sat, Jan 7, 2017 at 7:48 AM, Jon Yeargers 
wrote:

> Have been messing about with Kubernetes on google-cloud. Launched a pod
> with 6 consumer nodes and watched the lag using 'kafka-topics ..
> --new-consumer --describe'. Topic has assigned all (12 in this case) nodes
> to the same consumer while the other 5 are sitting idle.
>
> This has been the case for ~20 minutes now. No apparent change forthcoming.
>
> I even tried killing the one consumer to see if it would trigger a
> "proper" rebalance. Nope. It went back and assigned everyone to another
> (single) consumer.
>
> Using 0.10.2.0-SNAPSHOT - latest from trunk
>
> (also tried with 0.10.1.0)
>
>


What to do when it won't rebalance "properly"

2017-01-07 Thread Jon Yeargers
Have been messing about with Kubernetes on google-cloud. Launched a pod
with 6 consumer nodes and watched the lag using 'kafka-topics ..
--new-consumer --describe'. Topic has assigned all (12 in this case) nodes
to the same consumer while the other 5 are sitting idle.

This has been the case for ~20 minutes now. No apparent change forthcoming.

I even tried killing the one consumer to see if it would trigger a "proper"
rebalance. Nope. It went back and assigned everyone to another (single)
consumer.

Using 0.10.2.0-SNAPSHOT - latest from trunk

(also tried with 0.10.1.0)
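
To make the expectation concrete, here is a rough sketch of the distribution I'd consider "proper", assuming the 12 items above are partitions and the 6 consumers share one group. The assign() helper is purely illustrative round-robin arithmetic of my own; it is not Kafka's assignor code.

```java
import java.util.ArrayList;
import java.util.List;

class EvenAssignmentSketch {
    // Hypothetical helper: spreads partitions 0..numPartitions-1 across
    // numConsumers in round-robin order.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(p % numConsumers).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> assignment = assign(12, 6);
        for (int c = 0; c < assignment.size(); c++) {
            System.out.println("consumer-" + c + " -> " + assignment.get(c));
        }
    }
}
```

With 12 partitions and 6 consumers this gives each consumer 2 partitions, which is roughly what I expected to see instead of one consumer owning everything.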


0.10.2.0-SNAPSHOT - rocksdb exception(s)

2017-01-01 Thread Jon Yeargers
2017-01-01 18:19:13,206 [StreamThread-1] ERROR
o.a.k.c.c.i.ConsumerCoordinator - User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
RtDetailBreakoutProcessor failed on partition assignment

org.apache.kafka.streams.errors.ProcessorStateException: Error opening
store table_stream-201701011700 at location
/mnt/RtDetailBreakoutProcessor/RtDetailBreakoutProcessor/0_1/table_stream/table_stream-201701011700

at
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:187)

at
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:156)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:388)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:319)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$000(RocksDBWindowStore.java:51)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore$1.restore(RocksDBWindowStore.java:206)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:238)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)

at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)

at
org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)

at
org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)

at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)

at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)

at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)

at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)

at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)

at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)

at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)

Caused by: org.rocksdb.RocksDBException: IO error: lock
/mnt/RtDetailBreakoutProcessor/RtDetailBreakoutProcessor/0_1/table_stream/table_stream-201701011700/LOCK:
No locks available

at org.rocksdb.RocksDB.open(Native Method)

at org.rocksdb.RocksDB.open(RocksDB.java:184)

at
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:180)


0.10.2.0-SNAPSHOT - "log end offset should not change while restoring"

2017-01-01 Thread Jon Yeargers
java.lang.IllegalStateException: task [0_6] Log end offset of
RtDetailBreakoutProcessor-table_stream-changelog-6 should not change while
restoring: old end offset 26883455, current offset 26883467

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)

at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)

at
org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)

at
org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)

at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)

at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)

at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)

at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)

at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)

at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)

at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)


EOF exceptions - 0.10.2.0-SNAPSHOT

2017-01-01 Thread Jon Yeargers
(I've been testing against the latest from GitHub, as 0.10.1.1 is too buggy)

Seeing quite a few of these this morning:

2017-01-01 16:56:53,299 [StreamThread-1] DEBUG
o.a.kafka.common.network.Selector - Connection with /
disconnected

java.io.EOFException: null

at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)

at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)

at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)

at
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)

at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)

at org.apache.kafka.common.network.Selector.poll(Selector.java:303)

at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:341)

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:235)

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:181)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:309)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)


Re: Interesting error message du jour

2016-12-30 Thread Jon Yeargers
K - so no big deal.

TY

On Fri, Dec 30, 2016 at 12:13 PM, Ewen Cheslack-Postava 
wrote:

> Jon,
>
> This looks the same as https://issues.apache.org/jira/browse/KAFKA-4563,
> although for a different invalid transition. The temporary fix suggested
> there is to simply convert the exception to log a warning, which should be
> a pretty trivial patch against trunk. It seems there are some transitions
> that haven't fully been thought through even if they may actually be valid,
> so patching this for now may be the easiest way to unblock your
> development.
>
> -Ewen
>
> On Fri, Dec 30, 2016 at 9:45 AM, Jon Yeargers 
> wrote:
>
> > Attaching the debug log
> >
> > On Fri, Dec 30, 2016 at 6:39 AM, Jon Yeargers 
> > wrote:
> >
> >> Using 0.10.2.0-snapshot:
> >>
> >> java.lang.IllegalStateException: Incorrect state transition from
> >> ASSIGNING_PARTITIONS to ASSIGNING_PARTITIONS
> >>
> >> at org.apache.kafka.streams.processor.internals.StreamThread.
> >> setState(StreamThread.java:163)
> >>
> >> at org.apache.kafka.streams.processor.internals.StreamThread.se
> >> tStateWhenNotInPendingShutdown(StreamThread.java:175)
> >>
> >> at org.apache.kafka.streams.processor.internals.StreamThread.
> >> access$200(StreamThread.java:71)
> >>
> >> at org.apache.kafka.streams.processor.internals.StreamThread$1.
> >> onPartitionsAssigned(StreamThread.java:234)
> >>
> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
> >> tor.onJoinComplete(ConsumerCoordinator.java:230)
> >>
> >> at org.apache.kafka.clients.consumer.internals.AbstractCoordina
> >> tor.joinGroupIfNeeded(AbstractCoordinator.java:314)
> >>
> >> at org.apache.kafka.clients.consumer.internals.AbstractCoordina
> >> tor.ensureActiveGroup(AbstractCoordinator.java:278)
> >>
> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
> >> tor.poll(ConsumerCoordinator.java:261)
> >>
> >> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(
> >> KafkaConsumer.java:1039)
> >>
> >> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaCo
> >> nsumer.java:1004)
> >>
> >> at org.apache.kafka.streams.processor.internals.StreamThread.
> >> runLoop(StreamThread.java:569)
> >>
> >> at org.apache.kafka.streams.processor.internals.StreamThread.
> >> run(StreamThread.java:358)
> >>
> >
> >
>


Re: Interesting error message du jour

2016-12-30 Thread Jon Yeargers
Attaching the debug log

On Fri, Dec 30, 2016 at 6:39 AM, Jon Yeargers 
wrote:

> Using 0.10.2.0-snapshot:
>
> java.lang.IllegalStateException: Incorrect state transition from
> ASSIGNING_PARTITIONS to ASSIGNING_PARTITIONS
>
> at org.apache.kafka.streams.processor.internals.
> StreamThread.setState(StreamThread.java:163)
>
> at org.apache.kafka.streams.processor.internals.StreamThread.
> setStateWhenNotInPendingShutdown(StreamThread.java:175)
>
> at org.apache.kafka.streams.processor.internals.
> StreamThread.access$200(StreamThread.java:71)
>
> at org.apache.kafka.streams.processor.internals.StreamThread$1.
> onPartitionsAssigned(StreamThread.java:234)
>
> at org.apache.kafka.clients.consumer.internals.
> ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
>
> at org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
>
> at org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
>
> at org.apache.kafka.clients.consumer.internals.
> ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
>
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1039)
>
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:1004)
>
> at org.apache.kafka.streams.processor.internals.
> StreamThread.runLoop(StreamThread.java:569)
>
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:358)
>


errors.log.gz
Description: GNU Zip compressed data


Interesting error message du jour

2016-12-30 Thread Jon Yeargers
Using 0.10.2.0-snapshot:

java.lang.IllegalStateException: Incorrect state transition from
ASSIGNING_PARTITIONS to ASSIGNING_PARTITIONS

at
org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:163)

at
org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:175)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$200(StreamThread.java:71)

at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:569)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:358)


Re: Memory / resource leak in 0.10.1.1 release

2016-12-30 Thread Jon Yeargers
FWIW: I went through and removed all the 'custom' serdes from my code and
replaced them with 'string serdes'. The memory leak problem went away.

The code is a bit more cumbersome now as it's constantly flipping back and
forth between Objects and JSON.. but that seems to be what it takes to keep
it running.

On Thu, Dec 29, 2016 at 9:42 PM, Guozhang Wang  wrote:

> Hello Jon,
>
> It is hard to tell, since I cannot see how your Aggregate() function is
> implemented.
>
> Note that the deserializer of transactionSerde is used in both `aggregate`
> and `KStreamBuilder.stream`, while the serializer of transactionSerde is
> only used in `aggregate`, so if you suspect the transactionSerde is the
> root cause, to narrow it down you can leave the topology as
>
>
> KStream transactionKStream =  kStreamBuilder.stream(
> stringSerde,transactionSerde,TOPIC);
>
> transactionKStream.to(TOPIC-2);
>
> where TOPIC-2 should be pre-created.
>
> The above topology will also trigger both the serializer and deserializer
> of the transactionSerde, and if this topology also leads to memory leak,
> then it means it is not relevant to your aggregate function.
>
>
> Guozhang
>
>
> On Sun, Dec 25, 2016 at 4:15 AM, Jon Yeargers 
> wrote:
>
> > I narrowed this problem down to this part of the topology (and yes, it's
> > 100% repro - for me):
> >
> > KStream transactionKStream =
> >  kStreamBuilder.stream(stringSerde,transactionSerde,TOPIC);
> >
> > KTable, SumRecordCollector> ktAgg =
> > transactionKStream.groupByKey().aggregate(
> > SumRecordCollector::new,
> > new Aggregate(),
> > TimeWindows.of(20 * 60 * 1000L),
> > collectorSerde, "table_stream");
> >
> > Given that this is a pretty trivial, well-traveled piece of Kafka I can't
> > imagine it has a memory leak.
> >
> > So Im guessing that the serde I'm using is causing a problem somehow. The
> > 'transactionSerde' is just to get/set JSON into the 'SumRecord' object.
> > That Object is just a bunch of String and int fields so nothing
> interesting
> > there either.
> >
> > I'm attaching the two parts of the transactionSerde to see if anyone has
> > suggestions on how to find / fix this.
> >
> >
> >
> > On Thu, Dec 22, 2016 at 9:26 AM, Jon Yeargers 
> > wrote:
> >
> >> Yes - that's the one. It's 100% reproducible (for me).
> >>
> >>
> >> On Thu, Dec 22, 2016 at 8:03 AM, Damian Guy 
> wrote:
> >>
> >>> Hi Jon,
> >>>
> >>> Is this for the topology where you are doing something like:
> >>>
> >>> topology: kStream -> groupByKey.aggregate(minute) -> foreach
> >>>  \-> groupByKey.aggregate(hour) -> foreach
> >>>
> >>> I'm trying to understand how i could reproduce your problem. I've not
> >>> seen
> >>> any such issues with 0.10.1.1, but then i'm not sure what you are
> doing.
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Thu, 22 Dec 2016 at 15:26 Jon Yeargers 
> >>> wrote:
> >>>
> >>> > Im still hitting this leak with the released version of 0.10.1.1.
> >>> >
> >>> > Process mem % grows over the course of 10-20 minutes and eventually
> >>> the OS
> >>> > kills it.
> >>> >
> >>> > Messages like this appear in /var/log/messages:
> >>> >
> >>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.793692] java
> invoked
> >>> > oom-killer: gfp_mask=0x24201ca, order=0, oom_score_adj=0
> >>> >
> >>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.798383] java
> >>> cpuset=/
> >>> > mems_allowed=0
> >>> >
> >>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.801079] CPU: 0
> PID:
> >>> 9550
> >>> > Comm: java Tainted: GE   4.4.19-29.55.amzn1.x86_64 #1
> >>> >
> >>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Hardware
> >>> name:
> >>> > Xen HVM domU, BIOS 4.2.amazon 11/11/2016
> >>> >
> >>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> >>> >  88071c517a70 812c958f 88071c517c58
> >>> >
> >>> > Dec 22 1

Re: Memory / resource leak in 0.10.1.1 release

2016-12-25 Thread Jon Yeargers
I narrowed this problem down to this part of the topology (and yes, it's
100% repro - for me):

KStream transactionKStream =
 kStreamBuilder.stream(stringSerde,transactionSerde,TOPIC);

KTable, SumRecordCollector> ktAgg =
transactionKStream.groupByKey().aggregate(
SumRecordCollector::new,
new Aggregate(),
TimeWindows.of(20 * 60 * 1000L),
collectorSerde, "table_stream");

Given that this is a pretty trivial, well-traveled piece of Kafka I can't
imagine it has a memory leak.

So I'm guessing that the serde I'm using is causing a problem somehow. The
'transactionSerde' is just to get/set JSON into the 'SumRecord' object.
That Object is just a bunch of String and int fields so nothing interesting
there either.

I'm attaching the two parts of the transactionSerde to see if anyone has
suggestions on how to find / fix this.



On Thu, Dec 22, 2016 at 9:26 AM, Jon Yeargers 
wrote:

> Yes - that's the one. It's 100% reproducible (for me).
>
>
> On Thu, Dec 22, 2016 at 8:03 AM, Damian Guy  wrote:
>
>> Hi Jon,
>>
>> Is this for the topology where you are doing something like:
>>
>> topology: kStream -> groupByKey.aggregate(minute) -> foreach
>>  \-> groupByKey.aggregate(hour) -> foreach
>>
>> I'm trying to understand how i could reproduce your problem. I've not seen
>> any such issues with 0.10.1.1, but then i'm not sure what you are doing.
>>
>> Thanks,
>> Damian
>>
>> On Thu, 22 Dec 2016 at 15:26 Jon Yeargers 
>> wrote:
>>
>> > Im still hitting this leak with the released version of 0.10.1.1.
>> >
>> > Process mem % grows over the course of 10-20 minutes and eventually the
>> OS
>> > kills it.
>> >
>> > Messages like this appear in /var/log/messages:
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.793692] java invoked
>> > oom-killer: gfp_mask=0x24201ca, order=0, oom_score_adj=0
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.798383] java cpuset=/
>> > mems_allowed=0
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.801079] CPU: 0 PID:
>> 9550
>> > Comm: java Tainted: GE   4.4.19-29.55.amzn1.x86_64 #1
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Hardware
>> name:
>> > Xen HVM domU, BIOS 4.2.amazon 11/11/2016
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> >  88071c517a70 812c958f 88071c517c58
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> >  88071c517b00 811ce76d 8109db14
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > 810b2d91  0010 817d0fe9
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Call Trace:
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] dump_stack+0x63/0x84
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] dump_header+0x5e/0x1d8
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] ? set_next_entity+0xa4/0x710
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] ? __raw_callee_save___pv_queued_
>> spin_unlock+0x11/0x20
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] oom_kill_process+0x205/0x3d0
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] out_of_memory+0x431/0x480
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] __alloc_pages_nodemask+0x91e/0xa60
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] alloc_pages_current+0x88/0x120
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] __page_cache_alloc+0xb4/0xc0
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] filemap_fault+0x188/0x3e0
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] ext4_filemap_fault+0x36/0x50 [ext4]
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] __do_fault+0x3d/0x70
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] handle_mm_fault+0xf27/0x1870
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] ? __raw_callee_save___pv_queued_
>> spin_unlock+0x11/0x20
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] __do_page_fault+0x183/0x3f0
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] do_page_fault+0x22/0x30
>> >
>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
>> > [] page_fault+0x28/0x30
>> >
>>
>
>


Re: Memory / resource leak in 0.10.1.1 release

2016-12-22 Thread Jon Yeargers
Yes - that's the one. It's 100% reproducible (for me).


On Thu, Dec 22, 2016 at 8:03 AM, Damian Guy  wrote:

> Hi Jon,
>
> Is this for the topology where you are doing something like:
>
> topology: kStream -> groupByKey.aggregate(minute) -> foreach
>  \-> groupByKey.aggregate(hour) -> foreach
>
> I'm trying to understand how i could reproduce your problem. I've not seen
> any such issues with 0.10.1.1, but then i'm not sure what you are doing.
>
> Thanks,
> Damian
>
> On Thu, 22 Dec 2016 at 15:26 Jon Yeargers 
> wrote:
>
> > Im still hitting this leak with the released version of 0.10.1.1.
> >
> > Process mem % grows over the course of 10-20 minutes and eventually the
> OS
> > kills it.
> >
> > Messages like this appear in /var/log/messages:
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.793692] java invoked
> > oom-killer: gfp_mask=0x24201ca, order=0, oom_score_adj=0
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.798383] java cpuset=/
> > mems_allowed=0
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.801079] CPU: 0 PID:
> 9550
> > Comm: java Tainted: GE   4.4.19-29.55.amzn1.x86_64 #1
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Hardware name:
> > Xen HVM domU, BIOS 4.2.amazon 11/11/2016
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> >  88071c517a70 812c958f 88071c517c58
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> >  88071c517b00 811ce76d 8109db14
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > 810b2d91  0010 817d0fe9
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Call Trace:
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] dump_stack+0x63/0x84
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] dump_header+0x5e/0x1d8
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] ? set_next_entity+0xa4/0x710
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] ? __raw_callee_save___pv_queued_
> spin_unlock+0x11/0x20
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] oom_kill_process+0x205/0x3d0
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] out_of_memory+0x431/0x480
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] __alloc_pages_nodemask+0x91e/0xa60
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] alloc_pages_current+0x88/0x120
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] __page_cache_alloc+0xb4/0xc0
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] filemap_fault+0x188/0x3e0
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] ext4_filemap_fault+0x36/0x50 [ext4]
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] __do_fault+0x3d/0x70
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] handle_mm_fault+0xf27/0x1870
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] ? __raw_callee_save___pv_queued_
> spin_unlock+0x11/0x20
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] __do_page_fault+0x183/0x3f0
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] do_page_fault+0x22/0x30
> >
> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
> > [] page_fault+0x28/0x30
> >
>


Memory / resource leak in 0.10.1.1 release

2016-12-22 Thread Jon Yeargers
I'm still hitting this leak with the released version of 0.10.1.1.

Process mem % grows over the course of 10-20 minutes and eventually the OS
kills it.

Messages like this appear in /var/log/messages:

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.793692] java invoked
oom-killer: gfp_mask=0x24201ca, order=0, oom_score_adj=0

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.798383] java cpuset=/
mems_allowed=0

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.801079] CPU: 0 PID: 9550
Comm: java Tainted: GE   4.4.19-29.55.amzn1.x86_64 #1

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Hardware name:
Xen HVM domU, BIOS 4.2.amazon 11/11/2016

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
 88071c517a70 812c958f 88071c517c58

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
 88071c517b00 811ce76d 8109db14

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
810b2d91  0010 817d0fe9

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Call Trace:

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] dump_stack+0x63/0x84

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] dump_header+0x5e/0x1d8

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] ? set_next_entity+0xa4/0x710

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] ? __raw_callee_save___pv_queued_spin_unlock+0x11/0x20

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] oom_kill_process+0x205/0x3d0

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] out_of_memory+0x431/0x480

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] __alloc_pages_nodemask+0x91e/0xa60

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] alloc_pages_current+0x88/0x120

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] __page_cache_alloc+0xb4/0xc0

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] filemap_fault+0x188/0x3e0

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] ext4_filemap_fault+0x36/0x50 [ext4]

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] __do_fault+0x3d/0x70

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] handle_mm_fault+0xf27/0x1870

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] ? __raw_callee_save___pv_queued_spin_unlock+0x11/0x20

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] __do_page_fault+0x183/0x3f0

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] do_page_fault+0x22/0x30

Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072]
[] page_fault+0x28/0x30


Re: spontaneous / unwanted exits from KStream apps

2016-12-21 Thread Jon Yeargers
Did I say memory usage was stable? Lies.

After ~15min it's up to 50% - and climbing.

20 min: 63%

Eventually the OS killed it. Didn't generate a log this time though. Found
this snip in /var/log/messages:

Dec 21 12:56:19 ip-172-16-101-108 kernel: [2901342.207241] java invoked
oom-killer: gfp_mask=0x24201ca, order=0, oom_score_adj=0


Some stack dump bits as well. This bug (or one very much like it) was fixed in
the 0.10.2.0-SNAPSHOT I was previously using. The unhappy build comes from
0.10.1.1-RC1 and trunk.
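
One way to narrow down a leak like this might be to log what the JVM itself thinks it is using and compare that with the process size the OS reports. The sketch below assumes the growth could be outside the Java heap (for example in native memory); that is an assumption, not something the logs above confirm. The class and method names are hypothetical and only the JDK's java.lang.management API is used.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical helper (not part of Kafka): captures JVM-managed memory so it
// can be compared with the process size shown by the OS (e.g. in top).
final class JvmMemorySnapshot {
    final long heapUsedBytes;
    final long nonHeapUsedBytes;

    private JvmMemorySnapshot(long heapUsedBytes, long nonHeapUsedBytes) {
        this.heapUsedBytes = heapUsedBytes;
        this.nonHeapUsedBytes = nonHeapUsedBytes;
    }

    // Reads current heap and non-heap usage from the platform MemoryMXBean.
    static JvmMemorySnapshot take() {
        MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = bean.getHeapMemoryUsage();
        MemoryUsage nonHeap = bean.getNonHeapMemoryUsage();
        return new JvmMemorySnapshot(heap.getUsed(), nonHeap.getUsed());
    }

    // Formats the snapshot in MiB for a log line.
    String describe() {
        return String.format("heap=%d MiB, non-heap=%d MiB",
                heapUsedBytes >> 20, nonHeapUsedBytes >> 20);
    }

    public static void main(String[] args) {
        System.out.println(JvmMemorySnapshot.take().describe());
    }
}
```

If both figures stay roughly flat while the process mem % keeps climbing, the growth is probably in memory the JVM does not track, which would point away from ordinary heap tuning.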



On Wed, Dec 21, 2016 at 4:32 AM, Jon Yeargers 
wrote:

> Found this treasure lurking in my app folder:
>
> Shows that the process was OOM-killed by the OS. Have restarted to see if
> it will reproduce - so far memory usage seems stable.
>
>
>
> On Wed, Dec 21, 2016 at 3:05 AM, Jon Yeargers 
> wrote:
>
>> I upgraded my app(s) to 0.10.1.1-rc1 but I'm still seeing these
>> error-free shutdowns. Something is taking my app down after varying lengths
>> of time (10 minutes to several hours). Doesn't matter if I'm running one or
>> many instances.
>>
>> Suggestions on where to look? I've sent several debug logs.
>>
>>
>>
>


Re: spontaneous / unwanted exits from KStream apps

2016-12-21 Thread Jon Yeargers
Found this treasure lurking in my app folder:

Shows that the process was OOM-killed by the OS. Have restarted to see if
it will reproduce - so far memory usage seems stable.



On Wed, Dec 21, 2016 at 3:05 AM, Jon Yeargers 
wrote:

> I upgraded my app(s) to 0.10.1.1-rc1 but I'm still seeing these error-free
> shutdowns. Something is taking my app down after varying lengths of time
> (10 minutes to several hours). Doesn't matter if I'm running one or many
> instances.
>
> Suggestions on where to look? I've sent several debug logs.
>
>
>


spontaneous / unwanted exits from KStream apps

2016-12-21 Thread Jon Yeargers
I upgraded my app(s) to 0.10.1.1-rc1 but I'm still seeing these error-free
shutdowns. Something is taking my app down after varying lengths of time
(10 minutes to several hours). Doesn't matter if I'm running one or many
instances.

Suggestions on where to look? I've sent several debug logs.


Re: effect of high IOWait on KStream app?

2016-12-18 Thread Jon Yeargers
Could this delay be contributing to the various issues I'm seeing?

- extended / repeating rebalances
- broker connection drops


On Sat, Dec 17, 2016 at 10:27 AM, Eno Thereska 
wrote:

> Yeah, the numbers for the streams tests seem to be low. For reference,
> here is what I get when I run it on my laptop, with Kafka co-located
> (Macbook pro, 16GB, SSD). These are rounded up with no decimal places:
>
> > Producer Performance [MB/sec write]: 40
> > Consumer Performance [MB/sec read]: 126
>
> > Streams Performance [MB/sec read]: 81
> > Streams Performance [MB/sec read+write]: 45
> > Streams Performance [MB/sec read+store]: 22
> > Streams KStreamKTable LeftJoin Performance [MB/s joined]: 51
> > Streams KStreamKStream LeftJoin Performance [MB/s joined]: 11
> > Streams KTableKTable LeftJoin Performance [MB/s joined]: 12
>
> I haven't tried this on AWS unfortunately so I don't know what to expect
> there.
>
> Eno
>
>
> > On 17 Dec 2016, at 15:39, Jon Yeargers  wrote:
> >
> > stateDir=/tmp/kafka-streams-simple-benchmark
> >
> > numRecords=1000
> >
> > SLF4J: Class path contains multiple SLF4J bindings.
> >
> > SLF4J: Found binding in
> > [jar:file:/home/ec2-user/kafka/streams/build/dependant-
> libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/
> StaticLoggerBinder.class]
> >
> > SLF4J: Found binding in
> > [jar:file:/home/ec2-user/kafka/tools/build/dependant-
> libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/
> StaticLoggerBinder.class]
> >
> > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> > explanation.
> >
> > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> >
> > [2016-12-17 15:26:15,011] WARN Error while fetching metadata with
> > correlation id 1 : {simpleBenchmarkSourceTopic=LEADER_NOT_AVAILABLE}
> > (org.apache.kafka.clients.NetworkClient:709)
> >
> > Producer Performance [MB/sec write]: 87.52977203130317
> >
> > Consumer Performance [MB/sec read]: 88.05408180729077
> >
> > Streams Performance [MB/sec read]: 23.306380376435413
> >
> > [2016-12-17 15:27:22,722] WARN Error while fetching metadata with
> > correlation id 1 : {simpleBenchmarkSinkTopic=LEADER_NOT_AVAILABLE}
> > (org.apache.kafka.clients.NetworkClient:709)
> >
> > Streams Performance [MB/sec read+write]: 20.37099360560648
> >
> > Streams Performance [MB/sec read+store]: 11.918550778354337
> >
> > Initializing kStreamTopic joinSourceTopic1kStreamKTable
> >
> > [2016-12-17 15:29:39,597] WARN Error while fetching metadata with
> > correlation id 1 : {joinSourceTopic1kStreamKTable=LEADER_NOT_AVAILABLE}
> > (org.apache.kafka.clients.NetworkClient:709)
> >
> > Initializing kTableTopic joinSourceTopic2kStreamKTable
> >
> > [2016-12-17 15:29:50,589] WARN Error while fetching metadata with
> > correlation id 1 : {joinSourceTopic2kStreamKTable=LEADER_NOT_AVAILABLE}
> > (org.apache.kafka.clients.NetworkClient:709)
> >
> > Streams KStreamKTable LeftJoin Performance [MB/s joined]:
> 14.690136622553428
> >
> > Initializing kStreamTopic joinSourceTopic1kStreamKStream
> >
> > [2016-12-17 15:31:12,583] WARN Error while fetching metadata with
> > correlation id 1 : {joinSourceTopic1kStreamKStream=LEADER_NOT_AVAILABLE}
> > (org.apache.kafka.clients.NetworkClient:709)
> >
> > Initializing kStreamTopic joinSourceTopic2kStreamKStream
> >
> > [2016-12-17 15:31:23,534] WARN Error while fetching metadata with
> > correlation id 1 : {joinSourceTopic2kStreamKStream=LEADER_NOT_AVAILABLE}
> > (org.apache.kafka.clients.NetworkClient:709)
> >
> > Streams KStreamKStream LeftJoin Performance [MB/s joined]:
> 8.647640177490924
> >
> > Initializing kTableTopic joinSourceTopic1kTableKTable
> >
> > [2016-12-17 15:33:34,586] WARN Error while fetching metadata with
> > correlation id 1 : {joinSourceTopic1kTableKTable=LEADER_NOT_AVAILABLE}
> > (org.apache.kafka.clients.NetworkClient:709)
> >
> > Initializing kTableTopic joinSourceTopic2kTableKTable
> >
> > [2016-12-17 15:33:45,520] WARN Error while fetching metadata with
> > correlation id 1 : {joinSourceTopic2kTableKTable=LEADER_NOT_AVAILABLE}
> > (org.apache.kafka.clients.NetworkClient:709)
> >
> > Streams KTableKTable LeftJoin Performance [MB/s joined]:
> 6.530348031376133
> >
> > On Sat, Dec 17, 2016 at 6:39 AM, Jon Yeargers 
> > wrote:
> >
> >> I'd be happy to but the AWS AMI (default) i'm using is fighting this at
> >> every turn. Will keep t

Re: checking consumer lag on KStreams app?

2016-12-18 Thread Jon Yeargers
3 PM, Damian Guy 
> > >>> wrote:
> > >>>>
> > >>>>> Hi Sachin,
> > >>>>>
> > >>>>> You should use the kafka-consumer-groups.sh command. The
> > >>>>> ConsumerOffsetChecker is deprecated and is only for the old
> consumer.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Damian
> > >>>>>
> > >>>>> On Mon, 12 Dec 2016 at 14:32 Sachin Mittal 
> > >> wrote:
> > >>>>>
> > >>>>>> Hi,
> > >>>>>> I have a streams application running with application id test.
> > >>>>>> When I try to check consumer lag like you suggested I get the
> > >>> following
> > >>>>>> issue:
> > >>>>>>
> > >>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
> > >> --zookeeper
> > >>>>>> localhost:2181 --group test
> > >>>>>> [2016-12-12 10:26:01,348] WARN WARNING: ConsumerOffsetChecker is
> > >>>>>> deprecated and will be dropped in releases following 0.9.0. Use
> > >>>>>> ConsumerGroupCommand instead. (kafka.tools.
> ConsumerOffsetChecker$)
> > >>>>>> SLF4J: Class path contains multiple SLF4J bindings.
> > >>>>>> SLF4J: Found binding in
> > >>>>>>
> > >>>>>> [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/
> > >>>>> libs/logback-classic-1.0.3.jar!/org/slf4j/impl/
> > >>> StaticLoggerBinder.class]
> > >>>>>> SLF4J: Found binding in
> > >>>>>>
> > >>>>>> [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/
> > >>>>> libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/
> > >>> StaticLoggerBinder.class]
> > >>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for
> > >> an
> > >>>>>> explanation.
> > >>>>>> SLF4J: Actual binding is of type
> > >>>>>> [ch.qos.logback.classic.selector.DefaultContextSelector]
> > >>>>>> Exiting due to: org.apache.zookeeper.KeeperException$
> > >>> NoNodeException:
> > >>>>>> KeeperErrorCode = NoNode for /consumers/test/owners.
> > >>>>>>
> > >>>>>> Please let me know where I may be going wrong.
> > >>>>>> I have the kafka logs set in folder
> > >>>>>> /data01/testuser/kafka-logs
> > >>>>>>
> > >>>>>> Under kafka-logs I see many folders with name something like
> > >>>>>> consumer_offsets_*
> > >>>>>>
> > >>>>>> I have the stream dir set in folder
> > >>>>>> /data01/testuser/kafka-streams/test
> > >>>>>>
> > >>>>>> Thanks
> > >>>>>> Sachin
> > >>>>>>
> > >>>>>>
> > >>>>>> On Sun, Dec 11, 2016 at 2:19 AM, Matthias J. Sax <
> > >>>> matth...@confluent.io>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> It's basically just a consumer as any other. The application.id
> > >> is
> > >>>>> used
> > >>>>>>> as consumer group.id.
> > >>>>>>>
> > >>>>>>> So just use the available tools you do use to check consumer lag.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> -Matthias
> > >>>>>>>
> > >>>>>>> On 12/9/16 5:49 PM, Jon Yeargers wrote:
> > >>>>>>>> How would this be done?
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
> >
>
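
The lag that the consumer group tooling mentioned in this thread reports is, per partition, the log end offset minus the committed offset of the group (for a Streams app the group is the application.id). A minimal sketch of that arithmetic follows. The class name, the partition labels, and the sample offsets are made up for illustration, and treating a partition with no committed offset as "committed at 0" is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: computes per-partition lag from two offset maps.
// The offsets here are plain sample values, not read from a broker.
final class ConsumerLag {
    // lag = end offset - committed offset, never below zero.
    // Assumption: a partition without a committed offset counts from 0.
    static Map<String, Long> lagPerPartition(Map<String, Long> endOffsets,
                                             Map<String, Long> committedOffsets) {
        Map<String, Long> lag = new TreeMap<>();
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<String, Long> end = new HashMap<>();
        end.put("example-topic-0", 1500L);
        end.put("example-topic-1", 900L);
        Map<String, Long> committed = new HashMap<>();
        committed.put("example-topic-0", 1200L);
        System.out.println(lagPerPartition(end, committed));
    }
}
```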


Re: effect of high IOWait on KStream app?

2016-12-17 Thread Jon Yeargers
stateDir=/tmp/kafka-streams-simple-benchmark

numRecords=1000

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in
[jar:file:/home/ec2-user/kafka/streams/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in
[jar:file:/home/ec2-user/kafka/tools/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

[2016-12-17 15:26:15,011] WARN Error while fetching metadata with
correlation id 1 : {simpleBenchmarkSourceTopic=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient:709)

Producer Performance [MB/sec write]: 87.52977203130317

Consumer Performance [MB/sec read]: 88.05408180729077

Streams Performance [MB/sec read]: 23.306380376435413

[2016-12-17 15:27:22,722] WARN Error while fetching metadata with
correlation id 1 : {simpleBenchmarkSinkTopic=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient:709)

Streams Performance [MB/sec read+write]: 20.37099360560648

Streams Performance [MB/sec read+store]: 11.918550778354337

Initializing kStreamTopic joinSourceTopic1kStreamKTable

[2016-12-17 15:29:39,597] WARN Error while fetching metadata with
correlation id 1 : {joinSourceTopic1kStreamKTable=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient:709)

Initializing kTableTopic joinSourceTopic2kStreamKTable

[2016-12-17 15:29:50,589] WARN Error while fetching metadata with
correlation id 1 : {joinSourceTopic2kStreamKTable=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient:709)

Streams KStreamKTable LeftJoin Performance [MB/s joined]: 14.690136622553428

Initializing kStreamTopic joinSourceTopic1kStreamKStream

[2016-12-17 15:31:12,583] WARN Error while fetching metadata with
correlation id 1 : {joinSourceTopic1kStreamKStream=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient:709)

Initializing kStreamTopic joinSourceTopic2kStreamKStream

[2016-12-17 15:31:23,534] WARN Error while fetching metadata with
correlation id 1 : {joinSourceTopic2kStreamKStream=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient:709)

Streams KStreamKStream LeftJoin Performance [MB/s joined]: 8.647640177490924

Initializing kTableTopic joinSourceTopic1kTableKTable

[2016-12-17 15:33:34,586] WARN Error while fetching metadata with
correlation id 1 : {joinSourceTopic1kTableKTable=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient:709)

Initializing kTableTopic joinSourceTopic2kTableKTable

[2016-12-17 15:33:45,520] WARN Error while fetching metadata with
correlation id 1 : {joinSourceTopic2kTableKTable=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient:709)

Streams KTableKTable LeftJoin Performance [MB/s joined]: 6.530348031376133

On Sat, Dec 17, 2016 at 6:39 AM, Jon Yeargers 
wrote:

> I'd be happy to but the AWS AMI (default) I'm using is fighting this at
> every turn. Will keep trying.
>
> On Sat, Dec 17, 2016 at 2:46 AM, Eno Thereska 
> wrote:
>
>> Jon,
>>
>> It's hard to tell. Would you be willing to run a simple benchmark and
>> report back the numbers? The benchmark is called SimpleBenchmark.java, it's
>> included with the source, and it will start a couple of streams apps. It
>> requires a ZK and a broker to be up. Then you run it:
>> org.apache.kafka.streams.perf.SimpleBenchmark 
>> .
>>
>> Thanks
>> Eno
>> > On 16 Dec 2016, at 20:00, Jon Yeargers 
>> wrote:
>> >
>> > Looking for reasons why my installations seem to be generating so many
>> > issues:
>> >
>> > Starting an app which is
>> >
>> > stream->aggregate->filter->foreach
>> >
>> > While it's running the system in question (AWS) averages >10% IOWait
>> with
>> > spikes to 60-70%.  The CPU load is in the range of 3/2/1 (8 core
>> machine w/
>> > 16G RAM)
>> >
>> > Could this IO delay be causing problems? The 'kafka-streams' folder is
>> on a
>> > separate EBS optimized volume with 2500 dedicated IOPs.
>>
>>
>


Re: effect of high IOWait on KStream app?

2016-12-17 Thread Jon Yeargers
I'd be happy to but the AWS AMI (default) I'm using is fighting this at
every turn. Will keep trying.

On Sat, Dec 17, 2016 at 2:46 AM, Eno Thereska 
wrote:

> Jon,
>
> It's hard to tell. Would you be willing to run a simple benchmark and
> report back the numbers? The benchmark is called SimpleBenchmark.java, it's
> included with the source, and it will start a couple of streams apps. It
> requires a ZK and a broker to be up. Then you run it:
> org.apache.kafka.streams.perf.SimpleBenchmark 
> .
>
> Thanks
> Eno
> > On 16 Dec 2016, at 20:00, Jon Yeargers  wrote:
> >
> > Looking for reasons why my installations seem to be generating so many
> > issues:
> >
> > Starting an app which is
> >
> > stream->aggregate->filter->foreach
> >
> > While it's running the system in question (AWS) averages >10% IOWait with
> > spikes to 60-70%.  The CPU load is in the range of 3/2/1 (8 core machine
> w/
> > 16G RAM)
> >
> > Could this IO delay be causing problems? The 'kafka-streams' folder is
> on a
> > separate EBS optimized volume with 2500 dedicated IOPs.
>
>


effect of high IOWait on KStream app?

2016-12-16 Thread Jon Yeargers
Looking for reasons why my installations seem to be generating so many
issues:

Starting an app which is

stream->aggregate->filter->foreach

While it's running the system in question (AWS) averages >10% IOWait with
spikes to 60-70%.  The CPU load is in the range of 3/2/1 (8 core machine w/
16G RAM)

Could this IO delay be causing problems? The 'kafka-streams' folder is on a
separate EBS optimized volume with 2500 dedicated IOPs.


Re: What makes a KStream app exit?

2016-12-16 Thread Jon Yeargers
And these bugs would cause the behaviors I'm seeing?

On Fri, Dec 16, 2016 at 10:45 AM, Matthias J. Sax 
wrote:

> We just discovered a couple of bugs with regard to standby tasks... Not
> all bug fix PRs got merged yet.
>
> You can try running on trunk to get those fixes. Should only be a few
> days until the fixes get merged.
>
>
> -Matthias
>
> On 12/16/16 9:10 AM, Jon Yeargers wrote:
> > Have started having this issue with another KStream based app. Digging
> > through logs I ran across this message:
> >
> > When I've seen it before it certainly does kill the application. At the
> end
> > of the SNIP you can see the exit process starting.
> >
> >
> > 2016-12-16 17:04:51,507 [StreamThread-1] DEBUG
> > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1]
> creating
> > new standby task 0_0
> >
> > 2016-12-16 17:04:51,507 [StreamThread-1] INFO
> > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1]
> Creating
> > new standby task 0_0 with assigned partitions [[rtdetail_breakout-0]]
> >
> > 2016-12-16 17:04:51,508 [StreamThread-1] INFO
> > o.a.k.s.p.internals.StandbyTask - standby-task [0_0] Initializing state
> > stores
> >
> > 2016-12-16 17:04:51,508 [StreamThread-1] DEBUG
> > o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor
> fetching
> > committed offsets for partitions: [rtdetail_breakout-0]
> >
> > 2016-12-16 17:04:51,819 [StreamThread-1] ERROR
> > o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > RtDetailBreakoutProcessor failed on partition assignment
> >
> > java.lang.UnsupportedOperationException: null
> >
> > at
> > org.apache.kafka.streams.processor.internals.StandbyContextImpl.
> recordCollector(StandbyContextImpl.java:81)
> >
> > at
> > org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(
> StoreChangeLogger.java:54)
> >
> > at
> > org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(
> StoreChangeLogger.java:46)
> >
> > at
> > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(
> RocksDBWindowStore.java:197)
> >
> > at
> > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(
> MeteredWindowStore.java:66)
> >
> > at
> > org.apache.kafka.streams.state.internals.CachingWindowStore.init(
> CachingWindowStore.java:64)
> >
> > at
> > org.apache.kafka.streams.processor.internals.AbstractTask.
> initializeStateStores(AbstractTask.java:86)
> >
> > at
> > org.apache.kafka.streams.processor.internals.StandbyTask.<init>(
> StandbyTask.java:68)
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.
> createStandbyTask(StreamThread.java:733)
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.addStandbyTasks(StreamThread.java:757)
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.access$200(
> StreamThread.java:69)
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread$1.
> onPartitionsAssigned(StreamThread.java:125)
> >
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> onJoinComplete(ConsumerCoordinator.java:229)
> >
> > at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:313)
> >
> > at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:277)
> >
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:260)
> >
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1013)
> >
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:979)
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:442)
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> >
> > 2016-12-16 17:04:51,820 [StreamThread-1] DEBUG
> > o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor
> fetching
> > committed offsets for partitions: [rtdetail_breakout-2,
> > rtdet

Re: What makes a KStream app exit?

2016-12-16 Thread Jon Yeargers
2016-12-16 17:04:51,821 [StreamThread-1] INFO
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Shutting
down

2016-12-16 17:04:51,821 [StreamThread-1] DEBUG
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1]
shutdownTasksAndState: shutting down all active tasks [[0_1, 0_2, 1_1, 0_5,
0_6]] and standby tasks [[]]

On Fri, Dec 16, 2016 at 4:53 AM, Jon Yeargers 
wrote:

> FWIW I put a .warn in my shutdown hook - to make sure it wasn't being
> called unknowingly.
>
> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>     try {
>         LOGGER.warn("ShutdownHook");
>         kafkaStreams.close();
>     } catch (Exception e) {
>         // ignored
>     }
> }));
>
>
> Ran another test and the app closed after ~40min. The above message
> appears 3rd from the end (several seconds after the shutdown process has
> commenced).
>
> (attaching log section)
>
> This has *got* to be something that I've setup improperly... I just can't
> seem to see it.
>
> On Fri, Dec 16, 2016 at 2:10 AM, Jon Yeargers 
> wrote:
>
>> I'm seeing instances where my apps are exiting (gracefully, mind you)
>> without any obvious errors or cause. I have debug logs from many instances
>> of this and have yet to find a reason to explain what's happening.
>>
>> - nothing in the app log
>> - nothing in /var/log/messages (IE not OOM killed)
>> - not being closed via /etc/init.d
>> - nothing in the broker logs
>>
>> Running 0.10.1.0
>>
>> example log:
>>
>> https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0
>> /view?usp=sharing
>>
>
>


Re: What makes a KStream app exit?

2016-12-16 Thread Jon Yeargers
FWIW I put a .warn in my shutdown hook - to make sure it wasn't being
called unknowingly.

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        // Log at WARN so any invocation of the hook shows up in the app log.
        LOGGER.warn("ShutdownHook");
        kafkaStreams.close();
    } catch (Exception e) {
        // ignored
    }
}));


Ran another test and the app closed after ~40min. The above message appears
3rd from the end (several seconds after the shutdown process has commenced).

(attaching log section)

This has *got* to be something that I've set up improperly... I just can't
seem to see it.
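
Besides the shutdown hook, it may help to record when a worker thread dies from an exception nobody caught, since that can take an app down without any obvious error in the log. The sketch below uses only the JDK's default uncaught-exception handler. The class name, the thread name, and the simulated failure are hypothetical, and it is an assumption that a dying thread is what is happening here.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper: logs and remembers the last exception that killed a thread.
final class ThreadDeathLogger {
    // Last failure seen, kept so callers can inspect it later.
    static final AtomicReference<Throwable> LAST_FAILURE = new AtomicReference<>();

    // Installs a JVM-wide handler for exceptions that escape any thread's run().
    static void install() {
        Thread.setDefaultUncaughtExceptionHandler((thread, error) -> {
            LAST_FAILURE.set(error);
            System.err.println("Thread " + thread.getName() + " died: " + error);
        });
    }

    public static void main(String[] args) throws InterruptedException {
        install();
        // Simulated worker that fails, standing in for an app thread.
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated failure");
        }, "demo-worker");
        worker.start();
        worker.join();
        System.out.println("Recorded: " + LAST_FAILURE.get());
    }
}
```

The handler runs on the dying thread before it terminates, so by the time join() returns the failure has been recorded.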

On Fri, Dec 16, 2016 at 2:10 AM, Jon Yeargers 
wrote:

> I'm seeing instances where my apps are exiting (gracefully, mind you)
> without any obvious errors or cause. I have debug logs from many instances
> of this and have yet to find a reason to explain what's happening.
>
> - nothing in the app log
> - nothing in /var/log/messages (IE not OOM killed)
> - not being closed via /etc/init.d
> - nothing in the broker logs
>
> Running 0.10.1.0
>
> example log:
>
> https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0/
> view?usp=sharing
>


What makes a KStream app exit?

2016-12-16 Thread Jon Yeargers
I'm seeing instances where my apps are exiting (gracefully, mind you)
without any obvious errors or cause. I have debug logs from many instances
of this and have yet to find a reason to explain what's happening.

- nothing in the app log
- nothing in /var/log/messages (i.e. not OOM-killed)
- not being closed via /etc/init.d
- nothing in the broker logs

Running 0.10.1.0

example log:

https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0/view?usp=sharing


"support" topics for KStreams

2016-12-15 Thread Jon Yeargers
What are the retention settings for these (-changelog and
-replication)? I'm wondering whether the relentless rebalancing issues I'm
facing have anything to do with consumers that lag too
far behind.

If I delete all the topics associated with a KStream project and restart it
there are no rebalance issues. Everything is fast and responsive.

Over the course of 6-10 hours of execution the rebalances take longer and
longer until eventually the app(s) stop responding at all.

Just curious.


RocksDB - no locks available exception

2016-12-15 Thread Jon Yeargers
Attached is a debug log showing this exception.

Question: is it typical to have so many disconnections from brokers?

This log also includes the exception "Log end offset should not change
while restoring"


errors.log.gz
Description: GNU Zip compressed data


Re: Another odd error

2016-12-15 Thread Jon Yeargers
Update: the app ran well for several hours.. until I tried to update it. I
copied a new build up to one machine (of five) and then we went back to
near-endless-rebalance. After about an hour I ended up killing the other
four instances and watching the first (new one). It took 90 minutes before
it started consuming anything.

This morning I copied / started two more. 60 minutes and still waiting for
rebalance to conclude.

Obviously it's impractical to delete the topic(s) before updating the
consumer software. What am I doing wrong that's causing all this waiting?

On Wed, Dec 14, 2016 at 9:28 AM, Jon Yeargers 
wrote:

> In a turn of events - this morning I was about to throw in the proverbial
> towel on Kafka. In a last ditch effort I killed all but one instance of my
> app, put it back to a single thread (why offer the option if it's not
> advised?) and deleted every last topic that had any relation to this app.
>
> I restarted it on a single machine and it magically worked. It's been
> running for more than an hour now and hasn't been stuck in 'rebalance-land'
> at all.
>
> I'll keep watching it and see how it goes.
>
> On Wed, Dec 14, 2016 at 6:13 AM, Damian Guy  wrote:
>
>> We do recommend one thread per instance of the app. However, it should
>> also
>> work with multiple threads.
>> I can't debug the problem any further without the logs from the other
>> apps.
>> We'd need to try and see if another instance still has task 1_3 open ( i
>> suspect it does )
>>
>> Thanks,
>> Damian
>>
>> On Wed, 14 Dec 2016 at 13:20 Jon Yeargers 
>> wrote:
>>
>> > What should I do about this? One thread per app?
>> >
>> > On Wed, Dec 14, 2016 at 4:11 AM, Damian Guy 
>> wrote:
>> >
>> > > That is correct
>> > >
>> > > On Wed, 14 Dec 2016 at 12:09 Jon Yeargers 
>> > > wrote:
>> > >
>> > > > I have the app running on 5 machines. Is that what you mean?
>> > > >
>> > > > On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy 
>> > > wrote:
>> > > >
>> > > > > Hi Jon,
>> > > > >
>> > > > > Do you have more than one instance of the app running? The reason
>> i
>> > ask
>> > > > is
>> > > > > because the task (task 1_3) that fails with the
>> > > > > "java.lang.IllegalStateException" in this log is previously
>> running
>> > > as a
>> > > > > Standby Task. This would mean the active task for this store would
>> > have
>> > > > > been running elsewhere, but i don't see that in the logs. The
>> > exception
>> > > > > occurs as StreamThread-1 starts to run task 1_3 as an active task.
>> > The
>> > > > > exception might indicate that another thread/instance is still
>> > writing
>> > > to
>> > > > > the changelog topic for the State Store.
>> > > > >
>> > > > > Thanks,
>> > > > > Damian
>> > > > >
>> > > > > On Tue, 13 Dec 2016 at 17:23 Jon Yeargers <
>> jon.yearg...@cedexis.com>
>> > > > > wrote:
>> > > > >
>> > > > > > As near as I can see it's rebalancing constantly.
>> > > > > >
>> > > > > > I'll up that value and see what happens.
>> > > > > >
>> > > > > > On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy <
>> damian@gmail.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hi Jon,
>> > > > > > >
>> > > > > > > I haven't had much of a chance to look at the logs in detail
>> too
>> > > much
>> > > > > > yet,
>> > > > > > > but i have noticed that your app seems to be rebalancing
>> > > frequently.
>> > > > > It
>> > > > > > > seems that it is usually around the 300 second mark, which
>> > usually
>> > > > > would
>> > > > > > > mean that poll hasn't been called for at least that long. You
>> > might
>> > > > > want
>> > > > > > to
>> > > > > > > try setting the config ConsumerConfig.MAX_POLL_INTERVAL_CONFIG
>> to
>> > > > > > > something
>> > > > > > > h

Re: Another odd error

2016-12-14 Thread Jon Yeargers
In a turn of events - this morning I was about to throw in the proverbial
towel on Kafka. In a last ditch effort I killed all but one instance of my
app, put it back to a single thread (why offer the option if it's not
advised?) and deleted every last topic that had any relation to this app.

I restarted it on a single machine and it magically worked. It's been
running for more than an hour now and hasn't been stuck in 'rebalance-land'
at all.

I'll keep watching it and see how it goes.
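
For reference, the two knobs discussed in this thread (the number of stream threads per instance and the consumer's maximum poll interval) are ordinary configuration properties. The sketch below builds such a Properties object with the JDK only. The class name, application id, broker address, and the chosen values are placeholders for illustration, not recommendations from the thread.

```java
import java.util.Properties;

// Hypothetical helper: assembles the settings discussed above as plain properties.
final class StreamsSettingsSketch {
    static Properties build(String applicationId, String bootstrapServers,
                            int streamThreads, int maxPollIntervalMs) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);       // also used as the consumer group id
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("num.stream.threads", Integer.toString(streamThreads));
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values: one thread, and a poll interval of ten minutes.
        Properties props = build("example-app", "localhost:9092", 1, 600_000);
        props.list(System.out);
    }
}
```

The resulting Properties would then be handed to the Streams application at startup in place of whatever configuration it builds today.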

On Wed, Dec 14, 2016 at 6:13 AM, Damian Guy  wrote:

> We do recommend one thread per instance of the app. However, it should also
> work with multiple threads.
> I can't debug the problem any further without the logs from the other apps.
> We'd need to try and see if another instance still has task 1_3 open ( i
> suspect it does )
>
> Thanks,
> Damian
>
> On Wed, 14 Dec 2016 at 13:20 Jon Yeargers 
> wrote:
>
> > What should I do about this? One thread per app?
> >
> > On Wed, Dec 14, 2016 at 4:11 AM, Damian Guy 
> wrote:
> >
> > > That is correct
> > >
> > > On Wed, 14 Dec 2016 at 12:09 Jon Yeargers 
> > > wrote:
> > >
> > > > I have the app running on 5 machines. Is that what you mean?
> > > >
> > > > On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy 
> > > wrote:
> > > >
> > > > > Hi Jon,
> > > > >
> > > > > Do you have more than one instance of the app running? The reason i
> > ask
> > > > is
> > > > > because the task (task 1_3) that fails with the
> > > > > "java.lang.IllegalStateException" in this log is previously
> running
> > > as a
> > > > > Standby Task. This would mean the active task for this store would
> > have
> > > > > been running elsewhere, but i don't see that in the logs. The
> > exception
> > > > > occurs as StreamThread-1 starts to run task 1_3 as an active task.
> > The
> > > > > exception might indicate that another thread/instance is still
> > writing
> > > to
> > > > > the changelog topic for the State Store.
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Tue, 13 Dec 2016 at 17:23 Jon Yeargers <
> jon.yearg...@cedexis.com>
> > > > > wrote:
> > > > >
> > > > > > As near as I can see it's rebalancing constantly.
> > > > > >
> > > > > > I'll up that value and see what happens.
> > > > > >
> > > > > > On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy <
> damian@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Jon,
> > > > > > >
> > > > > > > I haven't had much of a chance to look at the logs in detail
> too
> > > much
> > > > > > yet,
> > > > > > > but i have noticed that your app seems to be rebalancing
> > > frequently.
> > > > > It
> > > > > > > seems that it is usually around the 300 second mark, which
> > usually
> > > > > would
> > > > > > > mean that poll hasn't been called for at least that long. You
> > might
> > > > > want
> > > > > > to
> > > > > > > try setting the config ConsumerConfig.MAX_POLL_INTERVAL_CONFIG
> to
> > > > > > > something
> > > > > > > > higher than 300000 (which is the default).
> > > > > > >
> > > > > > > I'll continue to look at your logs and get back to you.
> > > > > > > Thanks,
> > > > > > > Damian
> > > > > > >
> > > > > > > On Tue, 13 Dec 2016 at 15:02 Jon Yeargers <
> > > jon.yearg...@cedexis.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > attached is a log with lots of disconnections and a small
> > amount
> > > of
> > > > > > > > actual, useful activity.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers <
> > > > > > jon.yearg...@cedexis.com>
> > > > > > > > wrote:

Re: Another odd error

2016-12-14 Thread Jon Yeargers
What should I do about this? One thread per app?

On Wed, Dec 14, 2016 at 4:11 AM, Damian Guy  wrote:

> That is correct
>
> On Wed, 14 Dec 2016 at 12:09 Jon Yeargers 
> wrote:
>
> > I have the app running on 5 machines. Is that what you mean?
> >
> > On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy 
> wrote:
> >
> > > Hi Jon,
> > >
> > > Do you have more than one instance of the app running? The reason i ask
> > is
> > > because the task (task 1_3) that fails with the
> > > "java.lang.IllegalStateException" in this log is previously running
> as a
> > > Standby Task. This would mean the active task for this store would have
> > > been running elsewhere, but i don't see that in the logs. The exception
> > > occurs as StreamThread-1 starts to run task 1_3 as an active task. The
> > > exception might indicate that another thread/instance is still writing
> to
> > > the changelog topic for the State Store.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 13 Dec 2016 at 17:23 Jon Yeargers 
> > > wrote:
> > >
> > > > As near as I can see it's rebalancing constantly.
> > > >
> > > > I'll up that value and see what happens.
> > > >
> > > > On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy 
> > > wrote:
> > > >
> > > > > Hi Jon,
> > > > >
> > > > > > I haven't had much of a chance to look at the logs in detail yet,
> > > > > > but I have noticed that your app seems to be rebalancing frequently.
> > > > > > It seems that it is usually around the 300 second mark, which usually
> > > > > > would mean that poll hasn't been called for at least that long. You
> > > > > > might want to try setting the config
> > > > > > ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to something higher than
> > > > > > 300000 ms (300 seconds, which is the default).
> > > > >
> > > > > I'll continue to look at your logs and get back to you.
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Tue, 13 Dec 2016 at 15:02 Jon Yeargers <
> jon.yearg...@cedexis.com>
> > > > > wrote:
> > > > >
> > > > > > attached is a log with lots of disconnections and a small amount
> of
> > > > > > actual, useful activity.
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers <
> > > > jon.yearg...@cedexis.com>
> > > > > > wrote:
> > > > > >
> > > > > > n/m - I understand the logging issue now. Am generating a new
> one.
> > > Will
> > > > > > send shortly.
> > > > > >
> > > > > > On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers <
> > > > jon.yearg...@cedexis.com>
> > > > > > wrote:
> > > > > >
> > > > > > Yes - saw that one. There were plenty of smaller records
> available
> > > > > though.
> > > > > >
> > > > > > I sent another log this morning with the level set to DEBUG.
> > > Hopefully
> > > > > you
> > > > > > rec'd it.
> > > > > >
> > > > > > On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy <
> damian@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > HI Jon,
> > > > > >
> > > > > > It looks like you have the logging level for KafkaStreams set to
> at
> > > > least
> > > > > > WARN. I can only see ERROR level logs being produced from
> Streams.
> > > > > >
> > > > > > However, i did notice an issue in the logs (not related to your
> > > > specific
> > > > > > error but you will need to fix anyway):
> > > > > >
> > > > > > There are lots of messages like:
> > > > > > task [2_9] Error sending record to topic
> > > > > > PRTMinuteAgg-prt_hour_agg_stream-changelog
> > > > > > org.apache.kafka.common.errors.RecordTooLargeException: The
> message
> > > is
> > > > > > 2381750 bytes when ser

Re: Another odd error

2016-12-14 Thread Jon Yeargers
I have the app running on 5 machines. Is that what you mean?

On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy  wrote:

> Hi Jon,
>
> Do you have more than one instance of the app running? The reason i ask is
> because the task (task 1_3) that fails with the
> "java.lang.IllegalStateException" in this log is previously running as a
> Standby Task. This would mean the active task for this store would have
> been running elsewhere, but i don't see that in the logs. The exception
> occurs as StreamThread-1 starts to run task 1_3 as an active task. The
> exception might indicate that another thread/instance is still writing to
> the changelog topic for the State Store.
>
> Thanks,
> Damian
>
> On Tue, 13 Dec 2016 at 17:23 Jon Yeargers 
> wrote:
>
> > As near as I can see it's rebalancing constantly.
> >
> > I'll up that value and see what happens.
> >
> > On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy 
> wrote:
> >
> > > Hi Jon,
> > >
> > > I haven't had much of a chance to look at the logs in detail yet,
> > > but I have noticed that your app seems to be rebalancing frequently. It
> > > seems that it is usually around the 300 second mark, which usually would
> > > mean that poll hasn't been called for at least that long. You might want
> > > to try setting the config ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to
> > > something higher than 300000 ms (300 seconds, which is the default).
> > >
> > > I'll continue to look at your logs and get back to you.
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 13 Dec 2016 at 15:02 Jon Yeargers 
> > > wrote:
> > >
> > > > attached is a log with lots of disconnections and a small amount of
> > > > actual, useful activity.
> > > >
> > > >
> > > >
> > > > On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers <
> > jon.yearg...@cedexis.com>
> > > > wrote:
> > > >
> > > > n/m - I understand the logging issue now. Am generating a new one.
> Will
> > > > send shortly.
> > > >
> > > > On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers <
> > jon.yearg...@cedexis.com>
> > > > wrote:
> > > >
> > > > Yes - saw that one. There were plenty of smaller records available
> > > though.
> > > >
> > > > I sent another log this morning with the level set to DEBUG.
> Hopefully
> > > you
> > > > rec'd it.
> > > >
> > > > On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy 
> > > wrote:
> > > >
> > > > HI Jon,
> > > >
> > > > It looks like you have the logging level for KafkaStreams set to at
> > least
> > > > WARN. I can only see ERROR level logs being produced from Streams.
> > > >
> > > > However, i did notice an issue in the logs (not related to your
> > specific
> > > > error but you will need to fix anyway):
> > > >
> > > > There are lots of messages like:
> > > > task [2_9] Error sending record to topic
> > > > PRTMinuteAgg-prt_hour_agg_stream-changelog
> > > > org.apache.kafka.common.errors.RecordTooLargeException: The message
> is
> > > > 2381750 bytes when serialized which is larger than the maximum
> > > >
> > > > This means you need to add some extra config to your StreamsConfig:
> > > > config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
> > > > expectedMaximumMessageSizeBytes)
> > > >
> > > > > You will possibly also need to adjust the broker properties and
> > > > > increase message.max.bytes
> > > > > - it will need to be at least as large as the setting above.
> > > >
> > > > At the moment all of the change-logs for your state-stores are being
> > > > dropped due to this issue.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > >
> > > >
> > > > On Tue, 13 Dec 2016 at 11:32 Jon Yeargers 
> > > > wrote:
> > > >
> > > > > (am attaching a debug log - note that app terminated with no
> further
> > > > > messages)
> > > > >
> > > > > topology: kStream -> groupByKey.aggregate(minute) -> foreach
> > > > >  \-> groupByKey.aggregate(hour) ->
> > foreach
> > > >

Re: How does 'TimeWindows.of().until()' work?

2016-12-13 Thread Jon Yeargers
 are keeping default until which is 1 day.
> >>>
> >>> Our record's timestamp extractor has a field which increases with time.
> >>> However, for short periods we cannot guarantee that the timestamp always
> >>> increases. So at the boundary, i.e. after 24 hrs, we can get records which
> >>> are beyond that window's retention period.
> >>>
> >>> Then it happens as mentioned above and our aggregation fails.
> >>>
> >>> So, just to sum up: when we get a record at
> >>> 24h + 1 sec, it deletes the older window, and since the new record belongs
> >>> to the new window, that window gets created.
> >>> Now when we get the next record, at 24h - 1 sec, the older window has been
> >>> dropped, so it does not get aggregated in that bucket.
> >>>
> >>> I suggest we have another setting next to until, called retain, which
> >>> retains the older windows into the next window.
> >>>
> >>> I think at the stream window boundary level it should use the concept of a
> >>> sliding window. So we can define a window like
> >>>
> >>> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L)
> >>>     .until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)
> >>>
> >>> So after 7 days it retains the data covered by windows in the last 15
> >>> minutes, which rolls over the data in them to the next window. This way
> >>> streams work continuously.
> >>>
> >>> Please let us know your thoughts on this.
> >>>
> >>> As a side question, there is a setting:
> >>>
> >>> windowstore.changelog.additional.retention.ms
> >>> It is not clear what it does. Is this the default for until?
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <
> matth...@confluent.io
> >>>
> >>> wrote:
> >>>
> >>>> Windows are created on demand, ie, each time a new record arrives and
> >>>> there is no window yet for it, a new window will get created.
> >>>>
> >>>> Windows are accepting data until their retention time (that you can
> >>>> configure via .until()) passed. Thus, you will have many windows being
> >>>> open in parallel.
> >>>>
> >>>> If you read older data, they will just be put into the corresponding
> >>>> windows (as long as window retention time did not pass). If a window was
> >>>> discarded already, a new window with this single (later arriving) record
> >>>> will get created, the computation will be triggered, you get a result,
> >>>> and afterwards the window is deleted again (as its retention time
> >>>> passed already).
> >>>>
> >>>> The retention time is driven by "stream-time", an internally tracked time
> >>>> that only progresses forward. It gets its value from the
> >>>> timestamps provided by TimestampExtractor -- thus, by default it will
> >>>> be event-time.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> >>>>> I've read this and still have more questions than answers. If my data
> >>>>> skips about (timewise), what determines when a given window will start /
> >>>>> stop accepting new data? What if I'm reading data from some time ago?
> >>>>>
> >>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
> >> matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Please have a look here:
> >>>>>>
> >>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
> >>>>>>
> >>>>>> If you have further question, just follow up :)
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> >>>>>>> I've added the 'until()' clause to some aggregation steps and it's
> >>>>>>> working wonders for keeping the size of the state store in useful
> >>>>>>> boundaries... But I'm not 100% clear on how it works.
> >>>>>>>
> >>>>>>> What is implied by the '.until()' clause? What determines when to stop
> >>>>>>> receiving further data - is it clock time (since the window was
> >>>>>>> created)? It seems problematic for it to refer to EventTime as this may
> >>>>>>> bounce all over the place. For non-overlapping windows a given record
> >>>>>>> can only fall into a single aggregation period - so when would a value
> >>>>>>> get discarded?
> >>>>>>>
> >>>>>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 * 1000L).until(10 * 1000L))'
> >>>>>>> - but what is this accomplishing?
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>
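
The retention behaviour Matthias describes in this thread (windows created on demand, dropped once stream-time has moved past their retention, late records landing in a fresh window) can be modelled in a few lines. This is a simplified sketch and not the Kafka Streams implementation: the class name, the per-window count, and the exact expiry rule are assumptions made for illustration, and the real window store expires data in coarser segments.

```java
import java.util.TreeMap;

public class WindowRetentionSketch {
    private final long sizeMs;       // tumbling window size
    private final long retentionMs;  // plays the role of until()
    private long streamTime = Long.MIN_VALUE;                    // only ever moves forward
    private final TreeMap<Long, Long> counts = new TreeMap<>();  // window start -> record count

    public WindowRetentionSketch(long sizeMs, long retentionMs) {
        this.sizeMs = sizeMs;
        this.retentionMs = retentionMs;
    }

    /** Adds one record (non-negative timestamp) and returns the count of its window. */
    public long add(long timestampMs) {
        // Stream-time is the highest timestamp seen so far.
        streamTime = Math.max(streamTime, timestampMs);
        // Drop every window that started more than retentionMs before stream-time.
        counts.headMap(streamTime - retentionMs, false).clear();
        // Windows are created on demand, including for late records whose
        // original window has already been dropped.
        long windowStart = timestampMs - (timestampMs % sizeMs);
        return counts.merge(windowStart, 1L, Long::sum);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long day = 24 * 60 * minute;
        WindowRetentionSketch windows = new WindowRetentionSketch(minute, day);
        System.out.println(windows.add(0L));                 // first record of window 0
        System.out.println(windows.add(1_000L));             // same window
        System.out.println(windows.add(day + minute + 1_000L)); // advances stream-time past retention
        System.out.println(windows.add(2_000L));             // late record for the dropped window
    }
}
```

In this model the last call does not continue the earlier aggregate: the original window is gone, so the late record starts a new one, which is the situation Sachin describes at the 24 hour boundary.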


reasonable KStream app config settings?

2016-12-13 Thread Jon Yeargers
My app seems to be continuously rebalancing. If I said it processed data
maybe 3 minutes / hour I wouldn't be exaggerating. Surely this isn't normal
behavior.

My config is:

config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");

config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");

config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5242880");

config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60");

Does this seem reasonable / rational? Some default that I shouldn't rely on?
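
One thing that stands out in the config above is the unit of the last setting. Below is a minimal sketch using plain property keys, assuming `max.poll.interval.ms` and `max.request.size` are the keys behind the two constants used above. `max.poll.interval.ms` is in milliseconds, so "60" asks for a 60 ms poll interval, far below the 300000 ms (300 s) default in Kafka 0.10.1. The class name, the method name, and the 10 minute value are hypothetical and chosen only for illustration.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class PollIntervalSketch {
    // Size of the record reported in the RecordTooLargeException
    // quoted in the "Another odd error" thread.
    public static final long LARGEST_RECORD_BYTES = 2_381_750L;

    /** Builds only the time and size settings; the rest of the config is unchanged. */
    public static Properties sizeAndTimeSettings() {
        Properties config = new Properties();
        // Milliseconds, not seconds. Ten minutes is an illustrative value only.
        config.put("max.poll.interval.ms", String.valueOf(TimeUnit.MINUTES.toMillis(10)));
        // Bytes: 5 MB, the same value as in the config above.
        config.put("max.request.size", String.valueOf(5 * 1024 * 1024));
        return config;
    }

    public static void main(String[] args) {
        Properties config = sizeAndTimeSettings();
        long pollMs = Long.parseLong(config.getProperty("max.poll.interval.ms"));
        long maxRequestBytes = Long.parseLong(config.getProperty("max.request.size"));
        System.out.println("max.poll.interval.ms = " + pollMs);
        System.out.println("request size covers largest record: "
                + (maxRequestBytes >= LARGEST_RECORD_BYTES));
    }
}
```

Writing the value with `TimeUnit` keeps the unit visible at the call site, which makes a seconds/milliseconds mix-up harder to miss.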


Re: Another odd error

2016-12-13 Thread Jon Yeargers
As near as I can see it's rebalancing constantly.

I'll up that value and see what happens.

On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy  wrote:

> Hi Jon,
>
> I haven't had much of a chance to look at the logs in detail yet,
> but I have noticed that your app seems to be rebalancing frequently. It
> seems that it is usually around the 300 second mark, which usually would
> mean that poll hasn't been called for at least that long. You might want to
> try setting the config ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to
> something higher than 300000 ms (300 seconds, which is the default).
>
> I'll continue to look at your logs and get back to you.
> Thanks,
> Damian
>
> On Tue, 13 Dec 2016 at 15:02 Jon Yeargers 
> wrote:
>
> > attached is a log with lots of disconnections and a small amount of
> > actual, useful activity.
> >
> >
> >
> > On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers 
> > wrote:
> >
> > n/m - I understand the logging issue now. Am generating a new one. Will
> > send shortly.
> >
> > On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers 
> > wrote:
> >
> > Yes - saw that one. There were plenty of smaller records available
> though.
> >
> > I sent another log this morning with the level set to DEBUG. Hopefully
> you
> > rec'd it.
> >
> > On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy 
> wrote:
> >
> > HI Jon,
> >
> > It looks like you have the logging level for KafkaStreams set to at least
> > WARN. I can only see ERROR level logs being produced from Streams.
> >
> > However, i did notice an issue in the logs (not related to your specific
> > error but you will need to fix anyway):
> >
> > There are lots of messages like:
> > task [2_9] Error sending record to topic
> > PRTMinuteAgg-prt_hour_agg_stream-changelog
> > org.apache.kafka.common.errors.RecordTooLargeException: The message is
> > 2381750 bytes when serialized which is larger than the maximum
> >
> > This means you need to add some extra config to your StreamsConfig:
> > config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
> > expectedMaximumMessageSizeBytes)
> >
> > You will possibly also need to adjust the broker properties and
> > increase message.max.bytes
> > - it will need to be at least as large as the setting above.
> >
> > At the moment all of the change-logs for your state-stores are being
> > dropped due to this issue.
> >
> > Thanks,
> > Damian
> >
> >
> >
> > On Tue, 13 Dec 2016 at 11:32 Jon Yeargers 
> > wrote:
> >
> > > (am attaching a debug log - note that app terminated with no further
> > > messages)
> > >
> > > topology: kStream -> groupByKey.aggregate(minute) -> foreach
> > >  \-> groupByKey.aggregate(hour) -> foreach
> > >
> > >
> > > config:
> > >
> > > Properties config = new Properties();
> > > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> > > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> ZOOKEEPER_IP);
> > > config.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "PRTMinuteAgg" );
> > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > AggKey.class.getName());
> > > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > Serdes.String().getClass().getName());
> > > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
> > >         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
> > > config.put(StreamsConfig.STATE_DIR_CONFIG,
> "/mnt/PRTMinuteAgg");
> > >
> > > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
> > >
> > >
> > > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang 
> > wrote:
> > >
> > > Jon,
> > >
> > > To help investigate this issue, could you let me know 1) your topology
> > > sketch and 2) your app configs? For example, did you enable caching in
> > > your apps with the cache.max.bytes.buffering config?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers <
> jon.yearg...@cedexis.com>
> > > wrote:
> > >
> > > > I get this one quite a bit. It kills my app after a short time of
> > > running.
> > > > Driving me nuts.
> >

KStreams app - frequent broker dis/re connects

2016-12-13 Thread Jon Yeargers
Watching the debug output on an app - wondering why it spends nearly all of
its time rebalancing. Noticed that it seems to drop / recreate connections
to brokers pretty frequently. No error messages to speak of though.

Connect / timeout / related settings in the consumer are all default.

How much connection-thrashing is considered typical?


Re: Another odd error

2016-12-13 Thread Jon Yeargers
n/m - I understand the logging issue now. Am generating a new one. Will
send shortly.

On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers 
wrote:

> Yes - saw that one. There were plenty of smaller records available though.
>
> I sent another log this morning with the level set to DEBUG. Hopefully you
> rec'd it.
>
> On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy  wrote:
>
>> HI Jon,
>>
>> It looks like you have the logging level for KafkaStreams set to at least
>> WARN. I can only see ERROR level logs being produced from Streams.
>>
>> However, i did notice an issue in the logs (not related to your specific
>> error but you will need to fix anyway):
>>
>> There are lots of messages like:
>> task [2_9] Error sending record to topic
>> PRTMinuteAgg-prt_hour_agg_stream-changelog
>> org.apache.kafka.common.errors.RecordTooLargeException: The message is
>> 2381750 bytes when serialized which is larger than the maximum
>>
>> This means you need to add some extra config to your StreamsConfig:
>> config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
>> expectedMaximumMessageSizeBytes)
>>
>> You will possibly also need to adjust the broker properties and
>> increase message.max.bytes
>> - it will need to be at least as large as the setting above.
>>
>> At the moment all of the change-logs for your state-stores are being
>> dropped due to this issue.
>>
>> Thanks,
>> Damian
>>
>>
>> On Tue, 13 Dec 2016 at 11:32 Jon Yeargers 
>> wrote:
>>
>> > (am attaching a debug log - note that app terminated with no further
>> > messages)
>> >
>> > topology: kStream -> groupByKey.aggregate(minute) -> foreach
>> >  \-> groupByKey.aggregate(hour) -> foreach
>> >
>> >
>> > config:
>> >
>> > Properties config = new Properties();
>> > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
>> ZOOKEEPER_IP);
>> > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg"
>> );
>> > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> > AggKey.class.getName());
>> > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> > Serdes.String().getClass().getName());
>> > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
>> > config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
>> > config.put(StreamsConfig.STATE_DIR_CONFIG,
>> "/mnt/PRTMinuteAgg");
>> >
>> > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
>> >
>> >
>> > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang 
>> wrote:
>> >
>> > Jon,
>> >
>> > To help investigate this issue, could you let me know 1) your topology
>> > sketch and 2) your app configs? For example, did you enable caching in
>> > your apps with the cache.max.bytes.buffering config?
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers
>> > wrote:
>> >
>> > > I get this one quite a bit. It kills my app after a short time of
>> > running.
>> > > Driving me nuts.
>> > >
>> > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <
>> matth...@confluent.io>
>> > > wrote:
>> > >
>> > > > Not sure about this one.
>> > > >
>> > > > Can you describe what you do exactly? Can you reproduce the issue?
>> We
>> > > > definitely want to investigate this.
>> > > >
>> > > > -Matthias
>> > > >
>> > > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
>> > > > > (Am reporting these as have moved to 0.10.1.0-cp2)
>> > > > >
>> > > > > ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
>> > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for
>> group
>> > > > > MinuteAgg failed on partition assignment
>> > > > >
>> > > > > java.lang.IllegalStateException: task [1_9] Log end offset
>> should not
>> > > > > change while restoring
>> > > > >
>> > > > > at
>> > > > > org.apache.kafka.streams.processor.internals.Pro

Re: Another odd error

2016-12-13 Thread Jon Yeargers
Yes - saw that one. There were plenty of smaller records available though.

I sent another log this morning with the level set to DEBUG. Hopefully you
rec'd it.

On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy  wrote:

> HI Jon,
>
> It looks like you have the logging level for KafkaStreams set to at least
> WARN. I can only see ERROR level logs being produced from Streams.
>
> However, i did notice an issue in the logs (not related to your specific
> error but you will need to fix anyway):
>
> There are lots of messages like:
> task [2_9] Error sending record to topic
> PRTMinuteAgg-prt_hour_agg_stream-changelog
> org.apache.kafka.common.errors.RecordTooLargeException: The message is
> 2381750 bytes when serialized which is larger than the maximum
>
> This means you need to add some extra config to your StreamsConfig:
> config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
> expectedMaximumMessageSizeBytes)
>
> You will possibly also need to adjust the broker properties and
> increase message.max.bytes
> - it will need to be at least as large as the setting above.
>
> At the moment all of the change-logs for your state-stores are being
> dropped due to this issue.
>
> Thanks,
> Damian
>
>
> On Tue, 13 Dec 2016 at 11:32 Jon Yeargers 
> wrote:
>
> > (am attaching a debug log - note that app terminated with no further
> > messages)
> >
> > topology: kStream -> groupByKey.aggregate(minute) -> foreach
> >  \-> groupByKey.aggregate(hour) -> foreach
> >
> >
> > config:
> >
> > Properties config = new Properties();
> > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> ZOOKEEPER_IP);
> > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg"
> );
> > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > AggKey.class.getName());
> > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
> > config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
> > config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");
> >
> > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
> >
> >
> > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang 
> wrote:
> >
> > Jon,
> >
> > To help investigate this issue, could you let me know 1) your topology
> > sketch and 2) your app configs? For example, did you enable caching in
> > your apps with the cache.max.bytes.buffering config?
> >
> >
> > Guozhang
> >
> >
> > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers 
> > wrote:
> >
> > > I get this one quite a bit. It kills my app after a short time of
> > running.
> > > Driving me nuts.
> > >
> > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > Not sure about this one.
> > > >
> > > > Can you describe what you do exactly? Can you reproduce the issue? We
> > > > definitely want to investigate this.
> > > >
> > > > -Matthias
> > > >
> > > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
> > > > > (Am reporting these as have moved to 0.10.1.0-cp2)
> > > > >
> > > > > ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for
> group
> > > > > MinuteAgg failed on partition assignment
> > > > >
> > > > > java.lang.IllegalStateException: task [1_9] Log end offset should
> not
> > > > > change while restoring
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.
> > > > restoreActiveState(ProcessorStateManager.java:245)
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.
> > > > register(ProcessorStateManager.java:198)
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.processor.internals.
> > > > ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> > > > >
> > > > > at
> 

Re: Another odd error

2016-12-13 Thread Jon Yeargers
(am attaching a debug log - note that app terminated with no further
messages)

topology: kStream -> groupByKey.aggregate(minute) -> foreach
                  \-> groupByKey.aggregate(hour) -> foreach


config:

Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");

config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");


On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang  wrote:

> Jon,
>
> To help investigate this issue, could you let me know 1) your topology
> sketch and 2) your app configs? For example, did you enable caching in your
> apps with the cache.max.bytes.buffering config?
>
>
> Guozhang
>
>
> On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers 
> wrote:
>
> > I get this one quite a bit. It kills my app after a short time of
> running.
> > Driving me nuts.
> >
> > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax 
> > wrote:
> >
> > > Not sure about this one.
> > >
> > > Can you describe what you do exactly? Can you reproduce the issue? We
> > > definitely want to investigate this.
> > >
> > > -Matthias
> > >
> > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
> > > > (Am reporting these as have moved to 0.10.1.0-cp2)
> > > >
> > > > ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > > > MinuteAgg failed on partition assignment
> > > >
> > > > java.lang.IllegalStateException: task [1_9] Log end offset should not
> > > > change while restoring
> > > >
> > > > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
> > > > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
> > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> > > > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
> > > > at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
> > > > at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
> > > > at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> > > > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
> > > > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
> > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
> > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
> > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
> > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
> > > > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
> > > > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


errors.log.gz
Description: GNU Zip compressed data


Re: "Log end offset should not change while restoring"

2016-12-12 Thread Jon Yeargers
What is the specific cache config setting?

On Mon, Dec 12, 2016 at 1:49 PM, Matthias J. Sax 
wrote:

> We discovered a few more bugs, and a bug fix release 0.10.1.1 is already
> planned.
>
> The voting for it has started, and it should be released in the next few
> weeks.
>
> If your issue is related to this caching problem, disabling the cache
> via StreamsConfig should fix the problem for now. Just set the cache
> size to zero.
>
>
> -Matthias
>
>
> On 12/12/16 2:31 AM, Jon Yeargers wrote:
> > I'm seeing this error occur more frequently of late. I ran across this
> > thread:
> > https://groups.google.com/forum/#!topic/confluent-platform/AH5QClSNZBw
> >
> >
> > The implication from the thread is that a fix is available. Where can I
> > get it?
> >
>
>
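
On the question of which setting this is: elsewhere in this archive Guozhang refers to the `cache.max.bytes.buffering` config, which appears to be the one meant. Below is a minimal sketch, assuming the plain property key is passed through the same `Properties` object as the other Streams settings. The class and method names are hypothetical.

```java
import java.util.Properties;

public class DisableCacheSketch {
    /** Returns a copy of the given settings with the record cache size set to zero. */
    public static Properties withCacheDisabled(Properties base) {
        Properties config = new Properties();
        config.putAll(base);
        // A cache size of zero bytes disables caching, as suggested in the reply above.
        config.put("cache.max.bytes.buffering", "0");
        return config;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "PRTMinuteAgg");
        Properties config = withCacheDisabled(base);
        System.out.println("cache.max.bytes.buffering = "
                + config.getProperty("cache.max.bytes.buffering"));
    }
}
```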


Re: rocksdb error(s)

2016-12-12 Thread Jon Yeargers
The attached error log shows several of the problems I've run into. After
hitting this list (and typically much less) the app dies.

On Mon, Dec 12, 2016 at 4:48 AM, Damian Guy  wrote:

> Just set the log level to debug and then run your app until you start
> seeing the problem.
> Thanks
>
> On Mon, 12 Dec 2016 at 12:47 Jon Yeargers 
> wrote:
>
> > I can log whatever you need. Tell me what is useful.
> >
> > On Mon, Dec 12, 2016 at 4:43 AM, Damian Guy 
> wrote:
> >
> > > If you provide the logs from your streams application then we might
> have
> > > some chance of working out what is going on. Without logs then we
> really
> > > don't have much hope of diagnosing the problem.
> > >
> > > On Mon, 12 Dec 2016 at 12:18 Jon Yeargers 
> > > wrote:
> > >
> > > > > I'm running as many threads as I have partitions on this topic. Just
> > > > > curious if it would make any difference to the seemingly endless
> > > > > rebalancing woes.
> > > > >
> > > > > So far no change. In fact, I'll often see all 10 partitions (plus the
> > > > > 2 x 10 for the two aggregations) assigned to a single thread.
> > > >
> > > > On Mon, Dec 12, 2016 at 4:15 AM, Jon Yeargers <
> > jon.yearg...@cedexis.com>
> > > > wrote:
> > > >
> > > > > At this moment I have 5 instances each running 2 threads.
> > > > > Single instance / machine.
> > > > >
> > > > > Define 'full logs' ?
> > > > >
> > > > > On Mon, Dec 12, 2016 at 3:54 AM, Damian Guy 
> > > > wrote:
> > > > >
> > > > >> Jon,
> > > > >>
> > > > >> How many StreamThreads do you have running?
> > > > >> How many application instances?
> > > > >> Do you have more than one instance per machine? If yes, are they
> > > sharing
> > > > >> the same State Directory?
> > > > >> Do you have full logs that can be provided so we can try and see
> > > > how/what
> > > > >> is happening?
> > > > >>
> > > > >> Thanks,
> > > > >> Damian
> > > > >>
> > > > >> On Mon, 12 Dec 2016 at 10:17 Jon Yeargers <
> jon.yearg...@cedexis.com
> > >
> > > > >> wrote:
> > > > >>
> > > > >> > No luck here. Moved all state storage to a non-tmp folder and
> > > > restarted.
> > > > >> > Still hitting the 'No locks available' error quite frequently.
> > > > >> >
> > > > >> > On Sun, Dec 11, 2016 at 3:45 PM, Jon Yeargers <
> > > > jon.yearg...@cedexis.com
> > > > >> >
> > > > >> > wrote:
> > > > >> >
> > > > >> > > I moved the state folder to a separate drive and linked out to
> > it.
> > > > >> > >
> > > > >> > > I'll try your suggestion and point directly.
> > > > >> > >
> > > > >> > > On Sun, Dec 11, 2016 at 2:20 PM, Matthias J. Sax <
> > > > >> matth...@confluent.io>
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > >> I am not sure, but this might be related to your state directory.
> > > > >> > >>
> > > > >> > >> You use the default directory, which is located in /tmp -- could it be
> > > > >> > >> that /tmp gets cleaned up and thus you lose files/directories?
> > > > >> > >>
> > > > >> > >> Try to reconfigure your state directory via StreamsConfig:
> > > > >> > >> http://docs.confluent.io/current/streams/developer-guide.
> > > > >> > >> html#optional-configuration-parameters
> > > > >> > >>
> > > > >> > >>
> > > > >> > >> -Matthias
> > > > >> > >>
> > > > >> > >> On 12/11/16 1:28 AM, Jon Yeargers wrote:
> > > > >> > >> > Seeing this appearing somewhat frequently -
> > > > >> > >> >
> > > > >> > >> > org.apache.kafka.strea

Re: rocksdb error(s)

2016-12-12 Thread Jon Yeargers
I can log whatever you need. Tell me what is useful.

On Mon, Dec 12, 2016 at 4:43 AM, Damian Guy  wrote:

> If you provide the logs from your streams application then we might have
> some chance of working out what is going on. Without logs then we really
> don't have much hope of diagnosing the problem.
>
> On Mon, 12 Dec 2016 at 12:18 Jon Yeargers 
> wrote:
>
> > I'm running as many threads as I have partitions on this topic. Just
> > curious if it would make any difference to the seemingly endless
> > rebalancing woes.
> >
> > So far no change. In fact, I'll often see all 10 partitions (plus the 2 x
> > 10 for the two aggregations) assigned to a single thread.
> >
> > On Mon, Dec 12, 2016 at 4:15 AM, Jon Yeargers 
> > wrote:
> >
> > > At this moment I have 5 instances each running 2 threads.
> > > Single instance / machine.
> > >
> > > Define 'full logs' ?
> > >
> > > On Mon, Dec 12, 2016 at 3:54 AM, Damian Guy 
> > wrote:
> > >
> > >> Jon,
> > >>
> > >> How many StreamThreads do you have running?
> > >> How many application instances?
> > >> Do you have more than one instance per machine? If yes, are they
> sharing
> > >> the same State Directory?
> > >> Do you have full logs that can be provided so we can try and see
> > how/what
> > >> is happening?
> > >>
> > >> Thanks,
> > >> Damian
> > >>
> > >> On Mon, 12 Dec 2016 at 10:17 Jon Yeargers 
> > >> wrote:
> > >>
> > >> > No luck here. Moved all state storage to a non-tmp folder and
> > restarted.
> > >> > Still hitting the 'No locks available' error quite frequently.
> > >> >
> > >> > On Sun, Dec 11, 2016 at 3:45 PM, Jon Yeargers <
> > jon.yearg...@cedexis.com
> > >> >
> > >> > wrote:
> > >> >
> > >> > > I moved the state folder to a separate drive and linked out to it.
> > >> > >
> > >> > > I'll try your suggestion and point directly.
> > >> > >
> > >> > > On Sun, Dec 11, 2016 at 2:20 PM, Matthias J. Sax <
> > >> matth...@confluent.io>
> > >> > > wrote:
> > >> > >
> > >> > >> I am not sure, but this might be related to your state directory.
> > >> > >>
> > >> > >> You use the default directory, which is located in /tmp -- could it be
> > >> > >> that /tmp gets cleaned up and thus you lose files/directories?
> > >> > >>
> > >> > >> Try to reconfigure your state directory via StreamsConfig:
> > >> > >> http://docs.confluent.io/current/streams/developer-guide.
> > >> > >> html#optional-configuration-parameters
> > >> > >>
> > >> > >>
> > >> > >> -Matthias
> > >> > >>
> > >> > >> On 12/11/16 1:28 AM, Jon Yeargers wrote:
> > >> > >> > Seeing this appearing somewhat frequently -
> > >> > >> >
> > >> > >> > org.apache.kafka.streams.errors.ProcessorStateException: Error
> > >> opening
> > >> > >> > store minute_agg_stream-201612100812 at location
> > >> > >> > /tmp/kafka-streams/MinuteAgg/1_9/minute_agg_stream/minute_ag
> > >> > >> g_stream-201612100812
> > >> > >> >
> > >> > >> > at
> > >> > >> > org.apache.kafka.streams.state.internals.RocksDBStore.openDB
> > >> > >> (RocksDBStore.java:196)
> > >> > >> >
> > >> > >> > at
> > >> > >> > org.apache.kafka.streams.state.internals.RocksDBStore.openDB
> > >> > >> (RocksDBStore.java:158)
> > >> > >> >
> > >> > >> > at
> > >> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore$
> > >> > >> Segment.openDB(RocksDBWindowStore.java:72)
> > >> > >> >
> > >> > >> > at
> > >> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore.
> > >> > >> getOrCreateSegment(RocksDBWindowStore.java:402)
> > >> > &

Re: rocksdb error(s)

2016-12-12 Thread Jon Yeargers
I'm running as many threads as I have partitions on this topic. Just curious
if it would make any difference to the seemingly endless rebalancing woes.

So far no change. In fact, I'll often see all 10 partitions (plus the 2 x
10 for the two aggregations) assigned to a single thread.

On Mon, Dec 12, 2016 at 4:15 AM, Jon Yeargers 
wrote:

> At this moment I have 5 instances each running 2 threads.
> Single instance / machine.
>
> Define 'full logs' ?
>
> On Mon, Dec 12, 2016 at 3:54 AM, Damian Guy  wrote:
>
>> Jon,
>>
>> How many StreamThreads do you have running?
>> How many application instances?
>> Do you have more than one instance per machine? If yes, are they sharing
>> the same State Directory?
>> Do you have full logs that can be provided so we can try and see how/what
>> is happening?
>>
>> Thanks,
>> Damian
>>
>> On Mon, 12 Dec 2016 at 10:17 Jon Yeargers 
>> wrote:
>>
>> > No luck here. Moved all state storage to a non-tmp folder and restarted.
>> > Still hitting the 'No locks available' error quite frequently.
>> >
>> > On Sun, Dec 11, 2016 at 3:45 PM, Jon Yeargers
>> > wrote:
>> >
>> > > I moved the state folder to a separate drive and linked out to it.
>> > >
>> > > I'll try your suggestion and point directly.
>> > >
>> > > On Sun, Dec 11, 2016 at 2:20 PM, Matthias J. Sax <
>> matth...@confluent.io>
>> > > wrote:
>> > >
>> > >> I am not sure, but this might be related to your state directory.
>> > >>
>> > >> You use the default directory, which is located in /tmp -- could it be
>> > >> that /tmp gets cleaned up and thus you lose files/directories?
>> > >>
>> > >> Try to reconfigure your state directory via StreamsConfig:
>> > >> http://docs.confluent.io/current/streams/developer-guide.
>> > >> html#optional-configuration-parameters
>> > >>
>> > >>
>> > >> -Matthias
>> > >>
>> > >> On 12/11/16 1:28 AM, Jon Yeargers wrote:
>> > >> > Seeing this appearing somewhat frequently -
>> > >> >
>> > >> > org.apache.kafka.streams.errors.ProcessorStateException: Error
>> opening
>> > >> > store minute_agg_stream-201612100812 at location
>> > >> > /tmp/kafka-streams/MinuteAgg/1_9/minute_agg_stream/minute_ag
>> > >> g_stream-201612100812
>> > >> >
>> > >> > at
>> > >> > org.apache.kafka.streams.state.internals.RocksDBStore.openDB
>> > >> (RocksDBStore.java:196)
>> > >> >
>> > >> > at
>> > >> > org.apache.kafka.streams.state.internals.RocksDBStore.openDB
>> > >> (RocksDBStore.java:158)
>> > >> >
>> > >> > at
>> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore$
>> > >> Segment.openDB(RocksDBWindowStore.java:72)
>> > >> >
>> > >> > at
>> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore.
>> > >> getOrCreateSegment(RocksDBWindowStore.java:402)
>> > >> >
>> > >> > at
>> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore.
>> > >> putInternal(RocksDBWindowStore.java:333)
>> > >> >
>> > >> > at
>> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore.
>> > >> access$100(RocksDBWindowStore.java:51)
>> > >> >
>> > >> > at
>> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore$
>> > >> 2.restore(RocksDBWindowStore.java:212)
>> > >> >
>> > >> > at
>> > >> > org.apache.kafka.streams.processor.internals.ProcessorStateM
>> > >> anager.restoreActiveState(ProcessorStateManager.java:235)
>> > >> >
>> > >> > at
>> > >> > org.apache.kafka.streams.processor.internals.ProcessorStateM
>> > >> anager.register(ProcessorStateManager.java:198)
>> > >> >
>> > >> > at
>> > >> > org.apache.kafka.streams.processor.internals.ProcessorContex
>> > >> tImpl.register(ProcessorContextImpl.java:123)
>

Re: rocksdb error(s)

2016-12-12 Thread Jon Yeargers
At this moment I have 5 instances each running 2 threads.
Single instance / machine.

Define 'full logs' ?

On Mon, Dec 12, 2016 at 3:54 AM, Damian Guy  wrote:

> Jon,
>
> How many StreamThreads do you have running?
> How many application instances?
> Do you have more than one instance per machine? If yes, are they sharing
> the same State Directory?
> Do you have full logs that can be provided so we can try and see how/what
> is happening?
>
> Thanks,
> Damian
>
> On Mon, 12 Dec 2016 at 10:17 Jon Yeargers 
> wrote:
>
> > No luck here. Moved all state storage to a non-tmp folder and restarted.
> > Still hitting the 'No locks available' error quite frequently.
> >
> > On Sun, Dec 11, 2016 at 3:45 PM, Jon Yeargers 
> > wrote:
> >
> > > I moved the state folder to a separate drive and linked out to it.
> > >
> > > I'll try your suggestion and point directly.
> > >
> > > On Sun, Dec 11, 2016 at 2:20 PM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > >> I am not sure, but this might be related to your state directory.
> > >>
> > >> You use the default directory, which is located in /tmp -- could it be
> > >> that /tmp gets cleaned up and thus you lose files/directories?
> > >>
> > >> Try to reconfigure your state directory via StreamsConfig:
> > >> http://docs.confluent.io/current/streams/developer-guide.
> > >> html#optional-configuration-parameters
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 12/11/16 1:28 AM, Jon Yeargers wrote:
> > >> > Seeing this appearing somewhat frequently -
> > >> >
> > >> > org.apache.kafka.streams.errors.ProcessorStateException: Error
> opening
> > >> > store minute_agg_stream-201612100812 at location
> > >> > /tmp/kafka-streams/MinuteAgg/1_9/minute_agg_stream/minute_ag
> > >> g_stream-201612100812
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.state.internals.RocksDBStore.openDB
> > >> (RocksDBStore.java:196)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.state.internals.RocksDBStore.openDB
> > >> (RocksDBStore.java:158)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore$
> > >> Segment.openDB(RocksDBWindowStore.java:72)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore.
> > >> getOrCreateSegment(RocksDBWindowStore.java:402)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore.
> > >> putInternal(RocksDBWindowStore.java:333)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore.
> > >> access$100(RocksDBWindowStore.java:51)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore$
> > >> 2.restore(RocksDBWindowStore.java:212)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.processor.internals.ProcessorStateM
> > >> anager.restoreActiveState(ProcessorStateManager.java:235)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.processor.internals.ProcessorStateM
> > >> anager.register(ProcessorStateManager.java:198)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.processor.internals.ProcessorContex
> > >> tImpl.register(ProcessorContextImpl.java:123)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.state.internals.RocksDBWindowStore.
> > >> init(RocksDBWindowStore.java:206)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.state.internals.MeteredWindowStore.
> > >> init(MeteredWindowStore.java:66)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.state.internals.CachingWindowStore.
> > >> init(CachingWindowStore.java:64)
> > >> >
> > >> > at
> > >> > org.apache.kafka.streams.processor.internals.AbstractTask.in
> > &

partition count multiples - adverse effects on rebalancing?

2016-12-12 Thread Jon Yeargers
Just curious - how is rebalancing handled when the number of partitions
isn't a multiple of the number of potential consumer threads?

I.e., if I have 9 partitions and 6 threads - will the cluster be forever
trying to balance this?
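
To make the arithmetic in the question concrete, here is a small sketch that
deals 9 partitions out to 6 threads. It assumes a plain round-robin spread,
which is only an illustration and not necessarily what the Streams partition
assignor does (Streams assigns tasks, not raw partitions). The class and
method names are hypothetical.

```java
import java.util.Arrays;

// Illustration only: a naive round-robin spread of partitions over threads.
final class UnevenSpread {
    // Returns how many partitions each thread ends up owning.
    static int[] spread(int partitions, int threads) {
        int[] counts = new int[threads];
        for (int p = 0; p < partitions; p++) {
            counts[p % threads]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 9 partitions over 6 threads: three threads own 2, three own 1.
        System.out.println(Arrays.toString(spread(9, 6))); // [2, 2, 2, 1, 1, 1]
    }
}
```

Under that assumption the uneven split is simply the end state: nothing in
such a scheme needs to keep shuffling partitions once every partition has an
owner. Rebalances are triggered by group membership changes, not by the
counts being unequal.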


doing > 1 'parallel' operation on a stream

2016-12-12 Thread Jon Yeargers
If I want to aggregate a stream twice using different windows do I need to
split / copy / duplicate the source stream somehow? Or will this be handled
without my interference?


"Log end offset should not change while restoring"

2016-12-12 Thread Jon Yeargers
I'm seeing this error occur more frequently of late. I ran across this
thread:
https://groups.google.com/forum/#!topic/confluent-platform/AH5QClSNZBw


The implication from the thread is that a fix is available. Where can I get
it?


Re: rocksdb error(s)

2016-12-12 Thread Jon Yeargers
No luck here. Moved all state storage to a non-tmp folder and restarted.
Still hitting the 'No locks available' error quite frequently.

On Sun, Dec 11, 2016 at 3:45 PM, Jon Yeargers 
wrote:

> I moved the state folder to a separate drive and linked out to it.
>
> I'll try your suggestion and point directly.
>
> On Sun, Dec 11, 2016 at 2:20 PM, Matthias J. Sax 
> wrote:
>
>> I am not sure, but this might be related to your state directory.
>>
>> You use the default directory, which is located in /tmp -- could it be
>> that /tmp gets cleaned up and thus you lose files/directories?
>>
>> Try to reconfigure your state directory via StreamsConfig:
>> http://docs.confluent.io/current/streams/developer-guide.
>> html#optional-configuration-parameters
>>
>>
>> -Matthias
>>
>> On 12/11/16 1:28 AM, Jon Yeargers wrote:
>> > Seeing this appearing somewhat frequently -
>> >
>> > org.apache.kafka.streams.errors.ProcessorStateException: Error opening
>> > store minute_agg_stream-201612100812 at location
>> > /tmp/kafka-streams/MinuteAgg/1_9/minute_agg_stream/minute_agg_stream-201612100812
>> >
>> > at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:196)
>> > at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:158)
>> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)
>> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:402)
>> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:333)
>> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:51)
>> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:212)
>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:235)
>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
>> > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
>> > at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
>> > at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
>> > at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
>> > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
>> > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
>> > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
>> > at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
>> > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
>> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
>> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
>> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinat

Re: rebalancing - how to speed it up?

2016-12-11 Thread Jon Yeargers
After an hour: it briefly popped up with 1 instance 'applied' to all 10
partitions... then it went back to rebalance for 10-15 minutes.. followed
by a different instance on all partitions.. and then more rebalancing..

At no point (yet) have I seen the work get truly 'balanced' between all 5
instances.

On Sun, Dec 11, 2016 at 6:04 PM, Jon Yeargers 
wrote:

> I changed 'num.standby.replicas' to '2'.
>
> I started one instance and it immediately showed up in the
> 'kafka-consumer-groups .. --describe' listing.
>
> So I started a second... and it quickly displaced the first... which never
> came back.
>
> Started a third.. same effect. Second goes away never to return.. but now
> it tries to rebalance for a while before I see the third by itself.
>
> Fourth and fifth - now it's gone off to rebalance (and is seemingly stuck
> there) and hasn't pulled any data for more than an hour.
>
>
>
> On Sun, Dec 11, 2016 at 2:27 PM, Matthias J. Sax 
> wrote:
>
>> Not sure.
>>
>> How big is your state? On rebalance, state stores might move from one
>> machine to another. To recreate the store on the new machine the
>> underlying changelog topic must be read. This can take some time -- an
>> hour seems quite long though...
>>
>> To avoid long state recreation periods, Kafka Streams supports standby
>> tasks. Try to enable those via StreamsConfig: "num.standby.replicas"
>>
>> http://docs.confluent.io/current/streams/developer-guide.
>> html#optional-configuration-parameters
>>
>> Also check out this section of the docs:
>>
>> http://docs.confluent.io/3.1.1/streams/architecture.html#fault-tolerance
>>
>>
>> -Matthias
>>
>>
>> On 12/11/16 3:14 AM, Gerrit Jansen van Vuuren wrote:
>> > I don't know about speeding up rebalancing, and an hour seems to suggest
>> > something is wrong with ZooKeeper or maybe your whole setup. If it
>> > becomes an unsolvable issue for you, you could try
>> > https://github.com/gerritjvv/kafka-fast which uses a different model
>> and
>> > doesn't need balancing or rebalancing.
>> >
>> > disclojure: "Im the library author".
>> >
>> >
>> >
>> > On 11 Dec 2016 11:56 a.m., "Jon Yeargers" 
>> wrote:
>> >
>> > Is there some way to 'help it along'? It's taking an hour or more from
>> when
>> > I start my app to actually seeing anything consumed.
>> >
>> > Plenty of CPU (and IOWait) during this time so I know it's doing
>> > _something_...
>> >
>>
>>
>


Re: rebalancing - how to speed it up?

2016-12-11 Thread Jon Yeargers
I changed 'num.standby.replicas' to '2'.

I started one instance and it immediately showed up in the
'kafka-consumer-groups .. --describe' listing.

So I started a second... and it quickly displaced the first... which never
came back.

Started a third.. same effect. Second goes away never to return.. but now
it tries to rebalance for a while before I see the third by itself.

Fourth and fifth - now it's gone off to rebalance (and is seemingly stuck
there) and hasn't pulled any data for more than an hour.



On Sun, Dec 11, 2016 at 2:27 PM, Matthias J. Sax 
wrote:

> Not sure.
>
> How big is your state? On rebalance, state stores might move from one
> machine to another. To recreate the store on the new machine the
> underlying changelog topic must be read. This can take some time -- an
> hour seems quite long though...
>
> To avoid long state recreation periods, Kafka Streams supports standby
> tasks. Try to enable those via StreamsConfig: "num.standby.replicas"
>
> http://docs.confluent.io/current/streams/developer-guide.html#optional-
> configuration-parameters
>
> Also check out this section of the docs:
>
> http://docs.confluent.io/3.1.1/streams/architecture.html#fault-tolerance
>
>
> -Matthias
>
>
> On 12/11/16 3:14 AM, Gerrit Jansen van Vuuren wrote:
> > I don't know about speeding up rebalancing, and an hour seems to suggest
> > something is wrong with ZooKeeper or maybe your whole setup. If it
> > becomes an unsolvable issue for you, you could try
> > https://github.com/gerritjvv/kafka-fast which uses a different model and
> > doesn't need balancing or rebalancing.
> >
> > disclojure: "Im the library author".
> >
> >
> >
> > On 11 Dec 2016 11:56 a.m., "Jon Yeargers" 
> wrote:
> >
> > Is there some way to 'help it along'? It's taking an hour or more from
> when
> > I start my app to actually seeing anything consumed.
> >
> > Plenty of CPU (and IOWait) during this time so I know it's doing
> > _something_...
> >
>
>
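
For anyone following along, a minimal sketch of the standby setting discussed
in this thread. The property name "num.standby.replicas" and the value 2 come
from the messages above; the application id and broker address are
placeholders, and the helper that counts state copies is a hypothetical
illustration of what the setting implies, not part of the Kafka API.

```java
import java.util.Properties;

// Sketch only: builds the configuration, it does not start a Streams app.
final class StandbyConfig {
    static Properties build(int standbyReplicas) {
        Properties props = new Properties();
        props.setProperty("application.id", "my-streams-app");     // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
        props.setProperty("num.standby.replicas", Integer.toString(standbyReplicas));
        return props;
    }

    // Hypothetical helper: one active copy of a task's state plus its standbys.
    static int stateCopiesPerTask(Properties props) {
        return 1 + Integer.parseInt(props.getProperty("num.standby.replicas"));
    }

    public static void main(String[] args) {
        System.out.println(stateCopiesPerTask(build(2))); // 3
    }
}
```

With a value of 2, each task's state would be kept on up to three instances,
so the extra copies cost disk space and changelog reads on the standby hosts.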


Re: How does 'TimeWindows.of().until()' work?

2016-12-11 Thread Jon Yeargers
I've read this and still have more questions than answers. If my data skips
about (timewise), what determines when a given window will start / stop
accepting new data? What if I'm reading data from some time ago?

On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax 
wrote:

> Please have a look here:
>
> http://docs.confluent.io/current/streams/developer-
> guide.html#windowing-a-stream
>
> If you have further questions, just follow up :)
>
>
> -Matthias
>
>
> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> > I've added the 'until()' clause to some aggregation steps and it's working
> > wonders for keeping the size of the state store within useful bounds...
> > But I'm not 100% clear on how it works.
> >
> > What is implied by the '.until()' clause? What determines when to stop
> > receiving further data - is it clock time (since the window was created)?
> > It seems problematic for it to refer to EventTime as this may bounce all
> > over the place. For non-overlapping windows a given record can only fall
> > into a single aggregation period - so when would a value get discarded?
> >
> > I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 * 1000L).until(10 *
> > 1000L))' - but what is this accomplishing?
> >
>
>
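
A rough sketch of the arithmetic behind the windowing questions above, under
two assumptions that are not confirmed in this thread: that tumbling windows
are aligned to the epoch, and that retention is judged against the highest
record timestamp seen so far ("stream time") rather than the wall clock. The
class and method names are hypothetical and this is not the library's code.

```java
// Illustration only: a simplified model of window assignment and retention.
final class WindowMath {
    // Start of the tumbling window that a record timestamp falls into,
    // assuming windows are aligned to the epoch.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Simplified retention check: a window counts as expired once stream
    // time has moved more than retentionMs past the window's start.
    static boolean expired(long windowStartMs, long streamTimeMs, long retentionMs) {
        return streamTimeMs - windowStartMs > retentionMs;
    }

    public static void main(String[] args) {
        long size = 60 * 1000L;
        long start = windowStart(125_000L, size);
        System.out.println(start);                              // 120000
        System.out.println(expired(start, 150_000L, 60_000L));  // false
        System.out.println(expired(start, 190_000L, 60_000L));  // true
    }
}
```

In this model, reading old data is not a problem by itself: a record from
some time ago lands in the window its own timestamp selects, and it is only
dropped if newer records have already pushed stream time past that window's
retention. The real store treats the retention as a lower bound and cleans up
in coarser segments, so windows may live longer than this sketch suggests.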


Re: rocksdb error(s)

2016-12-11 Thread Jon Yeargers
I moved the state folder to a separate drive and linked out to it.

I'll try your suggestion and point directly.

On Sun, Dec 11, 2016 at 2:20 PM, Matthias J. Sax 
wrote:

> I am not sure, but this might be related to your state directory.
>
> You use the default directory, which is located in /tmp -- could it be
> that /tmp gets cleaned up and thus you lose files/directories?
>
> Try to reconfigure your state directory via StreamsConfig:
> http://docs.confluent.io/current/streams/developer-guide.html#optional-
> configuration-parameters
>
>
> -Matthias
>
> On 12/11/16 1:28 AM, Jon Yeargers wrote:
> > Seeing this appearing somewhat frequently -
> >
> > org.apache.kafka.streams.errors.ProcessorStateException: Error opening
> > store minute_agg_stream-201612100812 at location
> > /tmp/kafka-streams/MinuteAgg/1_9/minute_agg_stream/minute_agg_stream-201612100812
> >
> > at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:196)
> > at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:158)
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:402)
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:333)
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:51)
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:212)
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:235)
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
> > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
> > at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
> > at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
> > at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
> > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
> > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
> > at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
> > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >
> > Caused by: org.rocksdb.RocksDBException: IO error: lock
> > /tmp/kafka-streams/MinuteAgg/1_9/minute_agg_stream/minute_agg_stream-201612100812/LOCK:
> > No locks available
> >
> > at org.rocksdb.RocksDB.open(Native Method)
> > at org.rocksdb.RocksDB.open(RocksDB.java:184)
> > at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:189)
> > ... 26 common frames omitted
> >
>
>
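
A minimal sketch of pointing the state directory away from /tmp, as suggested
in the quoted reply. The property name "state.dir" is the StreamsConfig
setting; the directory "/var/lib/kafka-streams" and the broker address are
placeholders. The taskDir helper is hypothetical and only mirrors the layout
visible in the error above (state dir, then application id, then task id).

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Sketch only: builds the configuration, it does not start a Streams app.
final class StateDirConfig {
    static Properties build(String stateDir) {
        Properties props = new Properties();
        props.setProperty("application.id", "MinuteAgg");           // name seen in the paths above
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
        props.setProperty("state.dir", stateDir);
        return props;
    }

    // Hypothetical helper: where a task's stores would live, following the
    // <state.dir>/<application.id>/<task id> layout seen in the stack trace.
    static Path taskDir(Properties props, String taskId) {
        return Paths.get(props.getProperty("state.dir"),
                props.getProperty("application.id"), taskId);
    }

    public static void main(String[] args) {
        Properties props = build("/var/lib/kafka-streams"); // placeholder path
        System.out.println(taskDir(props, "1_9"));
    }
}
```

Pointing "state.dir" at the real location directly, rather than at a symlink,
is the variant suggested earlier in the thread.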


Re: Another odd error

2016-12-11 Thread Jon Yeargers
I get this one quite a bit. It kills my app after a short time of running.
Driving me nuts.

On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax 
wrote:

> Not sure about this one.
>
> Can you describe what you do exactly? Can you reproduce the issue? We
> definitely want to investigate this.
>
> -Matthias
>
> On 12/10/16 4:17 PM, Jon Yeargers wrote:
> > (Am reporting these as have moved to 0.10.1.0-cp2)
> >
> > ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > MinuteAgg failed on partition assignment
> >
> > java.lang.IllegalStateException: task [1_9] Log end offset should not
> > change while restoring
> >
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
> > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
> > at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
> > at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
> > at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
> > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
> > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
> > at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
> > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >
>
>


Re: odd error message

2016-12-11 Thread Jon Yeargers
Yes- but not 100% repro. I seem to have several issues with start /
rebalance

On Sun, Dec 11, 2016 at 2:16 PM, Matthias J. Sax 
wrote:

> Hi,
>
> this might be a recently discovered bug. Does it happen when you
> stop/restart your application?
>
>
> -Matthias
>
> On 12/10/16 1:42 PM, Jon Yeargers wrote:
> > This came up a few times today:
> >
> > 2016-12-10 18:45:52,637 [StreamThread-1] ERROR
> > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1]
> Failed to
> > create an active task %s:
> >
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_0]
> Error
> > while creating the state manager
> >
> > at
> > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
> AbstractTask.java:72)
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> StreamTask.<init>(StreamTask.java:90)
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.createStreamTask(StreamThread.java:633)
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.addStreamTasks(StreamThread.java:660)
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.access$100(
> StreamThread.java:69)
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread$1.
> onPartitionsAssigned(StreamThread.java:124)
> >
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> onJoinComplete(ConsumerCoordinator.java:228)
> >
> > at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:313)
> >
> > at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:277)
> >
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:259)
> >
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1013)
> >
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:979)
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:407)
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> >
> > Caused by: java.io.IOException: task [0_0] Failed to lock the state
> > directory: /mnt/extra/space/MinuteAgg/0_0
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.<init>(ProcessorStateManager.java:101)
> >
> > at
> > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
> AbstractTask.java:69)
> >
> > ... 13 common frames omitted
> >
>
>


rebalancing - how to speed it up?

2016-12-11 Thread Jon Yeargers
Is there some way to 'help it along'? It's taking an hour or more from when
I start my app to actually seeing anything consumed.

Plenty of CPU (and IOWait) during this time so I know it's doing
_something_...


rocksdb error(s)

2016-12-11 Thread Jon Yeargers
Seeing this appearing somewhat frequently -

org.apache.kafka.streams.errors.ProcessorStateException: Error opening
store minute_agg_stream-201612100812 at location
/tmp/kafka-streams/MinuteAgg/1_9/minute_agg_stream/minute_agg_stream-201612100812

at
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:196)

at
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:158)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:402)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:333)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:51)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:212)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:235)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)

at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)

at
org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)

at
org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)

at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)

at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)

at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)

at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)

at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Caused by: org.rocksdb.RocksDBException: IO error: lock
/tmp/kafka-streams/MinuteAgg/1_9/minute_agg_stream/minute_agg_stream-201612100812/LOCK:
No locks available

at org.rocksdb.RocksDB.open(Native Method)

at org.rocksdb.RocksDB.open(RocksDB.java:184)

at
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:189)

... 26 common frames omitted


How does 'TimeWindows.of().until()' work?

2016-12-10 Thread Jon Yeargers
I've added the 'until()' clause to some aggregation steps and it's working
wonders for keeping the size of the state store within useful bounds... But
I'm not 100% clear on how it works.

What is implied by the '.until()' clause? What determines when to stop
receiving further data - is it clock time (since the window was created)?
It seems problematic for it to refer to EventTime as this may bounce all
over the place. For non-overlapping windows a given record can only fall
into a single aggregation period - so when would a value get discarded?

I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 * 1000L).until(10 *
1000L))' - but what is this accomplishing?
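
To make the question concrete, here is a minimal sketch of one way to picture window assignment and retention. It is a simplified model, not Kafka Streams code: the class WindowRetentionSketch and its methods are invented for illustration, and it assumes that retention is measured against the highest record timestamp seen so far (event time) rather than the wall clock, which is my understanding of how the windowed stores behave. The real store expires data in coarser segments, so actual cut-offs will differ.

```java
// Simplified, hypothetical model of tumbling windows with a retention period.
// Not Kafka Streams code: class and method names are invented for illustration.
final class WindowRetentionSketch {
    private final long sizeMs;       // window size, like TimeWindows.of(sizeMs)
    private final long retentionMs;  // retention, like until(retentionMs)
    private long streamTimeMs = Long.MIN_VALUE; // highest record timestamp seen

    WindowRetentionSketch(long sizeMs, long retentionMs) {
        // Assumption of this sketch: retention is at least one window long.
        if (sizeMs <= 0 || retentionMs < sizeMs) {
            throw new IllegalArgumentException("need 0 < sizeMs <= retentionMs");
        }
        this.sizeMs = sizeMs;
        this.retentionMs = retentionMs;
    }

    // Start of the non-overlapping window that contains the given timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Advances the observed event time, then reports whether the record's
    // window is still retained (true) or has already expired (false).
    boolean accept(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long start = windowStart(timestampMs, sizeMs);
        return start > streamTimeMs - retentionMs;
    }

    public static void main(String[] args) {
        WindowRetentionSketch w = new WindowRetentionSketch(60_000L, 300_000L);
        long[] timestamps = {30_000L, 400_000L, 45_000L, 130_000L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> window " + windowStart(ts, 60_000L)
                    + (w.accept(ts) ? " accepted" : " dropped"));
        }
    }
}
```

In this model, with 60-second windows and 5 minutes of retention, a record stamped 45,000 that arrives after one stamped 400,000 is dropped because its window has aged out, while a late record stamped 130,000 is still accepted. Event time bouncing around only matters once a record falls further behind the newest timestamp than the retention period.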


Another odd error

2016-12-10 Thread Jon Yeargers
(I'm reporting these because I have moved to 0.10.1.0-cp2.)

ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
MinuteAgg failed on partition assignment

java.lang.IllegalStateException: task [1_9] Log end offset should not
change while restoring

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)

at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)

at
org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)

at
org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)

at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)

at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)

at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)

at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)

at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)


odd error message

2016-12-10 Thread Jon Yeargers
This came up a few times today:

2016-12-10 18:45:52,637 [StreamThread-1] ERROR
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Failed to
create an active task %s:

org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Error
while creating the state manager

at
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)

at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:90)

at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)

at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)

at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Caused by: java.io.IOException: task [0_0] Failed to lock the state
directory: /mnt/extra/space/MinuteAgg/0_0

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)

at
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)

... 13 common frames omitted


'swap' space for KStream app - limitations?

2016-12-10 Thread Jon Yeargers
Are there any? My app ran for a few hours and filled a 100G partition (on 5
machines).

Any settings to keep this growth in check? Perhaps to estimate how much
space it's going to need?


checking consumer lag on KStreams app?

2016-12-09 Thread Jon Yeargers
How would this be done?


Re: controlling memory growth when aggregating

2016-12-09 Thread Jon Yeargers
I updated my consumer to that build. The memory issue seems to have abated.
TY!

Have started seeing this exception semi-regularly though:

ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
MinuteAgg failed on partition assignment

java.lang.IllegalStateException: task [1_4] Log end offset should not
change while restoring

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)

at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)

at
org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)

at
org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)

at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)

at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)

at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)

at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)

at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

On Fri, Dec 9, 2016 at 4:29 PM, Jon Yeargers 
wrote:

> Perhaps that's the problem. Yes - I'm still using 0.10.1.0.
>
> Does this involve a broker update?
>
> On Fri, Dec 9, 2016 at 7:12 AM, Damian Guy  wrote:
>
>> Hi Jon,
>>
>> Are you using 0.10.1? There is a resource leak to do with the Window
>> Iterator. The bug has been fixed on the 0.10.1 branch (which will soon be
>> released as 0.10.1.1)
>> and it is also fixed in the confluent fork.
>>
>> You can get the confluent version by using the following:
>>
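>> <repositories>
>>   <repository>
>>     <id>confluent</id>
>>     <url>http://packages.confluent.io/maven/</url>
>>   </repository>
>> </repositories>
>>
>> <dependency>
>>   <groupId>org.apache.kafka</groupId>
>>   <artifactId>kafka-streams</artifactId>
>>   <version>0.10.1.0-cp2</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.kafka</groupId>
>>   <artifactId>kafka-clients</artifactId>
>>   <version>0.10.1.0-cp2</version>
>> </dependency>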
>>
>>
>> On Thu, 8 Dec 2016 at 23:37 Jon Yeargers 
>> wrote:
>>
>> I'm working with JSON data that has an array member. I'm aggregating values
>> into this using minute-long windows.
>>
>> I ran the app for ~10 minutes and watched it consume 40% of the memory on
>> a
>> box with 32G. It was still growing when I stopped it. At this point it had
>> created ~800 values each of which was < 1Mb in size (owing to the
>> limitations on message size set at the broker). (I wrote all the values
>> into Redis so I could count them and check the aggregation).
>>
>> 1. Why is it consuming so much memory?
>> 2. Is there a strategy for controlling this growth?
>>
>> I get that it's keeping every window open in case a new value shows up.
>> Maybe some way to relax this using event time vs clock time?
>>
>
>


Re: controlling memory growth when aggregating

2016-12-09 Thread Jon Yeargers
Perhaps that's the problem. Yes - I'm still using 0.10.1.0.

Does this involve a broker update?

On Fri, Dec 9, 2016 at 7:12 AM, Damian Guy  wrote:

> Hi Jon,
>
> Are you using 0.10.1? There is a resource leak to do with the Window
> Iterator. The bug has been fixed on the 0.10.1 branch (which will soon be
> released as 0.10.1.1)
> and it is also fixed in the confluent fork.
>
> You can get the confluent version by using the following:
>
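> <repositories>
>   <repository>
>     <id>confluent</id>
>     <url>http://packages.confluent.io/maven/</url>
>   </repository>
> </repositories>
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-streams</artifactId>
>   <version>0.10.1.0-cp2</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.10.1.0-cp2</version>
> </dependency>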
>
>
> On Thu, 8 Dec 2016 at 23:37 Jon Yeargers  wrote:
>
> I'm working with JSON data that has an array member. I'm aggregating values
> into this using minute-long windows.
>
> I ran the app for ~10 minutes and watched it consume 40% of the memory on a
> box with 32G. It was still growing when I stopped it. At this point it had
> created ~800 values each of which was < 1Mb in size (owing to the
> limitations on message size set at the broker). (I wrote all the values
> into Redis so I could count them and check the aggregation).
>
> 1. Why is it consuming so much memory?
> 2. Is there a strategy for controlling this growth?
>
> I get that it's keeping every window open in case a new value shows up.
> Maybe some way to relax this using event time vs clock time?
>

