Re: org.apache.kafka.common.errors.TimeoutException

2017-03-27 Thread R Krishna
Are you able to publish any messages at all? If it is a one-off, it is
possible that the broker and the client were both busy enough that the
batch of messages for partition 0 could not be published within 1732 ms, in
which case you should increase the message timeouts and retries.
Search for the timeout exception in the user group archives for many other reasons/solutions.
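A minimal sketch of the producer settings I mean (the values are only
illustrative; tune them for your setup):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
// Give batches more time before they are expired, and retry transient failures.
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);   // default is 30000
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);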

On Mon, Mar 20, 2017 at 10:49 AM, Mina Aslani  wrote:

> Hi,
>
> I get ERROR Error when sending message to topic my-topic with key: null,
> value: ... bytes with error: (org.apache.kafka.clients.producer.internals.
> ErrorLoggingCallback)
>
> org.apache.kafka.common.errors.TimeoutException: Expiring 11 record(s) for
> my-topic-0: 1732 ms has passed since last append
>
> Any idea?
>
> Best regards,
> Mina
>



-- 
Radha Krishna, Proddaturi
253-234-5657


Re: 3 machines 12 threads all fail within 2 hours of starting the streams application

2017-03-27 Thread Sachin Mittal
 - single threaded multiple instances
This option we could not try. However, what we observed is that running
multiple instances on the same machine with a single thread each would still
create multiple rocksdb instances, and somehow the VM was not able to handle
that many rocksdb instances running; the bottleneck here was rocksdb.
However, this was with the earlier rocksdb config.

- single thread single instance but multiple partitions
Here again we would have had to restrict the number of partitions so as to
limit the rocksdb instances.

However, given that we need to partition our input source by at least 12 to
handle the peak-time load, neither of the two options was feasible.

The current state is as follows:

1. We have picked up the latest rocksdb config from trunk.
2. Fixed the deadlock issue.
3. Increased the RAM on the machines.

This worked fine, but we ran into other issues which I posted in this thread.
4. Now we have added ProducerConfig.RETRIES_CONFIG = Integer.MAX_VALUE.
This setting is working fine. It has been a day now and all 12 of the
application's threads are up and running fine. Since this is month end (and
also financial year end) and we are implementing this for a banking
application, we are getting close to peak load and it is working fine under
pressure. Also we are not experiencing any lag like we do for the single
instance, so our number-of-partitions logic is also OK.
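For reference, this is roughly how we set it (a sketch; the application id,
bootstrap servers and topology are placeholders/omitted here):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder
// Forwarded to the internal producer, so transient send failures are retried
// instead of expiring the batch and killing the stream thread.
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

KStreamBuilder builder = new KStreamBuilder();   // topology definition omitted
KafkaStreams streams = new KafkaStreams(builder, props);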

To conclude:
1. rocksdb is indeed a bottleneck here, but with the latest settings (and
perhaps the increased RAM) it is working fine. So far we have not been able
to figure out how much of this is a hardware issue and how much is a
software issue.

2. the streams application is not resilient. It kills the thread in cases
where it should handle the exception, or, even if it wants to throw the
exception all the way up, it should give devs the flexibility to spawn a new
thread if some thread dies (a rough sketch of such a hook follows below).
Examples are:
Log end offset of changelog should not change while restoring
or Expiring 1 record(s) for changelog
or org.rocksdb.RocksDBException: ~
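For what it's worth, the closest hook available today seems to be registering
an uncaught-exception handler on the KafkaStreams instance; it only lets you
observe the thread death and react (e.g. close and rebuild the instance), it
does not resurrect the thread. A rough sketch (the restart helper is entirely
hypothetical, not a Streams API):

final KafkaStreams streams = new KafkaStreams(builder, props);  // builder/props as above

streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(final Thread t, final Throwable e) {
        // A stream thread died; Streams (as of 0.10.2) will not replace it automatically.
        System.err.println("Stream thread " + t.getName() + " died: " + e);
        // Hypothetical reaction: close this instance and schedule a fresh one.
        // restartStreamsInstance();
    }
});

streams.start();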

Let's hope that with PR https://github.com/apache/kafka/pull/2719 many of
these errors are resolved.

Thanks
Sachin



On Tue, Mar 28, 2017 at 1:02 AM, Matthias J. Sax 
wrote:

> Sachin,
>
> about this statement:
>
> >> Also note that when an identical streams application with single thread
> on
> >> a single instance is pulling data from some other non partitioned
> identical
> >> topic, the application never fails.
>
> What about some "combinations":
>  - single threaded multiple instances
>  - single thread single instance but multiple partitions
> etc.
>
> It would be helpful to understand what scenario works and what does not.
> Right now you go from
> single-threaded-single-instance-with-no-partitioning to
> multi-threaded-multiple-instances-and-partitioned -- that's a big step
> to reason about the situation.
>
>
> -Matthias
>
>
> On 3/25/17 11:14 AM, Sachin Mittal wrote:
> > Hi,
> > The broker is a three machine cluster. The replication factor for input
> and
> > also internal topics is 3.
> > Brokers don't seem to fail. I always see their instances running.
> >
> > Also note that when an identical streams application with single thread
> on
> > a single instance is pulling data from some other non partitioned
> identical
> > topic, the application never fails. Note there too replication factor is
> 3
> > for input and internal topics.
> >
> > Please let us know if you have something for the other errors. Also, what
> > ways can we make the streams resilient? I do feel we need hooks to start new
> > stream threads just in case some thread shuts down due to an unhandled
> > exception, or the streams application itself doing a better job in handling
> > such cases and not shutting down the threads.
> >
> > Thanks
> > Sachin
> >
> >
> > On Sat, Mar 25, 2017 at 11:03 PM, Eno Thereska 
> > wrote:
> >
> >> Hi Sachin,
> >>
> >> See my previous email on the NotLeaderForPartitionException error.
> >>
> >> What is your Kafka configuration, how many brokers are you using? Also
> >> could you share the replication level (if different from 1) of your
> streams
> >> topics? Are there brokers failing while Streams is running?
> >>
> >> Thanks
> >> Eno
> >>
> >> On 25/03/2017, 11:00, "Sachin Mittal"  wrote:
> >>
> >> Hi All,
> >> I am revisiting the ongoing issue of getting a multi instance multi
> >> threaded kafka streams cluster to work.
> >>
> >> Scenario is that we have a 12 partition source topic. (note our
> server
> >> cluster replication factor is 3).
> >> We have a 3-machine client cluster with one instance on each. Each
> >> instance uses 4 threads.
> >> Streams version is 0.10.2 with latest deadlock fix and rocks db
> >> optimization from trunk.
> >>
> >> We also have an identical single partition topic and another single
> >> threaded instance doing identical processing as the above one. This
> >> uses
> >> version 0.10.1.1
> >> This streams application never goes down.
> >>
> >> The above applic

Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Ara Ebrahimi
Well, even with 4-5x better performance thanks to the session window fix, I 
expect to get ~10x better performance if I throw 10x more nodes at the problem. 
That won’t be the case due to task assignment unfortunately. I may end up with 
say 5-6 nodes with aggregation assigned to them and 4-5 nodes sitting there 
doing nothing. So it is a problem.

Ara.

On Mar 27, 2017, at 4:15 PM, Matthias J. Sax 
mailto:matth...@confluent.io>> wrote:

From: "Matthias J. Sax" mailto:matth...@confluent.io>>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 27, 2017 at 4:15:07 PM PDT
To: users@kafka.apache.org
Reply-To: mailto:users@kafka.apache.org>>


Great!

So overall, the issue is not related to task assignment. Also, the
description below does not indicate that a different task assignment
would change anything.


-Matthias

On 3/27/17 3:08 PM, Ara Ebrahimi wrote:
Let me clarify, cause I think we’re using different terminologies:

- message key is phone number, reversed
- all call records for a phone number land on the same partition
- then we apply a session window on them and aggregate+reduce
- so we end up with a group of records for a phone number. This group is 
literally an avro object with an array of records in it for the session.
- since records arrive chronologically and since we have a session window, then 
all call records for a phone number end up in the same partition and session 
(intended behavior). We can easily have many such phone call record groups with 
100s of call records in them. The aggregate object (the avro object with array 
of records in it) can get updated 100s of times for the same phone number in 
the course of an hour or so.
- we process billions of such call records a day
- we can’t expect our clients to install massive clusters of 10s or 100s of 
nodes. We do need to make sure this processing is as efficient as possible. The 
session window bug was killing us. Much better with the fix Damian provided!

Ara.

On Mar 27, 2017, at 2:41 PM, Matthias J. Sax 
mailto:matth...@confluent.io>>
 wrote:

From: "Matthias J. Sax" 
mailto:matth...@confluent.io>>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 27, 2017 at 2:41:06 PM PDT
To: 
users@kafka.apache.org
Reply-To: 
mailto:users@kafka.apache.org>>


Ara,

I assume your performance issue is most likely related to the fix Damian
pointed out already.

Couple of follow up comments:

critical part of our pipeline involves grouping relevant records together

Can you explain this a little better? The abstraction of a task does
group data together already. Furthermore, if you get multiple tasks,
those are independent units with regard to grouping as consecutive tasks
are "connected" via repartitioning steps. Thus, even if we apply the
task assignment as you ask for, I am not sure if this would change
anything. Maybe you can give a small data example of what
behavior/data-co-location you need and what Streams provides for you.

And for hot keys this can lead to sometimes 100s of records to get grouped 
together

This is independent of task placement -- it is a partitioning issue. If you
want to work on that, you can provide a custom `Partitioner` for the Kafka
producers used (note, your external Producer writing to your Streams
input topic might already generate "hot" keys, so you might need to use
a custom partitioner there, too).

Also "100s of records" does not sound much to me. Streams can process
multiple hundredths of thousandth records per thread. That is the
reason, why I think that the fix Damian pointed out will most likely fix
your problem.



-Matthias


On 3/27/17 1:56 PM, Ara Ebrahimi wrote:
Thanks for the response, Matthias!

The reason we want this exact task assignment to happen is that a critical part 
of our pipeline involves grouping relevant records together (that’s what the 
aggregate function in the topology is for). And for hot keys this can lead to 
sometimes 100s of records to get grouped together. Even worse, these reco

Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-27 Thread Matthias J. Sax
Hi,

I would like to trigger this discussion again. It seems that the naming
question is rather subjective and both main alternatives (w/ or w/o the
word "Topology" in the name) have pros/cons.

If you have any further thought, please share it. At the moment I still
propose `StreamsBuilder` in the KIP.

I also want to point out that the VOTE thread has already been started. So
if you like the current KIP, please cast your vote there.


Thanks a lot!


-Matthias


On 3/23/17 3:38 PM, Matthias J. Sax wrote:
> Jay,
> 
> about the naming schema:
> 
>>>1. "kstreams" - the DSL
>>>2. "processor api" - the lower level callback/topology api
>>>3. KStream/KTable - entities in the kstreams dsl
>>>4. "Kafka Streams" - General name for stream processing stuff in Kafka,
>>>including both kstreams and the processor API plus the underlying
>>>implementation.
> 
> I think this terminology has some issues... To me, `kstreams` was
> never more than an abbreviation for `Kafka Streams` -- thus (1) and
> (4) kinda collide here. Following questions on the mailing list etc., I
> often see people using kstreams or kstream exactly as an abbreviation for
> "Kafka Streams".
> 
>> I think referring to the dsl as "kstreams" is cute and mnemonic and not
>> particularly confusing.
> 
> I disagree here. It's a very subtle difference between `kstreams` and
> `KStream` -- just singular/plural, thus (1) and (3) also "collide" --
> it's just too close to each other.
> 
> Thus, I really think it's a good idea to get a new name for the DSL to
> get a better separation of the 4 concepts.
> 
> Furthermore, we use the term "Streams API". Thus, I think
> `StreamsBuilder` (or `StreamsTopologyBuilder`) are both very good names.
> 
> 
> Thus, I prefer to keep the KIP as is (suggesting `StreamsBuilder`).
> 
> I will start a VOTE thread. Of course, we can still discuss the naming
> issue. :)
> 
> 
> 
> -Matthias
> 
> 
> On 3/22/17 8:53 PM, Jay Kreps wrote:
>> I don't feel strongly on this, so I'm happy with whatever everyone else
>> wants.
>>
>> Michael, I'm not arguing that people don't need to understand topologies; I
>> just think it is like RocksDB: you need to understand it when
>> debugging/operating, but not in the initial coding, since the metaphor we're
>> providing at this layer isn't a topology of processors but rather something
>> like the collections api. Anyhow, it won't hurt people to have it there.
>>
>> For the original KStreamBuilder thing, I think that came from the naming we
>> discussed originally:
>>
>>1. "kstreams" - the DSL
>>2. "processor api" - the lower level callback/topology api
>>3. KStream/KTable - entities in the kstreams dsl
>>4. "Kafka Streams" - General name for stream processing stuff in Kafka,
>>including both kstreams and the processor API plus the underlying
>>implementation.
>>
>> I think referring to the dsl as "kstreams" is cute and mnemonic and not
>> particularly confusing. Just like referring to the "java collections
>> library" isn't confusing even though it contains the Iterator interface
>> which is not actually itself a collection.
>>
>> So I think KStreamBuilder should technically have been KstreamsBuilder and
>> is intended not to be a builder of a KStream but rather the builder for the
>> kstreams DSL. Okay, yes, that *is* slightly confusing. :-)
>>
>> -Jay
>>
>> On Wed, Mar 22, 2017 at 11:25 AM, Guozhang Wang  wrote:
>>
>>> Regarding the naming of `StreamsTopologyBuilder` vs. `StreamsBuilder` that
>>> are going to be used in the DSL, I agree both have their arguments:
>>>
>>> 1. On one side, people using the DSL layer probably do not need to be aware
>>> of (or rather, "learn about") the "topology" concept, although this concept
>>> is a publicly exposed one in Kafka Streams.
>>>
>>> 2. On the other side, StreamsBuilder#build() returning a Topology object
>>> sounds a little weird, at least to me (admittedly subjective matter).
>>>
>>>
>>> Since the second bullet point seems to be more "subjective" and many people
>>> are not worried about it, I'm OK to go with the other option.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Wed, Mar 22, 2017 at 8:58 AM, Michael Noll 
>>> wrote:
>>>
 Forwarding to kafka-user.


 -- Forwarded message --
 From: Michael Noll 
 Date: Wed, Mar 22, 2017 at 8:48 AM
 Subject: Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API
 To: d...@kafka.apache.org


 Matthias,

> @Michael:
>
> You seemed to agree with Jay about not exposing the `Topology` concept
> in our main entry class (ie, current KStreamBuilder), thus, I
> interpreted that you do not want `Topology` in the name either (I am a
> little surprised by your last response, that goes the opposite
 direction).

 Oh, sorry for not being clear.

 What I wanted to say in my earlier email was the following:  Yes, I do
 agree with most of Jay's reasoning, notably about carefully deciding how

Upgrade ssl.enabled.protocols to TLSv1.2

2017-03-27 Thread Samuel Zhou
Hi,

In my previous server and consumer configuration, we have
set ssl.enabled.protocols=TLSv1, but we want to
upgrade to ssl.enabled.protocols=TLSv1.2 now. Since the default value of
ssl.enabled.protocols supports 3 versions (TLSv1, TLSv1.1 and TLSv1.2), what
should I do to force both the broker and the consumer to use TLSv1.2? Should
I just set ssl.enabled.protocols to TLSv1.2? Can I reuse the keystore I
generated for TLSv1?
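To make the question concrete, the client-side change I have in mind is
something like this (a sketch only; paths and passwords are placeholders, and
the keystore/truststore are the ones already generated for TLSv1):

import java.util.Properties;

Properties props = new Properties();
props.put("security.protocol", "SSL");
// Restrict negotiation to TLSv1.2 only, instead of the TLSv1/TLSv1.1/TLSv1.2 default.
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.truststore.location", "/path/to/truststore.jks");   // existing truststore
props.put("ssl.truststore.password", "changeit");                  // placeholder
props.put("ssl.keystore.location", "/path/to/keystore.jks");       // existing keystore
props.put("ssl.keystore.password", "changeit");                    // placeholder

plus ssl.enabled.protocols=TLSv1.2 in the broker's server.properties.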

Thanks!
Samuel


Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Matthias J. Sax
Great!

So overall, the issue is not related to task assignment. Also, the
description below does not indicate that a different task assignment
would change anything.


-Matthias

On 3/27/17 3:08 PM, Ara Ebrahimi wrote:
> Let me clarify, cause I think we’re using different terminologies:
> 
> - message key is phone number, reversed
> - all call records for a phone number land on the same partition
> - then we apply a session window on them and aggregate+reduce
> - so we end up with a group of records for a phone number. This group is 
> literally an avro object with an array of records in it for the session.
> - since records arrive chronologically and since we have a session window, 
> then all call records for a phone number end up in the same partition and 
> session (intended behavior). We can easily have many such phone call record 
> groups with 100s of call records in them. The aggregate object (the avro 
> object with array of records in it) can get updated 100s of times for the 
> same phone number in the course of an hour or so.
> - we process billions of such call records a day
> - we can’t expect our clients to install massive clusters of 10s or 100s of 
> nodes. We do need to make sure this processing is as efficient as possible. 
> The session window bug was killing us. Much better with the fix Damian 
> provided!
> 
> Ara.
> 
> On Mar 27, 2017, at 2:41 PM, Matthias J. Sax 
> mailto:matth...@confluent.io>> wrote:
> 
> From: "Matthias J. Sax" mailto:matth...@confluent.io>>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 27, 2017 at 2:41:06 PM PDT
> To: users@kafka.apache.org
> Reply-To: mailto:users@kafka.apache.org>>
> 
> 
> Ara,
> 
> I assume your performance issue is most likely related to the fix Damian
> pointed out already.
> 
> Couple of follow up comments:
> 
> critical part of our pipeline involves grouping relevant records together
> 
> Can you explain this a little better? The abstraction of a task does
> group data together already. Furthermore, if you get multiple tasks,
> those are independent units with regard to grouping as consecutive tasks
> are "connected" via repartitioning steps. Thus, even if we apply the
> task assignment as you ask for, I am not sure if this would change
> anything. Maybe you can give a small data example of what
> behavior/data-co-location you need and what Streams provides for you.
> 
> And for hot keys this can lead to sometimes 100s of records to get grouped 
> together
> 
> This is independent of task placement -- it is a partitioning issue. If you
> want to work on that, you can provide a custom `Partitioner` for the Kafka
> producers used (note, your external Producer writing to your Streams
> input topic might already generate "hot" keys, so you might need to use
> a custom partitioner there, too).
> 
> Also "100s of records" does not sound much to me. Streams can process
> multiple hundredths of thousandth records per thread. That is the
> reason, why I think that the fix Damian pointed out will most likely fix
> your problem.
> 
> 
> 
> -Matthias
> 
> 
> On 3/27/17 1:56 PM, Ara Ebrahimi wrote:
> Thanks for the response, Matthias!
> 
> The reason we want this exact task assignment to happen is that a critical 
> part of our pipeline involves grouping relevant records together (that’s what 
> the aggregate function in the topology is for). And for hot keys this can 
> lead to sometimes 100s of records to get grouped together. Even worse, these 
> records are session bound, we use session windows. Hence we see lots of 
> activity around the store backing the aggregate function and even though we 
> use SSD drives we’re not seeing the kind of performance we want to see. It 
> seems like the aggregate function leads to lots of updates to these hot keys 
> which lead to lots of rocksdb activity.
> 
> Now there are many ways to fix this problem:
> - just don’t aggregate, create an algorithm which is not reliant on 
> grouping/aggregating records. Not what we can do with our tight schedule 
> right now.
> - do grouping/aggregating but employ n instances and rely on uniform 
> distribution of these tasks. This is the easiest solution and what we 
> expected to work but didn’t work as you can tell from this thread. We threw 4 
> instances at it but only 2 got used.
> - tune rocksdb? I tried this actually but it didn’t really help us much, 
> aside from the fact that tuning rocksdb is very tricky.
> - use in-memory store instead? Unfortunately we have to use session windows 
> for this ag

Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Ara Ebrahimi
Let me clarify, cause I think we’re using different terminologies:

- message key is phone number, reversed
- all call records for a phone number land on the same partition
- then we apply a session window on them and aggregate+reduce
- so we end up with a group of records for a phone number. This group is 
literally an avro object with an array of records in it for the session.
- since records arrive chronologically and since we have a session window, then 
all call records for a phone number end up in the same partition and session 
(intended behavior). We can easily have many such phone call record groups with 
100s of call records in them. The aggregate object (the avro object with array 
of records in it) can get updated 100s of times for the same phone number in 
the course of an hour or so.
- we process billions of such call records a day
- we can’t expect our clients to install massive clusters of 10s or 100s of 
nodes. We do need to make sure this processing is as efficient as possible. The 
session window bug was killing us. Much better with the fix Damian provided!

Ara.

On Mar 27, 2017, at 2:41 PM, Matthias J. Sax 
mailto:matth...@confluent.io>> wrote:

From: "Matthias J. Sax" mailto:matth...@confluent.io>>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 27, 2017 at 2:41:06 PM PDT
To: users@kafka.apache.org
Reply-To: mailto:users@kafka.apache.org>>


Ara,

I assume your performance issue is most likely related to the fix Damian
pointed out already.

Couple of follow up comments:

critical part of our pipeline involves grouping relevant records together

Can you explain this a little better? The abstraction of a task does
group data together already. Furthermore, if you get multiple tasks,
those are independent units with regard to grouping as consecutive tasks
are "connected" via repartitioning steps. Thus, even if we apply the
task assignment as you ask for, I am not sure if this would change
anything. Maybe you can give a small data example of what
behavior/data-co-location you need and what Streams provides for you.

And for hot keys this can lead to sometimes 100s of records to get grouped 
together

This is independent of task placement -- it is a partitioning issue. If you
want to work on that, you can provide a custom `Partitioner` for the Kafka
producers used (note, your external Producer writing to your Streams
input topic might already generate "hot" keys, so you might need to use
a custom partitioner there, too).

Also "100s of records" does not sound much to me. Streams can process
multiple hundredths of thousandth records per thread. That is the
reason, why I think that the fix Damian pointed out will most likely fix
your problem.



-Matthias


On 3/27/17 1:56 PM, Ara Ebrahimi wrote:
Thanks for the response, Matthias!

The reason we want this exact task assignment to happen is that a critical part 
of our pipeline involves grouping relevant records together (that’s what the 
aggregate function in the topology is for). And for hot keys this can lead to 
sometimes 100s of records to get grouped together. Even worse, these records 
are session bound, we use session windows. Hence we see lots of activity around 
the store backing the aggregate function and even though we use SSD drives 
we’re not seeing the kind of performance we want to see. It seems like the 
aggregate function leads to lots of updates to these hot keys which lead to 
lots of rocksdb activity.

Now there are many ways to fix this problem:
- just don’t aggregate, create an algorithm which is not reliant on 
grouping/aggregating records. Not what we can do with our tight schedule right 
now.
- do grouping/aggregating but employ n instances and rely on uniform 
distribution of these tasks. This is the easiest solution and what we expected 
to work but didn’t work as you can tell from this thread. We threw 4 instances 
at it but only 2 got used.
- tune rocksdb? I tried this actually but it didn’t really help us much, aside 
from the fact that tuning rocksdb is very tricky.
- use in-memory store instead? Unfortunately we have to use session windows for 
this aggregate function and apparently there’s no in-memory session store impl? 
I tried to create one but soon realized it’s too much work :) I looked at 
default PartitionAssigner code too, but that ain’t trivial either.

So I’m a bit hopeless :(

Ara.

On Mar 27, 2017, at 1:35 PM, Matthias J. Sax 
mailto:matth...@confluent.io>>
 wrote:

Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Ara Ebrahimi
Holy s...!

This led to ~4-5x better performance. I'm gonna try this with more nodes, and
if performance improves almost linearly then we are good for now.

Thanks!
Ara.

> On Mar 27, 2017, at 2:10 PM, Damian Guy  wrote:
>
> Hi Ara,
>
> There is a performance issue in the 0.10.2 release of session windows. It
> is fixed with this PR: https://github.com/apache/kafka/pull/2645
> You can work around this on 0.10.2 by calling the aggregate(..), reduce(..),
> etc. methods and supplying a StateStoreSupplier with caching
> disabled, i.e., by doing something like:
>
> final StateStoreSupplier sessionStore =
>     Stores.create("session-store-name")
>         .withKeys(Serdes.String())
>         .withValues(Serdes.String())
>         .persistent()
>         .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
>         .build();
>
>
> The fix has also been cherry-picked to the 0.10.2 branch, so you could
> build from source and not have to create the StateStoreSupplier.
>
> Thanks,
> Damian
>
> On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi 
> wrote:
>
> Thanks for the response Mathias!
>
> The reason we want this exact task assignment to happen is that a critical
> part of our pipeline involves grouping relevant records together (that’s
> what the aggregate function in the topology is for). And for hot keys this
> can lead to sometimes 100s of records to get grouped together. Even worse,
> these records are session bound, we use session windows. Hence we see lots
> of activity around the store backing the aggregate function and even though
> we use SSD drives we’re not seeing the kind of performance we want to see.
> It seems like the aggregate function leads to lots of updates to these hot
> keys which lead to lots of rocksdb activity.
>
> Now there are many ways to fix this problem:
> - just don’t aggregate, create an algorithm which is not reliant on
> grouping/aggregating records. Not what we can do with our tight schedule
> right now.
> - do grouping/aggregating but employ n instances and rely on uniform
> distribution of these tasks. This is the easiest solution and what we
> expected to work but didn’t work as you can tell from this thread. We threw
> 4 instances at it but only 2 got used.
> - tune rocksdb? I tried this actually but it didn’t really help us much,
> aside from the fact that tuning rocksdb is very tricky.
> - use in-memory store instead? Unfortunately we have to use session windows
> for this aggregate function and apparently there’s no in-memory session
> store impl? I tried to create one but soon realized it’s too much work :) I
> looked at default PartitionAssigner code too, but that ain’t trivial either.
>
> So I’m a bit hopeless :(
>
> Ara.
>
> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax  matth...@confluent.io>> wrote:
>
> From: "Matthias J. Sax" mailto:matth...@confluent.io
>>>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 27, 2017 at 1:35:30 PM PDT
> To: users@kafka.apache.org
> Reply-To: mailto:users@kafka.apache.org>>
>
>
> Ara,
>
> thanks for the detailed information.
>
> If I parse this correctly, both instances run the same number of tasks
> (12 each). That is all Streams promises.
>
> To come back to your initial question:
>
> Is there a way to tell kafka streams to uniformly assign partitions across
> instances? If I have n kafka streams instances running, I want each to
> handle EXACTLY 1/nth number of partitions. No dynamic task assignment
> logic. Just dumb 1/n assignment.
>
> That is exactly what you get: each of your two instances gets 24/2 = 12
> tasks assigned. That is dumb 1/n assignment, isn't it? So my original
> response was correct.
>
> However, I now understand better what you are actually meaning by your
> question. Note that Streams does not distinguish "type" of tasks -- it
> only sees 24 tasks and assigns those in a balanced way.
>
> Thus, currently there is no easy way to get the assignment you want to
> have, except by implementing your own `PartitionAssignor`.
>
> This is the current implementation for 0.10.2
> https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
>
> You can, if you wish, write your own assignor and set it via
> StreamsConfig. However, be aware that this might be tricky to get right
> and also might have runtime implications with regard to rebalancing and
> state store recovery. We recently improved the current implementation to
> avoid costly task movements:
> https://issues.apache.org/jira/browse/KAFKA-4677
>
> Thus, I would not r

Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Matthias J. Sax
Ara,

I assume your performance issue is most likely related to the fix Damian
pointed out already.

Couple of follow up comments:

> critical part of our pipeline involves grouping relevant records together

Can you explain this a little better? The abstraction of a task does
group data together already. Furthermore, if you get multiple tasks,
those are independent units with regard to grouping as consecutive tasks
are "connected" via repartitioning steps. Thus, even if we apply the
task assignment as you ask for, I am not sure if this would change
anything. Maybe you can give a small data example of what
behavior/data-co-location you need and what Streams provides for you.

> And for hot keys this can lead to sometimes 100s of records to get grouped 
> together

This is independent of task placement -- it is a partitioning issue. If you
want to work on that, you can provide a custom `Partitioner` for the Kafka
producers used (note, your external Producer writing to your Streams
input topic might already generate "hot" keys, so you might need to use
a custom partitioner there, too).
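For illustration only, a bare-bones custom `Partitioner` might look like the
sketch below (the class name and the hot-key idea are hypothetical; the
hashing shown is just default-style murmur2 on the key bytes, and it assumes
a non-null key):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class PhonePartitioner implements Partitioner {

    @Override
    public void configure(final Map<String, ?> configs) { }

    @Override
    public int partition(final String topic, final Object key, final byte[] keyBytes,
                         final Object value, final byte[] valueBytes, final Cluster cluster) {
        final int numPartitions = cluster.partitionsForTopic(topic).size();
        // Default-style hashing of the key bytes; a real implementation could
        // spread known hot keys over several partitions here instead.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { }
}

It would be enabled on the producer writing to the input topic via
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PhonePartitioner.class.getName());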

Also "100s of records" does not sound much to me. Streams can process
multiple hundredths of thousandth records per thread. That is the
reason, why I think that the fix Damian pointed out will most likely fix
your problem.



-Matthias


On 3/27/17 1:56 PM, Ara Ebrahimi wrote:
> Thanks for the response, Matthias!
> 
> The reason we want this exact task assignment to happen is that a critical 
> part of our pipeline involves grouping relevant records together (that’s what 
> the aggregate function in the topology is for). And for hot keys this can 
> lead to sometimes 100s of records to get grouped together. Even worse, these 
> records are session bound, we use session windows. Hence we see lots of 
> activity around the store backing the aggregate function and even though we 
> use SSD drives we’re not seeing the kind of performance we want to see. It 
> seems like the aggregate function leads to lots of updates to these hot keys 
> which lead to lots of rocksdb activity.
> 
> Now there are many ways to fix this problem:
> - just don’t aggregate, create an algorithm which is not reliant on 
> grouping/aggregating records. Not what we can do with our tight schedule 
> right now.
> - do grouping/aggregating but employ n instances and rely on uniform 
> distribution of these tasks. This is the easiest solution and what we 
> expected to work but didn’t work as you can tell from this thread. We threw 4 
> instances at it but only 2 got used.
> - tune rocksdb? I tried this actually but it didn’t really help us much, 
> aside from the fact that tuning rocksdb is very tricky.
> - use in-memory store instead? Unfortunately we have to use session windows 
> for this aggregate function and apparently there’s no in-memory session store 
> impl? I tried to create one but soon realized it’s too much work :) I looked 
> at default PartitionAssigner code too, but that ain’t trivial either.
> 
> So I’m a bit hopeless :(
> 
> Ara.
> 
> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax 
> mailto:matth...@confluent.io>> wrote:
> 
> From: "Matthias J. Sax" mailto:matth...@confluent.io>>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 27, 2017 at 1:35:30 PM PDT
> To: users@kafka.apache.org
> Reply-To: mailto:users@kafka.apache.org>>
> 
> 
> Ara,
> 
> thanks for the detailed information.
> 
> If I parse this correctly, both instances run the same number of tasks
> (12 each). That is all Streams promises.
> 
> To come back to your initial question:
> 
> Is there a way to tell kafka streams to uniformly assign partitions across 
> instances? If I have n kafka streams instances running, I want each to handle 
> EXACTLY 1/nth number of partitions. No dynamic task assignment logic. Just 
> dumb 1/n assignment.
> 
> That is exactly what you get: each of your two instances gets 24/2 = 12
> tasks assigned. That is dumb 1/n assignment, isn't it? So my original
> response was correct.
> 
> However, I now understand better what you are actually meaning by your
> question. Note that Streams does not distinguish "type" of tasks -- it
> only sees 24 tasks and assigns those in a balanced way.
> 
> Thus, currently there is no easy way to get the assignment you want to
> have, except by implementing your own `PartitionAssignor`.
> 
> This is the current implementation for 0.10.2
> https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitio

Re: [DISCUSS] KIP 130: Expose states of active tasks to KafkaStreams public API

2017-03-27 Thread Florian Hussonnois
Hi Guozhang, Matthias,

It's a great idea to add sub topologies descriptions. This would help
developers to better understand topology concept.

I agree that it is not really user-friendly to have to check whether
`StreamsMetadata#streamThreads` returns null.

The method name localThreadsMetadata looks good. In addition, it's
simpler to build ThreadMetadata instances from the `StreamTask` class than
from the `StreamPartitionAssignor` class.

I will work on the modifications. As I understand it, I have to add a
subTopologyId property to the TaskMetadata class. Am I right?
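To make sure we mean the same thing, I'm picturing roughly this shape (field
and accessor names are provisional, just a sketch of the proposal):

import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class TaskMetadata {

    private final String taskId;
    private final Set<TopicPartition> topicPartitions;
    // Proposed addition: links the task back to TopologyDescription#Subtopology.
    private final String subTopologyId;

    public TaskMetadata(final String taskId,
                        final Set<TopicPartition> topicPartitions,
                        final String subTopologyId) {
        this.taskId = taskId;
        this.topicPartitions = topicPartitions;
        this.subTopologyId = subTopologyId;
    }

    public String taskId() { return taskId; }
    public Set<TopicPartition> topicPartitions() { return topicPartitions; }
    public String subTopologyId() { return subTopologyId; }
}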

Thanks,

2017-03-26 0:25 GMT+01:00 Guozhang Wang :

> Re 1): this is a good point. Maybe we can move
> `StreamsMetadata#streamThreads`
> to `KafkaStreams#localThreadsMetadata`?
>
> 3): this is a minor suggestion about the function name: rename
> `assignedPartitions` to `topicPartitions` to be consistent with
> `StreamsMetadata`?
>
>
> Guozhang
>
> On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax 
> wrote:
>
>> Thanks for the progress on this KIP. I think we are on the right path!
>>
>> Couple of comments/questions:
>>
>> (1) Why do we not consider the "rejected alternative" to add the method
>> to KafkaStreams? The comment on #streamThreads() says:
>>
>> "Note this method will return null if called on {@link
>> StreamsMetadata} which represent a remote application."
>>
>> Thus, if we cannot get any remote metadata, it seems hard to justify
>> not adding it to KafkaStreams directly -- this would avoid
>> invalid calls and `null` return values in the first place.
>>
>> I like the idea of exposing sub-topologies:
>>
>> (2a) I would recommend to rename `topicsGroupId` to `subTopologyId` :)
>>
>> (2b) We could add this to KIP-120 already. However, I would not just
>> link both via name, but leverage KIP-120 directly, and add a
>> "Subtopology" member to the TaskMetadata class.
>>
>>
>> Overall, I like the distinction of KIP-120 only exposing "static"
>> information that can be determined before the topology gets started,
>> while this KIP allows access to runtime information.
>>
>>
>>
>> -Matthias
>>
>>
>> On 3/22/17 12:42 PM, Guozhang Wang wrote:
>> > Thanks for the updated KIP, and sorry for the late replies!
>> >
>> > I think a little bit more about KIP-130, and I feel that if we are going
>> > to deprecate the `toString` function (it is not explicitly said in the
>> > KIP, so I'm not sure if you plan to still keep the
>> > `KafkaStreams#toString` as is or are going to replace it with the
>> > proposed APIs) with the proposed ones, it may be okay. More
>> > specifically, after both KIP-120 and KIP-130:
>> >
>> > 1. users can use `#describe` function to check the generated topology
>> > before calling `KafkaStreams#start`, which is static information.
>> > 2. users can use the `StreamsMetadata -> ThreadMetadata -> TaskMetadata`
>> > programmatically after called `KafkaStreams#start` to get the
>> > dynamically changeable information.
>> >
>> > One thing I'm still not sure though, is that in `TaskMetadata` we only
>> > have the TaskId and assigned partitions, whereas in
>> > "TopologyDescription" introduced in KIP-120, it will simply describe the
>> > whole topology possibly composed of multiple sub-topologies. So it is
>> > hard for users to tell which sub-topology is executed under which task
>> > on-the-fly.
>> >
>> > Hence I'm thinking if we can expose the "sub-topology-id" (named as
>> > topicsGroupId internally) in TopologyDescription#Subtopology, and then
>> > from the task id which is essentially "sub-topology-id DASH
>> > partition-group-id" users can make the link, though it is still not that
>> > straight-forward.
>> >
>> > Thoughts?
>> >
>> > Guozhang
>> >
>> >
>> >
>> > On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois
>> > mailto:fhussonn...@gmail.com>> wrote:
>> >
>> > Thanks Guozhang for pointing me to the KIP-120.
>> >
>> > I've made some modifications to the KIP. I also proposed a new PR
>> > (there is
>> > still some tests to make).
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%
>> 3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>> > > 3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API>
>> >
>> > Exposing consumed offsets through JMX is sufficient for debugging
>> > purposes.
>> > But I think this could be part of another JIRA as there is no
>> > impact to the
>> > public API.
>> >
>> > Thanks
>> >
>> > 2017-03-10 22:35 GMT+01:00 Guozhang Wang > > >:
>>
>> >
>> > > Hello Florian,
>> > >
>> > > As for programmatically discover monitoring data by piping metrics
>> > into a
>> > > dedicated topic. I think you can actually use a
>> > KafkaMetricsReporter which
>> > > pipes the polled metric values into a pre-defined topic (note that
>> > in Kafka
>> > > the MetricsReporter is simply an interface and users can build
>> >  

KafkaProducer overriding security.protocol config value and unable to connect to Broker

2017-03-27 Thread Srikrishna Alla
Hi everyone,

I am facing an issue when writing to a Kafka broker. I am instantiating a
KafkaProducer by passing it configuration properties, against a secured Kafka
broker with the SASL_PLAINTEXT security.protocol. I can see that I am passing
the right security.protocol when instantiating the producer, but the log shows
it has been set to null for some reason. Has anyone seen this kind of error
before? My client is Kafka 0.9 and the cluster is Kafka 0.10. Please see the
error below. I am printing the properties between the KAFKA PROPS markers and
I see I am passing security.protocol correctly, but below that it shows it is null.
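For context, the producer is created roughly like this (host name and values
are placeholders here; in the real code they come from our configuration):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker-host:6667");   // placeholder host
props.put("security.protocol", "SASL_PLAINTEXT");     // the value that later shows up as null
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);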

45806 [15:37:05:224] [Loader-1] INFO  c.a.b.d.etl.monitor.AlertProducer [,,
]   - =KAFKA PROPS==
45806 [15:37:05:224] [Loader-1] INFO  c.a.b.d.etl.monitor.AlertProducer [,,
]   - {*security.protocol=SASL_PLAINTEXT,* bootstrap.servers=blp**com:6667,
value.serializer=org.apache
.kafka.common.serialization.StringSerializer, buffer.memory=33554432,
retries=0,
key.serializer=org.apache.kafka.common.serialization.StringSerializer,
linger.ms=0, batch.size=16384, acks=a
ll}
45806 [15:37:05:224] [Loader-1] INFO  c.a.b.d.etl.monitor.AlertProducer [,,
]   - =KAFKA PROPS==
45817 [15:37:05:235] [Loader-1] INFO  o.a.k.c.producer.ProducerConfig [,,
]   - ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 30
metadata.fetch.timeout.ms = 6
acks = all
batch.size = 16384
reconnect.backoff.ms = 10
bootstrap.servers = [blpd**.com:6667]
receive.buffer.bytes = 32768
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 3
key.serializer = class
org.apache.kafka.common.serialization.StringSerializer
retries = 0
max.request.size = 1048576
block.on.buffer.full = true
value.serializer = class
org.apache.kafka.common.serialization.StringSerializer
metrics.sample.window.ms = 3
send.buffer.bytes = 131072
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
linger.ms = 0
client.id =

45847 [15:37:05:265] [Loader-1] WARN  o.a.k.c.producer.ProducerConfig [,,
]   - *The configuration security.protocol = null was supplied but isn't a
known config.*
45954 [15:37:05:372] [kafka-producer-network-thread | producer-1] WARN
o.a.kafka.common.network.Selector [,, ]   - Error in I/O with blpd**.com/
130.5.106.91
java.io.EOFException: null
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)

Thanks a lot for your help,
Sri


Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Damian Guy
Hi Ara,

There is a performance issue in the 0.10.2 release of session windows. It
is fixed with this PR: https://github.com/apache/kafka/pull/2645
You can work around this on 0.10.2 by calling the aggregate(..), reduce(..),
etc. methods and supplying a StateStoreSupplier with caching
disabled, i.e., by doing something like:

final StateStoreSupplier sessionStore =
    Stores.create("session-store-name")
        .withKeys(Serdes.String())
        .withValues(Serdes.String())
        .persistent()
        .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
        .build();


The fix has also been cherry-picked to the 0.10.2 branch, so you could
build from source and not have to create the StateStoreSupplier.
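For completeness, a sketch of how the supplier might then be wired into the
DSL; I'm writing the session-windowed aggregate overload from memory of the
0.10.2 API, so treat the exact parameter order and signature as an assumption:

// stream is an existing KStream<String, String>, sessionStore is the supplier above
KTable<Windowed<String>, String> sessions = stream
        .groupByKey()
        .aggregate(
                () -> "",                                      // initializer
                (key, value, aggregate) -> aggregate + value,  // aggregator
                (key, left, right) -> left + right,            // session merger
                SessionWindows.with(TimeUnit.MINUTES.toMillis(7)),
                Serdes.String(),                               // aggregate value serde
                sessionStore);                                 // caching-disabled supplier from above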

Thanks,
Damian

On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi 
wrote:

Thanks for the response, Matthias!

The reason we want this exact task assignment to happen is that a critical
part of our pipeline involves grouping relevant records together (that’s
what the aggregate function in the topology is for). And for hot keys this
can lead to sometimes 100s of records to get grouped together. Even worse,
these records are session bound, we use session windows. Hence we see lots
of activity around the store backing the aggregate function and even though
we use SSD drives we’re not seeing the kind of performance we want to see.
It seems like the aggregate function leads to lots of updates to these hot
keys which lead to lots of rocksdb activity.

Now there are many ways to fix this problem:
- just don’t aggregate, create an algorithm which is not reliant on
grouping/aggregating records. Not what we can do with our tight schedule
right now.
- do grouping/aggregating but employ n instances and rely on uniform
distribution of these tasks. This is the easiest solution and what we
expected to work but didn’t work as you can tell from this thread. We threw
4 instances at it but only 2 got used.
- tune rocksdb? I tried this actually but it didn’t really help us much,
aside from the fact that tuning rocksdb is very tricky.
- use in-memory store instead? Unfortunately we have to use session windows
for this aggregate function and apparently there’s no in-memory session
store impl? I tried to create one but soon realized it’s too much work :) I
looked at default PartitionAssigner code too, but that ain’t trivial either.

So I’m a bit hopeless :(

Ara.

On Mar 27, 2017, at 1:35 PM, Matthias J. Sax > wrote:

From: "Matthias J. Sax" mailto:matth...@confluent.io
>>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 27, 2017 at 1:35:30 PM PDT
To: users@kafka.apache.org
Reply-To: mailto:users@kafka.apache.org>>


Ara,

thanks for the detailed information.

If I parse this correctly, both instances run the same number of tasks
(12 each). That is all Streams promises.

To come back to your initial question:

Is there a way to tell kafka streams to uniformly assign partitions across
instances? If I have n kafka streams instances running, I want each to
handle EXACTLY 1/nth number of partitions. No dynamic task assignment
logic. Just dumb 1/n assignment.

That is exactly what you get: each of your two instances gets 24/2 = 12
tasks assigned. That is dumb 1/n assignment, isn't it? So my original
response was correct.

However, I now understand better what you are actually meaning by your
question. Note that Streams does not distinguish "type" of tasks -- it
only sees 24 tasks and assigns those in a balanced way.

Thus, currently there is no easy way to get the assignment you want to
have, except by implementing your own `PartitionAssignor`.

This is the current implementation for 0.10.2
https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java

You can, if you wish, write your own assignor and set it via
StreamsConfig. However, be aware that this might be tricky to get right
and also might have runtime implications with regard to rebalancing and
state store recovery. We recently improved the current implementation to
avoid costly task movements:
https://issues.apache.org/jira/browse/KAFKA-4677

Thus, I would not recommend implementing your own `PartitionAssignor`.


However, the root question is, why do you need this exact assignment you
are looking for in the first place? Why is it "bad" if "types" of tasks
are not distinguished? I would like to understand your requirement
better -- it might be worth improving Streams here.


-Matthias


On 3/27/17 12:57 PM, Ara Ebrahimi wrote:
Hi,

So, I simplified the topology by making sure we have only 1 sourc

Re: 3 machines 12 threads all fail within 2 hours of starting the streams application

2017-03-27 Thread Eno Thereska
Also just a heads up that a PR that increases resiliency is currently being
reviewed and should hopefully hit trunk soon:
https://github.com/apache/kafka/pull/2719
This covers certain broker failure
scenarios as well as a (hopefully last) case where state locking fails. It also
includes the retries config I had mentioned earlier.

Thanks
Eno


> On 25 Mar 2017, at 18:14, Sachin Mittal  wrote:
> 
> Hi,
> The broker is a three machine cluster. The replication factor for input and
> also internal topics is 3.
> Brokers don't seem to fail. I always see their instances running.
> 
> Also note that when an identical streams application with single thread on
> a single instance is pulling data from some other non partitioned identical
> topic, the application never fails. Note there too replication factor is 3
> for input and internal topics.
> 
> Please let us know if you have something for the other errors. Also, what
> ways can we make the streams resilient? I do feel we need hooks to start new
> stream threads just in case some thread shuts down due to an unhandled
> exception, or the streams application itself doing a better job in handling
> such cases and not shutting down the threads.
> 
> Thanks
> Sachin
> 
> 
> On Sat, Mar 25, 2017 at 11:03 PM, Eno Thereska 
> wrote:
> 
>> Hi Sachin,
>> 
>> See my previous email on the NotLeaderForPartitionException error.
>> 
>> What is your Kafka configuration, how many brokers are you using? Also
>> could you share the replication level (if different from 1) of your streams
>> topics? Are there brokers failing while Streams is running?
>> 
>> Thanks
>> Eno
>> 
>> On 25/03/2017, 11:00, "Sachin Mittal"  wrote:
>> 
>>Hi All,
>>I am revisiting the ongoing issue of getting a multi instance multi
>>threaded kafka streams cluster to work.
>> 
>>Scenario is that we have a 12 partition source topic. (note our server
>>cluster replication factor is 3).
>>We have a 3-machine client cluster with one instance on each. Each
>>instance uses 4 threads.
>>Streams version is 0.10.2 with latest deadlock fix and rocks db
>>optimization from trunk.
>> 
>>We also have an identical single partition topic and another single
>>threaded instance doing identical processing as the above one. This
>> uses
>>version 0.10.1.1
>>This streams application never goes down.
>> 
>>The above application used to go down frequently with high cpu wait
>> time
>>and also we used to get frequent deadlock issues. However since
>> including
>>the fixes we see very little cpu wait time and now application does not
>>enter into deadlock. The threads simply get uncaught exception thrown
>> from
>>the streams application and they die one by one eventually shutting
>> down
>>the entire client cluster.
>>So we now need to understand what could be causing these exceptions
>> and how
>>we can fix those.
>> 
>>Here is the summary
>>instance 84
>>All four thread die due to
>>org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>> server
>>is not the leader for that topic-partition.
>> 
>>So is this something we can handle at streams level and not get it
>> thrown
>>all the way to the thread.
>> 
>> 
>>instance 85
>>two again dies due to
>>org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>> server
>>is not the leader for that topic-partition.
>> 
>>other two die due to
>>Caused by: org.rocksdb.RocksDBException: ~
>>I know this is some known rocksdb issue. Is there a way we can handle
>> it at
>>stream side. What do you suggest to avoid this or what can be causing
>> it.
>> 
>> 
>>instance 87
>>two again die due to
>>org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>> server
>>is not the leader for that topic-partition.
>> 
>>one dies due to
>>org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
>> for
>>new-part-advice-key-table-changelog-11: 30015 ms has passed since last
>>append
>> 
>>I have really not understood what this means and any idea what could
>> be the
>>issue here?
>> 
>>last one dies due to
>>Caused by: java.lang.IllegalStateException: task [0_9] Log end offset
>> of
>>new-part-advice-key-table-changelog-9 should not change while
>> restoring:
>>old end offset 647352, current offset 647632
>> 
>>I feel this should not be thrown to the stream thread too and handled
>> at
>>streams level.
>> 
>>The complete logs can be found at:
>>https://www.dropbox.com/s/2t4ysfdqbtmcusq/complete_84_
>> 85_87_log.zip?dl=0
>> 
>>So I feel basically the streams application should be more resilient
>> and
>>should not fail due to exceptions but should have a way to handle them.
>>or provide programmers the hooks that even in case a stream thread is
>> shut
>>down there is a way to start a new thread s

Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Ara Ebrahimi
Thanks for the response, Matthias!

The reason we want this exact task assignment to happen is that a critical part 
of our pipeline involves grouping relevant records together (that’s what the 
aggregate function in the topology is for). And for hot keys this can lead to 
sometimes 100s of records to get grouped together. Even worse, these records 
are session bound, we use session windows. Hence we see lots of activity around 
the store backing the aggregate function and even though we use SSD drives 
we’re not seeing the kind of performance we want to see. It seems like the 
aggregate function leads to lots of updates to these hot keys which lead to 
lots of rocksdb activity.

Now there are many ways to fix this problem:
- just don’t aggregate, create an algorithm which is not reliant on 
grouping/aggregating records. Not what we can do with our tight schedule right 
now.
- do grouping/aggregating but employ n instances and rely on uniform 
distribution of these tasks. This is the easiest solution and what we expected 
to work but didn’t work as you can tell from this thread. We threw 4 instances 
at it but only 2 got used.
- tune rocksdb? I tried this actually but it didn’t really help us much, aside 
from the fact that tuning rocksdb is very tricky.
- use in-memory store instead? Unfortunately we have to use session windows for 
this aggregate function and apparently there’s no in-memory session store impl? 
I tried to create one but soon realized it’s too much work :) I looked at 
default PartitionAssigner code too, but that ain’t trivial either.

So I’m a bit hopeless :(

Ara.

On Mar 27, 2017, at 1:35 PM, Matthias J. Sax 
mailto:matth...@confluent.io>> wrote:

From: "Matthias J. Sax" mailto:matth...@confluent.io>>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 27, 2017 at 1:35:30 PM PDT
To: users@kafka.apache.org
Reply-To: mailto:users@kafka.apache.org>>


Ara,

thanks for the detailed information.

If I parse this correctly, both instances run the same number of tasks
(12 each). That is all Streams promises.

To come back to your initial question:

Is there a way to tell kafka streams to uniformly assign partitions across 
instances? If I have n kafka streams instances running, I want each to handle 
EXACTLY 1/nth number of partitions. No dynamic task assignment logic. Just dumb 
1/n assignment.

That is exactly what you get: each of your two instances gets 24/2 = 12
tasks assigned. That is dumb 1/n assignment, isn't it? So my original
response was correct.

However, I now understand better what you are actually meaning by your
question. Note that Streams does not distinguish "type" of tasks -- it
only sees 24 tasks and assigns those in a balanced way.

Thus, currently there is no easy way to get the assignment you want to
have, except if you implement your own `PartitionAssignor`.

This is the current implementation for 0.10.2
https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java

You can, if you wish write your own assignor and set it via
StreamsConfig. However, be aware that this might be tricky to get right
and also might have runtime implications with regard to rebalancing and
state store recovery. We recently improved the current implementation to
avoid costly task movements:
https://issues.apache.org/jira/browse/KAFKA-4677

Thus, I would not recommend implementing your own `PartitionAssignor`.


However, the root question is, why do you need this exact assignment you
are looking for in the first place? Why is it "bad" if "types" of tasks
are not distinguished? I would like to understand your requirement
better -- it might be worth improving Streams here.


-Matthias


On 3/27/17 12:57 PM, Ara Ebrahimi wrote:
Hi,

So, I simplified the topology by making sure we have only 1 source topic. Now I 
have 1 source topic, 8 partitions, 2 instances. And here’s how the topology 
looks like:

instance 1:

KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac
StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-1
Active tasks:
StreamsTask taskId: 0_3
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-01]
KSTREAM-FILTER-01:
children: [KSTREAM-MAP-02]
KSTREAM-MAP-02:
children: [KSTREAM-BRANCH-03]
KSTREAM-BRANCH-03:
children: [KSTREAM-BRANCHCHILD-04, KSTREAM-BRANCHCHILD-05]
KSTREAM-BRANCHCHILD-04

Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Matthias J. Sax
Ara,

thanks for the detailed information.

If I parse this correctly, both instances run the same number of tasks
(12 each). That is all Streams promises.

To come back to your initial question:

> Is there a way to tell kafka streams to uniformly assign partitions across 
> instances? If I have n kafka streams instances running, I want each to handle 
> EXACTLY 1/nth number of partitions. No dynamic task assignment logic. Just 
> dumb 1/n assignment.

That is exactly what you get: each of your two instances gets 24/2 = 12
tasks assigned (3 sub-topologies x 8 partitions = 24 tasks). That is dumb
1/n assignment, isn't it? So my original response was correct.

However, I now understand better what you are actually meaning by your
question. Note that Streams does not distinguish "type" of tasks -- it
only sees 24 tasks and assigns those in a balanced way.

Thus, currently there is no easy way to get the assignment you want to
have, except if you implement your own `PartitionAssignor`.

This is the current implementation for 0.10.2
https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java

You can, if you wish write your own assignor and set it via
StreamsConfig. However, be aware that this might be tricky to get right
and also might have runtime implications with regard to rebalancing and
state store recovery. We recently improved the current implementation to
avoid costly task movements:
https://issues.apache.org/jira/browse/KAFKA-4677

Thus, I would not recommend implementing your own `PartitionAssignor`.


However, the root question is, why do you need this exact assignment you
are looking for in the first place? Why is it "bad" if "types" of tasks
are not distinguished? I would like to understand your requirement
better -- it might be worth improving Streams here.


-Matthias


On 3/27/17 12:57 PM, Ara Ebrahimi wrote:
> Hi,
> 
> So, I simplified the topology by making sure we have only 1 source topic. Now 
> I have 1 source topic, 8 partitions, 2 instances. And here’s how the topology 
> looks like:
> 
> instance 1:
> 
> KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-1
> Active tasks:
> StreamsTask taskId: 0_3
> ProcessorTopology:
> KSTREAM-SOURCE-00:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-01]
> KSTREAM-FILTER-01:
> children: [KSTREAM-MAP-02]
> KSTREAM-MAP-02:
> children: [KSTREAM-BRANCH-03]
> KSTREAM-BRANCH-03:
> children: [KSTREAM-BRANCHCHILD-04, KSTREAM-BRANCHCHILD-05]
> KSTREAM-BRANCHCHILD-04:
> children: [KSTREAM-MAPVALUES-06]
> KSTREAM-MAPVALUES-06:
> children: [KSTREAM-FLATMAPVALUES-07]
> KSTREAM-FLATMAPVALUES-07:
> children: [KSTREAM-MAP-08]
> KSTREAM-MAP-08:
> children: [KSTREAM-FILTER-11]
> KSTREAM-FILTER-11:
> children: [KSTREAM-SINK-10]
> KSTREAM-SINK-10:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-05:
> Partitions [activities-avro-or-3]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-2
> Active tasks:
> StreamsTask taskId: 1_2
> ProcessorTopology:
> KSTREAM-SOURCE-12:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-09]
> KSTREAM-AGGREGATE-09:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-13]
> KTABLE-TOSTREAM-13:
> children: [KSTREAM-FILTER-14]
> KSTREAM-FILTER-14:
> children: [KSTREAM-FILTER-15]
> KSTREAM-FILTER-15:
> children: [KSTREAM-MAP-16]
> KSTREAM-MAP-16:
> children: [KSTREAM-MAP-17]
> KSTREAM-MAP-17:
> children: [KSTREAM-SINK-18]
> KSTREAM-SINK-18:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-2]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-3
> Active tasks:
> StreamsTask taskId: 1_1
> ProcessorTopology:
> KSTREAM-SOURCE-12:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-09]
> KSTREAM-AGGREGATE-09:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-13]
> KTABLE-TOSTREAM-13:
> children: [KSTREAM-FILTER-14]
> KSTREAM-FILTER-14:
> children: [KSTREAM-FILTER-15]
> KSTREAM-FILTER-15:
> children: [KSTREAM-MAP-16]
> KSTREAM-MAP-16:
> children: [KSTREAM-MAP-17]
> KSTREAM-MAP-17:
> children: [KSTREAM-SINK-18]
> KSTREAM-SINK-18:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-1]
> StreamsTask taskId: 2_7
> ProcessorTopolo

Re: 3 machines 12 threads all fail within 2 hours of starting the streams application

2017-03-27 Thread Matthias J. Sax
Sachin,

about this statement:

>> Also note that when an identical streams application with single thread on
>> a single instance is pulling data from some other non partitioned identical
>> topic, the application never fails. 

What about some "combinations":
 - single threaded multiple instances
 - single thread single instance but multiple partitions
etc.

it would be helpful to understand what scenario works and what does not.
Right now you go from
single-threaded-single-instance-with-no-partitioning to
multi-threaded-multiple-instances-and-partitioned -- that's a big step
to reason about the situation.


-Matthias


On 3/25/17 11:14 AM, Sachin Mittal wrote:
> Hi,
> The broker is a three machine cluster. The replication factor for input and
> also internal topics is 3.
> Brokers don't seem to fail. I always see their instances running.
> 
> Also note that when an identical streams application with single thread on
> a single instance is pulling data from some other non partitioned identical
> topic, the application never fails. Note there too replication factor is 3
> for input and internal topics.
> 
> Please let us know if you have something for other errors. Also what ways
> we can make the streams resilient. I do feel we need hooks to start new
> stream threads just in case some thread shuts down due to unhandled
> exception, or streams application itself doing a better job in handling
> such and not shutting down the threads.
> 
> Thanks
> Sachin
> 
> 
> On Sat, Mar 25, 2017 at 11:03 PM, Eno Thereska 
> wrote:
> 
>> Hi Sachin,
>>
>> See my previous email on the NotLeaderForPartitionException error.
>>
>> What is your Kafka configuration, how many brokers are you using? Also
>> could you share the replication level (if different from 1) of your streams
>> topics? Are there brokers failing while Streams is running?
>>
>> Thanks
>> Eno
>>
>> On 25/03/2017, 11:00, "Sachin Mittal"  wrote:
>>
>> Hi All,
>> I am revisiting the ongoing issue of getting a multi instance multi
>> threaded kafka streams cluster to work.
>>
>> Scenario is that we have a 12 partition source topic. (note our server
>> cluster replication factor is 3).
>> We have a 3-machine client cluster with one instance on each. Each
>> instance uses 4 threads.
>> Streams version is 0.10.2 with latest deadlock fix and rocks db
>> optimization from trunk.
>>
>> We also have an identical single partition topic and another single
>> threaded instance doing identical processing as the above one. This
>> uses
>> version 0.10.1.1
>> This streams application never goes down.
>>
>> The above application used to go down frequently with high cpu wait
>> time
>> and also we used to get frequent deadlock issues. However since
>> including
>> the fixes we see very little cpu wait time and now application does not
>> enter into deadlock. The threads simply get uncaught exception thrown
>> from
>> the streams application and they die one by one eventually shutting
>> down
>> the entire client cluster.
>> So we now need to understand what could be causing these exceptions
>> and how
>> we can fix those.
>>
>> Here is the summary
>> instance 84
>> All four thread die due to
>> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>> server
>> is not the leader for that topic-partition.
>>
>> So is this something we can handle at streams level and not get it
>> thrown
>> all the way to the thread.
>>
>>
>> instance 85
>> two again dies due to
>> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>> server
>> is not the leader for that topic-partition.
>>
>> other two die due to
>> Caused by: org.rocksdb.RocksDBException: ~
>> I know this is some known rocksdb issue. Is there a way we can handle
>> it at
>> stream side. What do you suggest to avoid this or what can be causing
>> it.
>>
>>
>> instance 87
>> two again die due to
>> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>> server
>> is not the leader for that topic-partition.
>>
>> one dies due to
>> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
>> for
>> new-part-advice-key-table-changelog-11: 30015 ms has passed since last
>> append
>>
>> I have really not understood what this means and any idea what could
>> be the
>> issue here?
>>
>> last one dies due to
>> Caused by: java.lang.IllegalStateException: task [0_9] Log end offset
>> of
>> new-part-advice-key-table-changelog-9 should not change while
>> restoring:
>> old end offset 647352, current offset 647632
>>
>> I feel this should not be thrown to the stream thread too and handled
>> at
>> streams level.
>>
>> The complete logs can be found at:
>> https://www.dropbox.com/s/2t4ysfdqbtmcusq/complete_84_
>> 85_87_log.zip?dl=0
>>
>> So I feel basically the streams a

Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Ara Ebrahimi
Hi,

So, I simplified the topology by making sure we have only 1 source topic. Now I 
have 1 source topic, 8 partitions, 2 instances. And here’s how the topology 
looks like:

instance 1:

KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac
StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-1
Active tasks:
StreamsTask taskId: 0_3
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-01]
KSTREAM-FILTER-01:
children: [KSTREAM-MAP-02]
KSTREAM-MAP-02:
children: [KSTREAM-BRANCH-03]
KSTREAM-BRANCH-03:
children: [KSTREAM-BRANCHCHILD-04, KSTREAM-BRANCHCHILD-05]
KSTREAM-BRANCHCHILD-04:
children: [KSTREAM-MAPVALUES-06]
KSTREAM-MAPVALUES-06:
children: [KSTREAM-FLATMAPVALUES-07]
KSTREAM-FLATMAPVALUES-07:
children: [KSTREAM-MAP-08]
KSTREAM-MAP-08:
children: [KSTREAM-FILTER-11]
KSTREAM-FILTER-11:
children: [KSTREAM-SINK-10]
KSTREAM-SINK-10:
topic: activities-by-phone-store-or-repartition
KSTREAM-BRANCHCHILD-05:
Partitions [activities-avro-or-3]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-2
Active tasks:
StreamsTask taskId: 1_2
ProcessorTopology:
KSTREAM-SOURCE-12:
topics: [activities-by-phone-store-or-repartition]
children: [KSTREAM-AGGREGATE-09]
KSTREAM-AGGREGATE-09:
states: [activities-by-phone-store-or]
children: [KTABLE-TOSTREAM-13]
KTABLE-TOSTREAM-13:
children: [KSTREAM-FILTER-14]
KSTREAM-FILTER-14:
children: [KSTREAM-FILTER-15]
KSTREAM-FILTER-15:
children: [KSTREAM-MAP-16]
KSTREAM-MAP-16:
children: [KSTREAM-MAP-17]
KSTREAM-MAP-17:
children: [KSTREAM-SINK-18]
KSTREAM-SINK-18:
topic: ml-features-avro-or
Partitions [activities-by-phone-store-or-repartition-2]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-3
Active tasks:
StreamsTask taskId: 1_1
ProcessorTopology:
KSTREAM-SOURCE-12:
topics: [activities-by-phone-store-or-repartition]
children: [KSTREAM-AGGREGATE-09]
KSTREAM-AGGREGATE-09:
states: [activities-by-phone-store-or]
children: [KTABLE-TOSTREAM-13]
KTABLE-TOSTREAM-13:
children: [KSTREAM-FILTER-14]
KSTREAM-FILTER-14:
children: [KSTREAM-FILTER-15]
KSTREAM-FILTER-15:
children: [KSTREAM-MAP-16]
KSTREAM-MAP-16:
children: [KSTREAM-MAP-17]
KSTREAM-MAP-17:
children: [KSTREAM-SINK-18]
KSTREAM-SINK-18:
topic: ml-features-avro-or
Partitions [activities-by-phone-store-or-repartition-1]
StreamsTask taskId: 2_7
ProcessorTopology:
KSTREAM-SOURCE-19:
topics: [ml-features-avro-or]
Partitions [ml-features-avro-or-7]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-4
Active tasks:
StreamsTask taskId: 2_0
ProcessorTopology:
KSTREAM-SOURCE-19:
topics: [ml-features-avro-or]
Partitions [ml-features-avro-or-0]
StreamsTask taskId: 2_6
ProcessorTopology:
KSTREAM-SOURCE-19:
topics: [ml-features-avro-or]
Partitions [ml-features-avro-or-6]
Standby tasks:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-5
Active tasks:
StreamsTask taskId: 0_0
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-01]
KSTREAM-FILTER-01:
children: [KSTREAM-MAP-02]
KSTREAM-MAP-02:
children: [KSTREAM-BRANCH-03]
KSTREAM-BRANCH-03:
children: [KSTREAM-BRANCHCHILD-04, KSTREAM-BRANCHCHILD-05]
KSTREAM-BRANCHCHILD-04:
children: [KSTREAM-MAPVALUES-06]
KSTREAM-MAPVALUES-06:
children: [KSTREAM-FLATMAPVALUES-07]
KSTREAM-FLATMAPVALUES-07:
children: [KSTREAM-MAP-08]
KSTREAM-MAP-08:
children: [KSTREAM-FILTER-11]
KSTREAM-FILTER-11:
children: [KSTREAM-SINK-10]
KSTREAM-SINK-10:
topic: activities-by-phone-store-or-repartition
KSTREAM-BRANCHCHILD-05:
Partitions [activities-avro-or-0]
StreamsTask taskId: 1_6
ProcessorTopology:
KSTREAM-SOURCE-12:
topics: [activities-by-phone-store-or-repartition]
children: [KSTREAM-AGGREGATE-09]
KSTREAM-AGGREGATE-09:
states: [activities-by-phone-store-or]
children: [KTABLE-TOSTREAM-13]
KTABLE-TOSTREAM-13:
children: [KSTREAM-FILTER-14]
KSTREAM-FILTER-14:
children: [KSTREAM-FILTER-15]
KSTREAM-FILTER-15:
children: [KSTREAM-MAP-16]
KSTREAM-MAP-16:
children: [KSTREAM-MAP-17]
KSTREAM-MAP-17:
children: [KSTREAM-SINK-00

Re: APPLICATION_SERVER_CONFIG ?

2017-03-27 Thread Michael Noll
Yes, agreed -- the re-thinking pre-existing notions is a big part of such
conversations.  A bit like making the mental switch from object-oriented
programming to functional programming -- and, just like in this case,
neither is more "right" than the other.  Personal
opinion/preference/context matters a lot, hence I tried to carefully phrase
my answer in a way that it doesn't come across as potentially
indoctrinating. ;-)
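
As a concrete reference for the quoted exchange below, this is roughly what the
discovery half of that setup looks like -- a sketch only, where the application
id, endpoints, store name and key are made up, "builder" stands for the
application's KStreamBuilder, and the RPC/HTTP layer that actually serves the
query is your own application code, not part of Streams:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
// advertise this instance's own RPC endpoint so other instances can find it
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "host1:7070");

KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
streams.start();

// ask Streams which instance hosts the store partition that holds "some-key"
StreamsMetadata meta =
        streams.metadataForKey("my-store", "some-key", Serdes.String().serializer());
if (meta != null) {
    // the actual lookup then goes over your own RPC layer to meta.host():meta.port()
    System.out.println("key lives on " + meta.host() + ":" + meta.port());
}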

On Fri, Mar 24, 2017 at 6:34 PM, Jon Yeargers 
wrote:

> You make some great cases for your architecture. To be clear - Ive been
> proselytizing for kafka since I joined this company last year. I think my
> largest issue is rethinking some preexisting notions about streaming to
> make them work in the kstream universe.
>
> On Fri, Mar 24, 2017 at 6:07 AM, Michael Noll 
> wrote:
>
> > > If I understand this correctly: assuming I have a simple aggregator
> > > distributed across n-docker instances each instance will _also_ need to
> > > support some sort of communications process for allowing access to its
> > > statestore (last param from KStream.groupby.aggregate).
> >
> > Yes.
> >
> > See
> > http://docs.confluent.io/current/streams/developer-
> > guide.html#your-application-and-interactive-queries
> > .
> >
> > > - The tombstoning facilities of redis or C* would lend themselves well
> to
> > > implementing a 'true' rolling aggregation
> >
> > What is a 'true' rolling aggregation, and how could Redis or C* help with
> > that in a way that Kafka can't?  (Honest question.)
> >
> >
> > > I get that RocksDB has a small footprint but given the choice of
> > > implementing my own RPC / gossip-like process for data sharing and
> using
> > a
> > > well tested one (ala C* or redis) I would almost always opt for the
> > latter.
> > > [...]
> > > Just my $0.02. I would love to hear why Im missing the 'big picture'.
> The
> > > kstreams architecture seems rife with potential.
> >
> > One question is, for example:  can the remote/central DB of your choice
> > (Redis, C*) handle as many qps as Kafka/Kafka Streams/RocksDB can handle?
> > Over the network?  At the same low latency?  Also, what happens if the
> > remote DB is unavailable?  Do you wait and retry?  Discard?  Accept the
> > fact that your app's processing latency will now go through the roof?  I
> > wrote about some such scenarios at
> > https://www.confluent.io/blog/distributed-real-time-joins-
> > and-aggregations-on-user-activity-events-using-kafka-streams/
> > .
> >
> > One big advantage (for many use cases, though not all) with Kafka/Kafka Streams
> is
> > that you can leverage fault-tolerant *local* state that may also be
> > distributed across app instances.  Local state is much more efficient and
> > faster when doing stateful processing such as joins or aggregations.  You
> > don't need to worry about an external system, whether it's up and
> running,
> > whether its version is still compatible with your app, whether it can
> scale
> > as much as your app/Kafka Streams/Kafka/the volume of your input data.
> >
> > Also, note that some users have actually opted to run hybrid setups:
> Some
> > processing output is sent to a remote data store like Cassandra (e.g. via
> > Kafka Connect), some processing output is exposed directly through
> > interactive queries.  It's not like you're forced to pick only one
> approach.
> >
> >
> > > - Typical microservices would separate storing / retrieving data
> >
> > I'd rather argue that for microservices you'd oftentimes prefer to *not*
> > use a remote DB, and rather do everything inside your microservice
> whatever
> > the microservice needs to do (perhaps we could relax this to "do
> everything
> > in a way that your microservices is in full, exclusive control", i.e. it
> > doesn't necessarily need to be *inside*, but arguably it would be better
> if
> > it actually is).
> > See e.g. the article
> > https://www.confluent.io/blog/data-dichotomy-rethinking-the-
> > way-we-treat-data-and-services/
> > that lists some of the reasoning behind this school of thinking.  Again,
> > YMMV.
> >
> > Personally, I think there's no simple true/false here.  The decisions
> > depend on what you need, what your context is, etc.  Anyways, since you
> > already have some opinions for the one side, I wanted to share some food
> > for thought for the other side of the argument. :-)
> >
> > Best,
> > Michael
> >
> >
> >
> >
> >
> > On Fri, Mar 24, 2017 at 1:25 PM, Jon Yeargers 
> > wrote:
> >
> > > If I understand this correctly: assuming I have a simple aggregator
> > > distributed across n-docker instances each instance will _also_ need to
> > > support some sort of communications process for allowing access to its
> > > statestore (last param from KStream.groupby.aggregate).
> > >
> > > How would one go about substituting a separated db (EG redis) for the
> > > statestore?
> > >
> > > Some advantages to decoupling:
> > > - It would seem like having a centralized process like this would
> > alleviate
> > > the need to exe

Re: kafka streams in-memory Keyvalue store iterator remove broken on upgrade to 0.10.2.0 from 0.10.1.1

2017-03-27 Thread Matthias J. Sax
Yes, that is what we do in 0.10.2 -- it was a bug in 0.10.1 to not throw
an exception :)

-Matthias


On 3/27/17 5:07 AM, Thomas Becker wrote:
> Couldn't this have been solved by returning a ReadOnlyKeyValueIterator
> that throws an exception from remove() from the
> ReadOnlyKeyValueStore.iterator()? That preserves the ability to call
> remove() when it's appropriate and moves the refused bequest to when
> you shouldn't.
> 
> On Thu, 2017-03-23 at 11:05 -0700, Matthias J. Sax wrote:
>> There is a difference between .delete() and it.remove().
>>
>> .delete() can only be called in a Streams operator that is
>> responsible
>> to maintain the state. This is of course required to give the
>> developer
>> writing the operator full control over the store.
>>
>> However, it.remove() is called *outside* from the Streams part of
>> your
>> app. Thus, if a second developer queries a store, she should not be
>> able
>> to "mess" with the store -- she does not own the store.
>>
>> Does this make sense?
>>
>>
>> -Matthias
>>
>>
>> On 3/22/17 3:27 PM, Tom Dearman wrote:
>>>
>>> Hi,
>>>
>>> What I was trying to accomplish was the normal usage of the
>>> iterator
>>> interface to enable safe remove while iterating over a collection.
>>> I
>>> have used iterator.remove since kafka streams was released, so this
>>> has been the real functionality since release and in the absence of
>>> documentation to say otherwise feels like a bug has been introduced
>>> now.  If KeyValueStore#delete doesn't mess up the internal state
>>> during the single threaded access to the store I'm not sure why
>>> iterator.remove would.j
>>> Having said that, I will save the keys for removal during iteration
>>> and delete after.
>>>
>>> Thanks for you help.
>>>
>>> Tom
>>>
>>> On 22 March 2017 at 19:34, Michael Noll 
>>> wrote:

 To add to what Matthias said, in case the following isn't clear:

 - You should not (and, in 0.10.2, cannot any longer) call the
 iterator's
 remove() method, i.e. `KeyValueIterator#remove()` when iterating
 through a
 `KeyValueStore`.  Perhaps this is something we should add to the
 `KeyValueIterator` javadocs.

 - You can of course call the store's delete() method:
 `KeyValueStore#delete(K key)`.

 Just mentioning this because, when reading the thread quickly, I
 missed the
 "iterator" part and thought removal/deletion on the store wasn't
 working.
 ;-)

 Best,
 Michael




 On Wed, Mar 22, 2017 at 8:18 PM, Matthias J. Sax >>> ent.io>
 wrote:

>
> Hi,
>
> remove() should not be supported -- thus, it's actually a bug
> in 0.10.1
> that got fixed in 0.10.2.
>
> Stores should only be altered by Streams and iterator over the
> stores
> should be read-only -- otherwise, you might mess up Streams
> internal state.
>
> I would highly recommend to reconsider the call to it.remove()
> in you
> application. Not sure what you try to accomplish, but you
> should do it
> differently.
>
>
> -Matthias
>
>
> On 3/22/17 8:00 AM, Tom Dearman wrote:
>>
>> Hi, hope someone on kafka-streams team can help.  Our
>> application uses
>>
>> KeyValueIterator it = KeyValueStore.all();
>>
>> …..
>> it.remove()
>>
>>
>> This used to work but is now broken, causes our punctuate to
>> fail and
> StreamThread to die.  The cause seems to be that there were
> changes in
> 0.10.2.0 to InMemoryKeyValueStoreSupplier:
>>
>>
>>
>>
>> public synchronized KeyValueIterator<K, V> all() {
>> final TreeMap<K, V> copy = new TreeMap<>(this.map);
>> return new
>> MemoryStoreIterator<>(copy.entrySet().iterator());
>> }
>>
>> @Override
>> public synchronized KeyValueIterator<K, V> all() {
>> final TreeMap<K, V> copy = new TreeMap<>(this.map);
>> return new DelegatingPeekingKeyValueIterator<>(name, new
>> MemoryStoreIterator<>(copy.entrySet().iterator()));
>>
>> }
>> But the DelegatingPeekingKeyValueIterator has:
>>
>> @Override
>> public void remove() {
>> throw new UnsupportedOperationException("remove not
>> supported");
>> }
>> whereas the old direct call on MemoryStoreIterator allowed
>> remove.  For
> some reason there is no call to underlying.remove() in the
> DelegatingPeekingKeyValueIterator.
>>
>>
>> We don’t want to downgrade to 0.10.1.1 as there was a useful
>> bug fix and
> removing dependancy on zookeeper.
>>
>>
>> Thanks,
>> Tom
>>
>
> --
> Tommy Becker
> Senior Software Engineer
> O +1 919.460.4747
> tivo.com

Kafka heap space issue

2017-03-27 Thread Karthik Jayaraman
I have posted my issue in -
http://stackoverflow.com/questions/43050796/kafka-server-kafkaserverstartable-java-lang-outofmemoryerror-java-heap-space
. Any help appreciated.

Thanks,
JK


Re: [Kafka Streams] - problem joining 2 different streams

2017-03-27 Thread Damian Guy
Hi Marco,

It looks like you are creating 2 independent instances of KafkaStreams and
trying to join across those instances. This wouldn't work and I'm surprised
it has let you get that far without some other exception.

You should remove this bit:
>KafkaStreams userLocationKafkaStream = new
>KafkaStreams(locationStreamBuilder, propsLocation);
>userLocationKafkaStream.start();
>
>//This Stream: User Activity
>KStreamBuilder activityStreamBuilder = new KStreamBuilder();

and build the input streams you want to join from the same builder, i.e.,
the original builder that you created. You then just start one instance of
KafkaStreams.
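
Roughly like this -- a sketch only, reusing the serdes, topic names and
MyStreamUtils helpers from your code (so it assumes enhanceWithAreaDetails
keeps <String, String> pairs and that "props" is your existing config):

KStreamBuilder builder = new KStreamBuilder();

// both inputs are created from the same builder
KStream<String, String> locationKstream =
    builder.stream(stringSerde, stringSerde, "userLocationStreamData")
           .map(MyStreamUtils::enhanceWithAreaDetails);

KStream<String, JsonObject> activity =
    builder.stream(stringSerde, jsonSerde, "activityStreamData");

activity.filter(MyStreamUtils::filterOutFakeUsers)
        .map(MyStreamUtils::enhanceWithScoreDetails)
        .join(locationKstream,
              MyStreamUtils::locationActivityJoiner,
              JoinWindows.of(1000).until(1000 * 60 * 5),
              stringSerde, jsonSerde, stringSerde)
        .to("usersWithLocation");

// a single KafkaStreams instance drives the whole topology
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();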

HTH,
Damian




On Mon, 27 Mar 2017 at 14:55 Marco Abitabile 
wrote:

> Hi all,
>
> I'm struggling with an apparently simple problem.
> I'm joining 2 different streams:
>
> Stream1. User activity data,  with key, value --> 
> Stream2. User location data (such as the city name) with key, value -->
> 
>
> Keys are homogeneous in content and represents the id of the user's device.
>
> The error thrown is:
> Exception in thread "StreamThread-2" java.lang.ClassCastException:
> com.mytest.JsonObject cannot be cast to  java.lang.String
> at com.mytest.serdes.JsonObjectSerde$1.serialize(JsonObjectSerde.java:49)
> at
> org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:176)
> at
>
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
> at
>
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> at
>
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> at
>
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>
> This is the code I'm running:
>
> //Other Stream: User Location, is a string with the name of the city the
> user is (like "San Francisco")
> KStreamBuilder locationStreamBuilder = new KStreamBuilder();
> KStream userLocationStream =
> locationStreamBuilder.stream(stringSerde, stringSerde,
> "userLocationStreamData");
> KStream locationKstream = userLocationStream.
> map(MyStreamUtils::enhanceWithAreaDetails);
> locationKstream.to("user_location");
> KafkaStreams userLocationKafkaStream = new
> KafkaStreams(locationStreamBuilder, propsLocation);
> userLocationKafkaStream.start();
>
> //This Stream: User Activity
> KStreamBuilder activityStreamBuilder = new KStreamBuilder();
> KStream activity =
> activityStreamBuilder.stream(stringSerde,
> jsonSerde, "activityStreamData");
> activity.filter(MyStreamUtils::filterOutFakeUsers)
> .map(MyStreamUtils::enhanceWithScoreDetails)
> .join(
> locationKstream,
> MyStreamUtils::locationActivityJoiner,
> JoinWindows.of(1000).until(1000 * 60 * 5),
> stringSerde, jsonSerde, stringSerde)
> .to("usersWithLocation")
>
> KafkaStreams userActivityStream = new KafkaStreams(builder, propsActivity);
> userActivityStream.start();
>
> And MyStreamUtils::locationActivityJoiner does:
>
> public static JsonObject locationActivityJoiner(JsonObject activity, String
> loc) {
> JsonObject join = activity.copy();
> join.put("city" , loc);
> return join;
> }
>
>
> Basically it seems that  locationActivityJoiner  receives either as right
> and left, elements that belongs only from activity  KStream, while I was
> expecting to receive an activity (a JsonObject object) and a userLocation
> (a
> String object) element.
>
> how is this possible? I can't get where I'm doing wrong.
> Do you have any clue on why this is happenings?
>
> thanks a lot for your support and work.
>
> Best
> Marco
>


Re: using a state store for deduplication

2017-03-27 Thread Michael Noll
Jon,

Damian already answered your direct question, so my comment is a FYI:

There's a demo example at
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
(this is for Confluent 3.2 / Kafka 0.10.2.0).

Note that this code is for demonstration purposes.  To make the example
more suitable to production use cases you could e.g. switch to a window
store instead of manually purging expired entries via
`ReadOnlyKeyValueStore#all()` (which might be an expensive
operation/iteration).
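
To make the shape of the idea concrete, here is a rough, simplified sketch
(this is not the example code linked above): a Transformer backed by a
key-value store that forwards a record only the first time its key is seen.
The store name, the String types and the missing expiry/purging are all
simplifications for illustration.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Forwards a record only the first time its key is seen; duplicates are dropped.
public class DedupTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private KeyValueStore<String, Long> seen;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        // "dedup-store" must be registered on the topology first, e.g.
        // builder.addStateStore(Stores.create("dedup-store")
        //        .withStringKeys().withLongValues().persistent().build());
        seen = (KeyValueStore<String, Long>) context.getStateStore("dedup-store");
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        if (seen.get(key) != null) {
            return null;                               // duplicate -> drop
        }
        seen.put(key, System.currentTimeMillis());     // remember when we first saw it
        return KeyValue.pair(key, value);              // first occurrence -> forward
    }

    @Override
    public KeyValue<String, String> punctuate(final long timestamp) {
        return null;                                   // nothing emitted periodically
    }

    @Override
    public void close() {}
}

It would be wired in with stream.transform(DedupTransformer::new, "dedup-store"),
and since the input is partitioned by key, each instance only needs to remember
the keys of its own partitions (see Damian's note below).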

Hope this helps,
Michael




On Mon, Mar 27, 2017 at 3:07 PM, Damian Guy  wrote:

> Jon,
> You don't need all the data for every topic as the data is partitioned by
> key. Therefore each state-store instance is de-duplicating a subset of the
> key set.
> Thanks,
> Damian
>
> On Mon, 27 Mar 2017 at 13:47 Jon Yeargers 
> wrote:
>
> > Ive been (re)reading this document(
> > http://docs.confluent.io/3.2.0/streams/developer-guide.html#state-stores
> )
> > hoping to better understand StateStores. At the top of the section there
> is
> > a tantalizing note implying that one could do deduplication using a
> store.
> >
> > At present we using Redis for this as it gives us a shared location. Ive
> > been of the mind that a given store was local to a streams instance. To
> > truly support deduplication I would think one would need access to _all_
> > the data for a topic and not just on a per-partition basis.
> >
> > Am I completely misunderstanding this?
> >
>


Re: kafka streams 0.10.2 failing while restoring offsets

2017-03-27 Thread Sachin Mittal
Well, let's say the previous thread was processing a large block and got bumped
out while still processing. The new thread that got this partition may then see
a different changelog end offset between the time it starts the restore and the
time it completes it.

If that is the case, it should just skip that task for that cycle and move on to
the next task. When it comes back to it, it can try to restore the state again,
and if by then no other thread is processing the partition it can start
processing.

I see no reason to raise the exception and kill the thread entirely.
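
The closest hook available today seems to be KafkaStreams#setUncaughtExceptionHandler,
which only lets the application observe a thread death (and, say, restart the whole
instance) rather than replace the dead thread -- a rough sketch, with "builder" and
"streamsConfig" as placeholders:

KafkaStreams streams = new KafkaStreams(builder, streamsConfig);

// must be registered before start()
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(final Thread t, final Throwable e) {
        System.err.println("Stream thread " + t.getName() + " died: " + e);
        // typical workaround: close() this instance and create/start a
        // brand new KafkaStreams object from the application side
    }
});

streams.start();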

Thanks
Sachin


On Mon, Mar 27, 2017 at 3:56 PM, Damian Guy  wrote:

> Yes, but we don't know why it is still processing the data. We don't want
> to have multiple processes acting on the same tasks, hence the exception.
> What if for some reason the other task is processing a large backlog, how
> long do we wait before we give up?
>
> I think in this case the exception is the right thing to do
>
> On Mon, 27 Mar 2017 at 09:24 Sachin Mittal  wrote:
>
> Hi,
> These are the logs
> https://www.dropbox.com/s/2t4ysfdqbtmcusq/complete_84_85_87_log.zip?dl=0
>
> I think this may not always be the case especially if previous owner is on
> a different machine.
>
> Say it is processing and it takes more than the poll timeout to process or
> commit the offset.
>
> The group bumps this thread and assigns its task to a different thread on
> maybe a different machine.
>
> All this while this client may be pushing the changelog data and other
> thread restoring the state from the same partition.
>
> So between the time it starts and it seeks to the end of the changelog it
> may be possible that previous thread which was still in process since it
> did not know it got bumped out added some more data to that changelog.
>
> Only when previous thread tries to commit the offset it gets to know that
> it is no longer the owner of the partition and then issues a rejoin
> request.
>
> I think such a case should be handled within the streams application.
>
> Thanks
> Sachin
>
>
>
>
>
> On Mon, Mar 27, 2017 at 1:25 PM, Damian Guy  wrote:
>
> > Hi Sachin,
> >
> > This should not happen. The previous owner of the task should have
> stopped
> > processing before the restoration begins. So if this is happening, then
> > that signals a bug. Do you have more logs?
> >
> > Thanks,
> > Damian
> >
> > On Sat, 25 Mar 2017 at 08:20 Sachin Mittal  wrote:
> >
> > > Hi,
> > > So recently we fixed the deadlock issue and also built the streams jar
> by
> > > copying the rocks db configs from trunk.
> > > So we don't get any deadlock issue now and also we see that the wait
> time
> > > of CPU cores stays around 5% (down from 50% earlier).
> > >
> > > However we now get a new exception which is not handled by streams
> > > application and causes the instance to shutdown.
> > >
> > > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > > [StreamThread-2] Failed to rebalance
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:622)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:378)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > Caused by: java.lang.IllegalStateException: task [0_9] Log end offset
> of
> > > new-part-advice-key-table-changelog-9 should not change while
> restoring:
> > > old end offset 647352, current offset 647632
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> > restoreActiveState(ProcessorStateManager.java:240)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> > register(ProcessorStateManager.java:193)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.
> > register(AbstractProcessorContext.java:99)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.
> > init(RocksDBSegmentedBytesStore.java:101)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.
> ChangeLoggingSegmentedBytesSto
> > re.init(ChangeLoggingSegmentedBytesStore.java:68)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.
> > init(MeteredSegmentedBytesStore.java:66)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(
> > RocksDBWindowStore.java:76)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.AbstractTask.
> > initializeStateStores(AbstractTask.java:86)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > a

[Kafka Streams] - problem joining 2 different streams

2017-03-27 Thread Marco Abitabile
Hi all,

I'm struggling with an apparently simple problem.
I'm joining 2 different streams:

Stream1. User activity data,  with key, value --> 
Stream2. User location data (such as the city name) with key, value -->


Keys are homogeneous in content and represents the id of the user's device.

The error thrown is:
Exception in thread "StreamThread-2" java.lang.ClassCastException:
com.mytest.JsonObject cannot be cast to  java.lang.String
at com.mytest.serdes.JsonObjectSerde$1.serialize(JsonObjectSerde.java:49)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:176)
at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
at
org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
at
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

This is the code I'm running:

//Other Stream: User Location, is a string with the name of the city the
user is (like "San Francisco")
KStreamBuilder locationStreamBuilder = new KStreamBuilder();
KStream userLocationStream =
locationStreamBuilder.stream(stringSerde, stringSerde,
"userLocationStreamData");
KStream locationKstream = userLocationStream.
map(MyStreamUtils::enhanceWithAreaDetails);
locationKstream.to("user_location");
KafkaStreams userLocationKafkaStream = new
KafkaStreams(locationStreamBuilder, propsLocation);
userLocationKafkaStream.start();

//This Stream: User Activity
KStreamBuilder activityStreamBuilder = new KStreamBuilder();
KStream activity =
activityStreamBuilder.stream(stringSerde,
jsonSerde, "activityStreamData");
activity.filter(MyStreamUtils::filterOutFakeUsers)
.map(MyStreamUtils::enhanceWithScoreDetails)
.join(
locationKstream,
MyStreamUtils::locationActivityJoiner,
JoinWindows.of(1000).until(1000 * 60 * 5),
stringSerde, jsonSerde, stringSerde)
.to("usersWithLocation")

KafkaStreams userActivityStream = new KafkaStreams(builder, propsActivity);
userActivityStream.start();

And MyStreamUtils::locationActivityJoiner does:

public static JsonObject locationActivityJoiner(JsonObject activity, String
loc) {
JsonObject join = activity.copy();
join.put("city" , loc);
return join;
}


Basically it seems that locationActivityJoiner receives, as both right and left,
elements that belong only to the activity KStream, while I was expecting to
receive an activity (a JsonObject) and a userLocation (a String).

How is this possible? I can't see where I'm going wrong.
Do you have any clue why this is happening?

thanks a lot for your support and work.

Best
Marco


kafka connector for mongodb as a source

2017-03-27 Thread VIVEK KUMAR MISHRA 13BIT0066
Hi All,

I am creating a Kafka connector for MongoDB as a source. My connector is
starting and connecting with Kafka, but it is not committing any offsets.

This is the output after starting the connector.

[root@localhost kafka_2.11-0.10.1.1]# bin/connect-standalone.sh
config/connect-standalone.properties config/mongodb.properties
[2017-03-27 18:32:58,019] INFO StandaloneConfig values:
rest.advertised.host.name = null
task.shutdown.graceful.timeout.ms = 5000
rest.host.name = null
rest.advertised.port = null
bootstrap.servers = [localhost:9092]
offset.flush.timeout.ms = 5000
offset.flush.interval.ms = 1
rest.port = 8083
internal.key.converter = class
org.apache.kafka.connect.json.JsonConverter
access.control.allow.methods =
access.control.allow.origin =
offset.storage.file.filename = /tmp/connect.offsets
internal.value.converter = class
org.apache.kafka.connect.json.JsonConverter
value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:178)
[2017-03-27 18:32:58,162] INFO Logging initialized @609ms
(org.eclipse.jetty.util.log:186)
[2017-03-27 18:32:58,392] INFO Kafka Connect starting
(org.apache.kafka.connect.runtime.Connect:52)
[2017-03-27 18:32:58,392] INFO Herder starting
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
[2017-03-27 18:32:58,393] INFO Worker starting
(org.apache.kafka.connect.runtime.Worker:113)
[2017-03-27 18:32:58,393] INFO Starting FileOffsetBackingStore with file
/tmp/connect.offsets
(org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2017-03-27 18:32:58,398] INFO Worker started
(org.apache.kafka.connect.runtime.Worker:118)
[2017-03-27 18:32:58,398] INFO Herder started
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2017-03-27 18:32:58,398] INFO Starting REST server
(org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-03-27 18:32:58,493] INFO jetty-9.2.15.v20160210
(org.eclipse.jetty.server.Server:327)
[2017-03-27 18:32:59,621] INFO HV01: Hibernate Validator 5.1.2.Final
(org.hibernate.validator.internal.util.Version:27)
Mar 27, 2017 6:32:59 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The
(sub)resource method listConnectors in
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains
empty path annotation.
WARNING: The (sub)resource method createConnector in
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains
empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
contains empty path annotation.
WARNING: The (sub)resource method serverInfo in
org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty
path annotation.

[2017-03-27 18:33:00,015] INFO Started
o.e.j.s.ServletContextHandler@44e3760b{/,null,AVAILABLE}
(org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-03-27 18:33:00,042] INFO Started ServerConnector@7f58ad44{HTTP/1.1}{
0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-03-27 18:33:00,043] INFO Started @2492ms
(org.eclipse.jetty.server.Server:379)
[2017-03-27 18:33:00,043] INFO REST server listening at
http://127.0.0.1:8083/, advertising URL http://127.0.0.1:8083/
(org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-03-27 18:33:00,043] INFO Kafka Connect started
(org.apache.kafka.connect.runtime.Connect:58)
[2017-03-27 18:33:00,048] INFO ConnectorConfig values:
connector.class =
org.apache.kafka.connect.mongodb.MongodbSourceConnector
tasks.max = 1
name = mongodb
value.converter = null
key.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:178)
[2017-03-27 18:33:00,048] INFO Creating connector mongodb of type
org.apache.kafka.connect.mongodb.MongodbSourceConnector
(org.apache.kafka.connect.runtime.Worker:159)
[2017-03-27 18:33:00,051] INFO Instantiated connector mongodb with version
0.10.0.1 of type class
org.apache.kafka.connect.mongodb.MongodbSourceConnector
(org.apache.kafka.connect.runtime.Worker:162)
[2017-03-27 18:33:00,053] INFO Finished creating connector mongodb
(org.apache.kafka.connect.runtime.Worker:173)
[2017-03-27 18:33:00,053] INFO SourceConnectorConfig values:
connector.class =
org.apache.kafka.connect.mongodb.MongodbSourceConnector
tasks.max = 1
name = mongodb
value.converter = null
key.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:178)
[2017-03-27 18:33:00,056] INFO Creating task mongodb-0
(org.apache.kafka.connect.runtime.Worker:252)
[2017-03-27 18:33:00,056] INFO ConnectorConfig values:
connector.class =
org.apache.kafka.connect.mongodb.MongodbSourceConnector
task

Re: using a state store for deduplication

2017-03-27 Thread Damian Guy
Jon,
You don't need all the data for every topic as the data is partitioned by
key. Therefore each state-store instance is de-duplicating a subset of the
key set.
Thanks,
Damian

On Mon, 27 Mar 2017 at 13:47 Jon Yeargers  wrote:

> Ive been (re)reading this document(
> http://docs.confluent.io/3.2.0/streams/developer-guide.html#state-stores)
> hoping to better understand StateStores. At the top of the section there is
> a tantalizing note implying that one could do deduplication using a store.
>
> At present we using Redis for this as it gives us a shared location. Ive
> been of the mind that a given store was local to a streams instance. To
> truly support deduplication I would think one would need access to _all_
> the data for a topic and not just on a per-partition basis.
>
> Am I completely misunderstanding this?
>


using a state store for deduplication

2017-03-27 Thread Jon Yeargers
I've been (re)reading this document(
http://docs.confluent.io/3.2.0/streams/developer-guide.html#state-stores)
hoping to better understand StateStores. At the top of the section there is
a tantalizing note implying that one could do deduplication using a store.

At present we're using Redis for this as it gives us a shared location. I've
been of the mind that a given store was local to a streams instance. To
truly support deduplication I would think one would need access to _all_
the data for a topic and not just on a per-partition basis.

Am I completely misunderstanding this?


Iterating stream windows

2017-03-27 Thread Jon Yeargers
Im hoping to support external queries into a windowed state store
aggregator. Thanks to a previous question here I see where to use a
ReadOnlyWindowStore but Im not clear on how to define the boundaries for
the call.

Assume I have a one-hour window with a 5 minute 'slide' between new
windows. If an arbitrary request for the 'latest' values comes in I want to
return the window that's closest to - but not outside - its 60 minute
boundary. To me this implies I need to iterate over available windows
(those that haven't hit their 'retention' value).

Does such a function exist? I can envision a solution using a Guava timed
cache but I'm trying very hard not to break out of the full-Kafka world.
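
Something like this is what I have in mind so far (store name and types are
made up): fetch() returns one entry per window whose start time falls in the
given range, ordered by window start, so the last entry is the newest window.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class LatestWindowLookup {

    // Returns the value of the newest window (by start time) for the given key,
    // looking back one hour from "now".
    static Long latestValue(final KafkaStreams streams, final String key) {
        final ReadOnlyWindowStore<String, Long> store =
                streams.store("agg-store", QueryableStoreTypes.windowStore());

        final long now = System.currentTimeMillis();
        final long oneHourMs = 60 * 60 * 1000L;

        Long latest = null;
        try (WindowStoreIterator<Long> iter = store.fetch(key, now - oneHourMs, now)) {
            while (iter.hasNext()) {
                final KeyValue<Long, Long> entry = iter.next();  // key = window start time
                latest = entry.value;
            }
        }
        return latest;
    }
}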


Re: kafka streams in-memory Keyvalue store iterator remove broken on upgrade to 0.10.2.0 from 0.10.1.1

2017-03-27 Thread Thomas Becker
Couldn't this have been solved by returning a ReadOnlyKeyValueIterator
that throws an exception from remove() from the
ReadOnlyKeyValueStore.iterator()? That preserves the ability to call
remove() when it's appropriate and moves the refused bequest to when
you shouldn't.
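
Something along these lines, purely as an illustration (not the actual Streams
classes) -- a delegating iterator that only refuses the mutating call:

import java.util.Iterator;

// wraps any iterator and refuses remove(), leaving reads untouched
class ReadOnlyIterator<T> implements Iterator<T> {

    private final Iterator<T> delegate;

    ReadOnlyIterator(final Iterator<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public T next() {
        return delegate.next();
    }

    @Override
    public void remove() {
        // the refusal lives in the read-only view, not in the writable store's iterator
        throw new UnsupportedOperationException("remove not supported on read-only iterators");
    }
}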

On Thu, 2017-03-23 at 11:05 -0700, Matthias J. Sax wrote:
> There is a difference between .delete() and it.remove().
>
> .delete() can only be called in a Streams operator that is
> responsible
> to maintain the state. This is of course required to give the
> developer
> writing the operator full control over the store.
>
> However, it.remove() is called *outside* from the Streams part of
> your
> app. Thus, if a second developer queries a store, she should not be
> able
> to "mess" with the store -- she does not own the store.
>
> Does this make sense?
>
>
> -Matthias
>
>
> On 3/22/17 3:27 PM, Tom Dearman wrote:
> >
> > Hi,
> >
> > What I was trying to accomplish was the normal usage of the
> > iterator
> > interface to enable safe remove while iterating over a collection.
> > I
> > have used iterator.remove since kafka streams was released, so this
> > has been the real functionality since release and in the absence of
> > documentation to say otherwise feels like a bug has been introduced
> > now.  If KeyValueStore#delete doesn't mess up the internal state
> > during the single threaded access to the store I'm not sure why
> > iterator.remove would.j
> > Having said that, I will save the keys for removal during iteration
> > and delete after.
> >
> > Thanks for you help.
> >
> > Tom
> >
> > On 22 March 2017 at 19:34, Michael Noll 
> > wrote:
> > >
> > > To add to what Matthias said, in case the following isn't clear:
> > >
> > > - You should not (and, in 0.10.2, cannot any longer) call the
> > > iterator's
> > > remove() method, i.e. `KeyValueIterator#remove()` when iterating
> > > through a
> > > `KeyValueStore`.  Perhaps this is something we should add to the
> > > `KeyValueIterator` javadocs.
> > >
> > > - You can of course call the store's delete() method:
> > > `KeyValueStore#delete(K key)`.
> > >
> > > Just mentioning this because, when reading the thread quickly, I
> > > missed the
> > > "iterator" part and thought removal/deletion on the store wasn't
> > > working.
> > > ;-)
> > >
> > > Best,
> > > Michael
> > >
> > >
> > >
> > >
> > > On Wed, Mar 22, 2017 at 8:18 PM, Matthias J. Sax  > > ent.io>
> > > wrote:
> > >
> > > >
> > > > Hi,
> > > >
> > > > remove() should not be supported -- thus, it's actually a bug
> > > > in 0.10.1
> > > > that got fixed in 0.10.2.
> > > >
> > > > Stores should only be altered by Streams and iterator over the
> > > > stores
> > > > should be read-only -- otherwise, you might mess up Streams
> > > > internal state.
> > > >
> > > > I would highly recommend to reconsider the call to it.remove()
> > > > in you
> > > > application. Not sure what you try to accomplish, but you
> > > > should do it
> > > > differently.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 3/22/17 8:00 AM, Tom Dearman wrote:
> > > > >
> > > > > Hi, hope someone on kafka-streams team can help.  Our
> > > > > application uses
> > > > >
> > > > > KeyValueIterator it = KeyValueStore.all();
> > > > >
> > > > > …..
> > > > > it.remove()
> > > > >
> > > > >
> > > > > This used to work but is now broken, causes our punctuate to
> > > > > fail and
> > > > StreamThread to die.  The cause seems to be that there were
> > > > changes in
> > > > 0.10.2.0 to InMemoryKeyValueStoreSupplier:
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > public synchronized KeyValueIterator<K, V> all() {
> > > > > final TreeMap<K, V> copy = new TreeMap<>(this.map);
> > > > > return new
> > > > > MemoryStoreIterator<>(copy.entrySet().iterator());
> > > > > }
> > > > >
> > > > > @Override
> > > > > public synchronized KeyValueIterator<K, V> all() {
> > > > > final TreeMap<K, V> copy = new TreeMap<>(this.map);
> > > > > return new DelegatingPeekingKeyValueIterator<>(name, new
> > > > MemoryStoreIterator<>(copy.entrySet().iterator()));
> > > > >
> > > > > }
> > > > > But the DelegatingPeekingKeyValueIterator has:
> > > > >
> > > > > @Override
> > > > > public void remove() {
> > > > > throw new UnsupportedOperationException("remove not
> > > > > supported");
> > > > > }
> > > > > whereas the old direct call on MemoryStoreIterator allowed
> > > > > remove.  For
> > > > some reason there is no call to underlying.remove() in the
> > > > DelegatingPeekingKeyValueIterator.
> > > > >
> > > > >
> > > > > We don’t want to downgrade to 0.10.1.1 as there was a useful
> > > > > bug fix and
> > > > removing dependancy on zookeeper.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Tom
> > > > >
> > > >
--
Tommy Becker
Senior Software Engineer
O +1 919.460.4747
tivo.com

Re: kafka streams 0.10.2 failing while restoring offsets

2017-03-27 Thread Damian Guy
Yes, but we don't know why it is still processing the data. We don't want
to have multiple processes acting on the same tasks, hence the exception.
What if for some reason the other task is processing a large backlog, how
long do we wait before we give up?

I think in this case the exception is the right thing to do

On Mon, 27 Mar 2017 at 09:24 Sachin Mittal  wrote:

Hi,
These are the logs
https://www.dropbox.com/s/2t4ysfdqbtmcusq/complete_84_85_87_log.zip?dl=0

I think this may not always be the case especially if previous owner is on
a different machine.

Say it is processing and it takes more than the poll timeout to process or
commit the offset.

The group bumps this thread and assigns its task to a different thread on
maybe a different machine.

All this while this client may be pushing the changelog data and other
thread restoring the state from the same partition.

So between the time it starts and it seeks to the end of the changelog it
may be possible that previous thread which was still in process since it
did not know it got bumped out added some more data to that changelog.

Only when previous thread tries to commit the offset it gets to know that
it is no longer the owner of the partition and then issues a rejoin request.

I think such a case should be handled within the streams application.

Thanks
Sachin





On Mon, Mar 27, 2017 at 1:25 PM, Damian Guy  wrote:

> Hi Sachin,
>
> This should not happen. The previous owner of the task should have stopped
> processing before the restoration begins. So if this is happening, then
> that signals a bug. Do you have more logs?
>
> Thanks,
> Damian
>
> On Sat, 25 Mar 2017 at 08:20 Sachin Mittal  wrote:
>
> > Hi,
> > So recently we fixed the deadlock issue and also built the streams jar
by
> > copying the rocks db configs from trunk.
> > So we don't get any deadlock issue now and also we see that the wait
time
> > of CPU cores stays around 5% (down from 50% earlier).
> >
> > However we now get a new exception which is not handled by streams
> > application and causes the instance to shutdown.
> >
> > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > [StreamThread-2] Failed to rebalance
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:622)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:378)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > Caused by: java.lang.IllegalStateException: task [0_9] Log end offset of
> > new-part-advice-key-table-changelog-9 should not change while restoring:
> > old end offset 647352, current offset 647632
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> restoreActiveState(ProcessorStateManager.java:240)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> register(ProcessorStateManager.java:193)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.
> register(AbstractProcessorContext.java:99)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.
> init(RocksDBSegmentedBytesStore.java:101)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesSto
> re.init(ChangeLoggingSegmentedBytesStore.java:68)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.
> init(MeteredSegmentedBytesStore.java:66)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(
> RocksDBWindowStore.java:76)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.processor.internals.AbstractTask.
> initializeStateStores(AbstractTask.java:86)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.(StreamTask.java:141)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >
> > What I check from logs is this
> > DEBUG 2017-03-25 02:07:24,499 [StreamThread-2]:
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [StreamThread-2] creating new task 0_9
> > So it creates the task at this time.
> >
> > To create the local state store from the changelog topic it starts at
> >
> > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to
> > partition(s): new-part-advice-key-table-changelog-9
> > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to end of
> > partition new-part-advice-key-table-changelog-9
> > DEBUG 2017-03-25 02:07:24,550 [St

Re: Kafka queue full configuration

2017-03-27 Thread Manikumar
Looks like you are referring to the Scala producer configs:
https://kafka.apache.org/082/documentation.html#producerconfigs

The Scala producer is deprecated now and will be removed in a future release.
You can use the Java producer instead:
http://kafka.apache.org/documentation/#producerconfigs
The buffering-related configs there are buffer.memory and block.on.buffer.full.
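
For illustration, a minimal sketch of driving the Java producer into a
queue-full condition. The broker address, topic name, and the deliberately
small buffer.memory / max.block.ms values are assumptions chosen only to make
the buffer fill quickly; max.block.ms is used here because block.on.buffer.full
is deprecated in favour of it.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class QueueFullSimulation {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Deliberately tiny buffer so the producer's accumulator fills up quickly.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "32768");
        // Fail fast instead of blocking for the default 60s once the buffer is full.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1_000_000; i++) {
                producer.send(new ProducerRecord<>("test-topic", "key-" + i, "value-" + i),
                    (metadata, exception) -> {
                        if (exception != null) {
                            // With the buffer exhausted and max.block.ms elapsed this is
                            // typically an org.apache.kafka.common.errors.TimeoutException.
                            System.err.println("send failed: " + exception);
                        }
                    });
            }
        }
    }
}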

On Mon, Mar 27, 2017 at 3:15 PM, Mohapatra, Sudhir (Nokia - IN/Gurgaon) <
sudhir.mohapa...@nokia.com> wrote:

> Can you please let me know what the new parameter name is for the same
> functionality, to simulate the queue-full scenario?
>
> Regards,
> Sudhir
>
> From: Mohapatra, Sudhir (Nokia - IN/Gurgaon)
> Sent: Thursday, March 23, 2017 11:01 AM
> To: 'users@kafka.apache.org' ; '
> d...@kafka.apache.org' 
> Subject: Kafka queue full configuration
>
> Hi,
> We are trying to simulate the Kafka queue-full scenario on Kafka 0.10.0.
> I have seen that in earlier versions there is a configuration parameter
> "queue.buffering.max.messages" which can be set to simulate the queue-full
> scenario.
> But in Kafka 0.10.0 this parameter is not there.
> https://kafka.apache.org/0100/documentation.html
>
> Has this "queue.buffering.max.messages" config parameter been changed
> in the Kafka 0.10.0 release?
> Can you please let me know what the new parameter name is for the same
> functionality?
>
> Regards,
> Sudhir
>
>


RE: Kafka queue full configuration

2017-03-27 Thread Mohapatra, Sudhir (Nokia - IN/Gurgaon)
Can you please let me know what the new parameter name is for the same
functionality, to simulate the queue-full scenario?

Regards,
Sudhir

From: Mohapatra, Sudhir (Nokia - IN/Gurgaon)
Sent: Thursday, March 23, 2017 11:01 AM
To: 'users@kafka.apache.org' ; 'd...@kafka.apache.org' 

Subject: Kafka queue full configuration

Hi,
We are trying to simulate the Kafka queue-full scenario on Kafka 0.10.0.
I have seen that in earlier versions there is a configuration parameter
"queue.buffering.max.messages" which can be set to simulate the queue-full
scenario.
But in Kafka 0.10.0 this parameter is not there.
https://kafka.apache.org/0100/documentation.html

Has this "queue.buffering.max.messages" config parameter been changed in
the Kafka 0.10.0 release?
Can you please let me know what the new parameter name is for the same
functionality?

Regards,
Sudhir



offset commitment from another client

2017-03-27 Thread Vova Shelgunov
Hi,

I have an application which consumes messages from Kafka and then creates a
Docker container via Mesos to process the incoming message (an image), but I
need to commit the offset only once the message has been processed inside the
Docker container. So basically I need to commit the offset from another client
(the one running inside the container).

Will it work?

Thanks
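
For what it's worth, the Java consumer does allow committing an explicit offset
via commitSync(Map), so a process that knows the topic, partition, offset, and
group.id could attempt the commit itself. A rough sketch of what such a
container-side commit might look like; all parameter names here are
assumptions, and note that commits from outside the consumer group can be
rejected by the group coordinator while the original group is active, so having
the original consumer commit once the container reports completion is often the
safer design.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ContainerOffsetCommitter {
    // All of these parameters are assumed to be handed to the container by the
    // application that launched it, e.g. via environment variables.
    public static void commitProcessedOffset(String bootstrapServers, String groupId,
                                             String topic, int partition, long processedOffset) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition(topic, partition);
            // The committed offset is the position of the *next* record to read,
            // hence processedOffset + 1.
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(processedOffset + 1)));
        }
    }
}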


Re: kafka streams 0.10.2 failing while restoring offsets

2017-03-27 Thread Sachin Mittal
Hi,
These are the logs
https://www.dropbox.com/s/2t4ysfdqbtmcusq/complete_84_85_87_log.zip?dl=0

I think this may not always be the case, especially if the previous owner is on
a different machine.

Say it is processing and takes more than the poll timeout to process or commit
the offset.

The group bumps this thread and assigns its task to a different thread,
possibly on a different machine.

All this while, this client may be pushing changelog data while the other
thread is restoring state from the same partition.

So between the time the new thread starts and the time it seeks to the end of
the changelog, it is possible that the previous thread, which was still
processing because it did not know it had been bumped out, appended some more
data to that changelog.

Only when the previous thread tries to commit its offsets does it learn that it
is no longer the owner of the partition, and only then does it issue a rejoin
request.

I think such a case should be handled within the streams application.

Thanks
Sachin
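
As a stopgap at the application level (not a substitute for a fix inside
streams), one option might be to register an uncaught exception handler on the
KafkaStreams instance so that a dying stream thread is at least detected and
the whole instance can be closed and restarted. A minimal sketch, where the
topology builder and config are whatever the application already uses:

import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class ResilientStreamsRunner {
    public static KafkaStreams startStreams(TopologyBuilder builder, Properties config,
                                            AtomicBoolean restartRequested) {
        KafkaStreams streams = new KafkaStreams(builder, config);

        // Invoked whenever a StreamThread dies with an unhandled exception, e.g. the
        // IllegalStateException thrown while restoring the changelog.
        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("stream thread " + thread.getName() + " died: " + throwable);
            restartRequested.set(true);
        });

        streams.start();
        return streams;
    }
    // A supervising loop elsewhere can watch restartRequested and, when it is set,
    // call streams.close() and then invoke startStreams(...) again.
}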





On Mon, Mar 27, 2017 at 1:25 PM, Damian Guy  wrote:

> Hi Sachin,
>
> This should not happen. The previous owner of the task should have stopped
> processing before the restoration begins. So if this is happening, then
> that signals a bug. Do you have more logs?
>
> Thanks,
> Damian
>
> On Sat, 25 Mar 2017 at 08:20 Sachin Mittal  wrote:
>
> > Hi,
> > So recently we fixed the deadlock issue and also built the streams jar by
> > copying the rocks db configs from trunk.
> > So we don't get any deadlock issue now and also we see that the wait time
> > of CPU cores stays around 5% (down from 50% earlier).
> >
> > However we now get a new exception which is not handled by streams
> > application and causes the instance to shutdown.
> >
> > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > [StreamThread-2] Failed to rebalance
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:622)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:378)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > Caused by: java.lang.IllegalStateException: task [0_9] Log end offset of
> > new-part-advice-key-table-changelog-9 should not change while restoring:
> > old end offset 647352, current offset 647632
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> restoreActiveState(ProcessorStateManager.java:240)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> register(ProcessorStateManager.java:193)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.
> register(AbstractProcessorContext.java:99)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.
> init(RocksDBSegmentedBytesStore.java:101)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesSto
> re.init(ChangeLoggingSegmentedBytesStore.java:68)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.
> init(MeteredSegmentedBytesStore.java:66)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(
> RocksDBWindowStore.java:76)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.processor.internals.AbstractTask.
> initializeStateStores(AbstractTask.java:86)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.<init>(StreamTask.java:141)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >
> > What I check from logs is this
> > DEBUG 2017-03-25 02:07:24,499 [StreamThread-2]:
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [StreamThread-2] creating new task 0_9
> > So it creates the task at this time.
> >
> > To create the local state store from the changelog topic it starts at
> >
> > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to
> > partition(s): new-part-advice-key-table-changelog-9
> > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to end of
> > partition new-part-advice-key-table-changelog-9
> > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset
> for
> > partition new-part-advice-key-table-changelog-9 to latest offset.
> > DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> > org.apache.kafka.clients.consumer.internals.Fetcher - Handling
> > ListOffsetResponse response for new-part-advice-key-table-changelog-9.
> > Fetched

Re: YASSQ (yet another state store question)

2017-03-27 Thread Damian Guy
Hi Jon,
The store you have created is a window store, so you need to use:

kafkaStreams.store("AggStore",* QueryableStoreTypes.windowStore()*)

Thanks,
Damian
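
For completeness, a minimal sketch of querying the store once it has been
fetched as a window store; the String key/value types and the 10-minute fetch
range are assumptions based on the windows defined earlier in the thread.

import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class WindowStoreQuery {
    public static void printWindows(KafkaStreams kafkaStreams, String key) {
        ReadOnlyWindowStore<String, String> store =
            kafkaStreams.store("AggStore", QueryableStoreTypes.<String, String>windowStore());

        long now = System.currentTimeMillis();
        long from = now - TimeUnit.MINUTES.toMillis(10);

        // fetch() returns one entry per window that the key falls into;
        // the iterator's key is the window start timestamp.
        try (WindowStoreIterator<String> iterator = store.fetch(key, from, now)) {
            while (iterator.hasNext()) {
                KeyValue<Long, String> entry = iterator.next();
                System.out.println("window start " + entry.key + " -> " + entry.value);
            }
        }
    }
}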

On Sun, 26 Mar 2017 at 14:14, Jon Yeargers  wrote:

Also - if I run this on two hosts - what does it imply if the response to
'streams.allMetadata()' from one host includes both instances but the other
host only knows about itself?

On Sun, Mar 26, 2017 at 5:58 AM, Jon Yeargers 
wrote:

> If the '.state()' function returns "RUNNING" and I still get this
> exception?
>
> On Fri, Mar 24, 2017 at 1:56 PM, Eno Thereska 
> wrote:
>
>> Hi Jon,
>>
>> This is expected, see this:
>> https://groups.google.com/forum/?pli=1#!searchin/confluent-platform/migrated$20to$20another$20instance%7Csort:relevance/confluent-platform/LglWC_dZDKw/qsPuCRT_DQAJ
>>
>> Thanks
>> Eno
>> > On 24 Mar 2017, at 20:51, Jon Yeargers 
>> wrote:
>> >
>> > I've setup a KTable as follows:
>> >
>> > KTable<Windowed<String>, String> outTable = sourceStream.groupByKey().
>> > reduce(rowReducer,
>> >TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 *
>> > 1000).until(10 * 60 * 1000L),
>> >"AggStore");
>> >
>> > I can confirm its presence via 'streams.allMetadata()' (accessible
>> through
>> > a simple httpserver).
>> >
>> > When I call 'ReadOnlyKeyValueStore store =
>> > kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());'
>> >
>> > I get this exception:
>> >
>> > org.apache.kafka.streams.errors.InvalidStateStoreException: the state
>> > store, AggStore, may have migrated to another instance.
>> >at
>> > org.apache.kafka.streams.state.internals.QueryableStoreProvi
>> der.getStore(QueryableStoreProvider.java:49)
>> >at
>> > org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
>> >at
>> > com.cedexis.videokafka.videohouraggregator.RequestHandler.
>> handle(RequestHandler.java:97)
>> >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>> >at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
>> >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
>> >at
>> > sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(Se
>> rverImpl.java:675)
>> >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>> >at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:6
>> 47)
>> >at
>> > sun.net.httpserver.ServerImpl$DefaultExecutor.execute(Server
>> Impl.java:158)
>> >at
>> > sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431)
>> >at sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl.java
>> :396)
>> >at java.lang.Thread.run(Thread.java:745)
>> >
>> >
>> > ... except.. there is only one instance.. running locally.
>>
>>
>


Re: kafka streams 0.10.2 failing while restoring offsets

2017-03-27 Thread Damian Guy
Hi Sachin,

This should not happen. The previous owner of the task should have stopped
processing before the restoration begins. So if this is happening, then
that signals a bug. Do you have more logs?

Thanks,
Damian

On Sat, 25 Mar 2017 at 08:20 Sachin Mittal  wrote:

> Hi,
> So recently we fixed the deadlock issue and also built the streams jar by
> copying the rocks db configs from trunk.
> So we don't get any deadlock issue now and also we see that the wait time
> of CPU cores stays around 5% (down from 50% earlier).
>
> However we now get a new exception which is not handled by streams
> application and causes the instance to shutdown.
>
> org.apache.kafka.streams.errors.StreamsException: stream-thread
> [StreamThread-2] Failed to rebalance
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:622)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> Caused by: java.lang.IllegalStateException: task [0_9] Log end offset of
> new-part-advice-key-table-changelog-9 should not change while restoring:
> old end offset 647352, current offset 647632
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:240)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at
>
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at
>
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at
>
> org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at
>
> org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at
>
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:76)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at
>
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>
> What I check from logs is this
> DEBUG 2017-03-25 02:07:24,499 [StreamThread-2]:
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [StreamThread-2] creating new task 0_9
> So it creates the task at this time.
>
> To create the local state store from the changelog topic it starts at
>
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to
> partition(s): new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to end of
> partition new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for
> partition new-part-advice-key-table-changelog-9 to latest offset.
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.Fetcher - Handling
> ListOffsetResponse response for new-part-advice-key-table-changelog-9.
> Fetched offset 647352, timestamp -1
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to beginning of
> partition new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for
> partition new-part-advice-key-table-changelog-9 to earliest offset.
>
> and process is over at
> DEBUG 2017-03-25 02:10:21,225 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.Fetcher - Sending fetch for
> partitions [new-part-advice-key-table-changelog-9] to broker
> 192.168.73.199:9092 (id: 5 rack: null)
> DEBUG 2017-03-25 02:10:21,230 [StreamThread-2]:
> org.apache.kafka.clients.consumer.KafkaConsumer - Unsubscribed all topics
> or patterns and assigned partitions
>
> And the exception is thrown at:
> ERROR 2017-03-25 02:10:21,232 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User
> provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> new-part-advice failed on partition assignment
> java.lang.IllegalStateException: task [0_9] Log end offset of
> new-part-advice-key-table-changelog-9 should not change 

Re: YASSQ (yet another state store question)

2017-03-27 Thread Michael Noll
IIRC this may happen, for example, if the first host runs all the stream
tasks (here: 2 in total) and migration of stream task(s) to the second host
hasn't happened yet.

-Michael
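
If it helps, a rough sketch of dumping what each instance believes the
assignment to be; the store name, key, and serializer below are assumptions,
and hostInfo() is only meaningful if application.server is configured on each
instance.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.StreamsMetadata;

public class MetadataInspector {
    public static void dumpAssignments(KafkaStreams streams) {
        // allMetadata() lists every instance of this application that the local
        // instance currently knows about, with the stores/partitions it hosts.
        for (StreamsMetadata metadata : streams.allMetadata()) {
            System.out.println(metadata.hostInfo() + " hosts stores " + metadata.stateStoreNames()
                + " for partitions " + metadata.topicPartitions());
        }

        // metadataForKey() reports which instance currently owns "AggStore" for a given key.
        StreamsMetadata owner =
            streams.metadataForKey("AggStore", "some-key", Serdes.String().serializer());
        System.out.println("AggStore for 'some-key' lives on " + owner.hostInfo());
    }
}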



On Sun, Mar 26, 2017 at 3:14 PM, Jon Yeargers 
wrote:

> Also - if I run this on two hosts - what does it imply if the response to
> 'streams.allMetadata()' from one host includes both instances but the other
> host only knows about itself?
>
> On Sun, Mar 26, 2017 at 5:58 AM, Jon Yeargers 
> wrote:
>
> > If the '.state()' function returns "RUNNING" and I still get this
> > exception?
> >
> > On Fri, Mar 24, 2017 at 1:56 PM, Eno Thereska 
> > wrote:
> >
> >> Hi Jon,
> >>
> >> This is expected, see this:
> >> https://groups.google.com/forum/?pli=1#!searchin/confluent-platform/migrated$20to$20another$20instance%7Csort:relevance/confluent-platform/LglWC_dZDKw/qsPuCRT_DQAJ
> >>
> >> Thanks
> >> Eno
> >> > On 24 Mar 2017, at 20:51, Jon Yeargers 
> >> wrote:
> >> >
> >> > I've setup a KTable as follows:
> >> >
> >> > KTable<Windowed<String>, String> outTable = sourceStream.groupByKey().
> >> > reduce(rowReducer,
> >> >TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 *
> >> > 1000).until(10 * 60 * 1000L),
> >> >"AggStore");
> >> >
> >> > I can confirm its presence via 'streams.allMetadata()' (accessible
> >> through
> >> > a simple httpserver).
> >> >
> >> > When I call 'ReadOnlyKeyValueStore store =
> >> > kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());'
> >> >
> >> > I get this exception:
> >> >
> >> > org.apache.kafka.streams.errors.InvalidStateStoreException: the state
> >> > store, AggStore, may have migrated to another instance.
> >> >at
> >> > org.apache.kafka.streams.state.internals.QueryableStoreProvi
> >> der.getStore(QueryableStoreProvider.java:49)
> >> >at
> >> > org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
> >> >at
> >> > com.cedexis.videokafka.videohouraggregator.RequestHandler.
> >> handle(RequestHandler.java:97)
> >> >at com.sun.net.httpserver.Filter$
> Chain.doFilter(Filter.java:79)
> >> >at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
> >> >at com.sun.net.httpserver.Filter$
> Chain.doFilter(Filter.java:82)
> >> >at
> >> > sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(Se
> >> rverImpl.java:675)
> >> >at com.sun.net.httpserver.Filter$
> Chain.doFilter(Filter.java:79)
> >> >at sun.net.httpserver.ServerImpl$
> Exchange.run(ServerImpl.java:6
> >> 47)
> >> >at
> >> > sun.net.httpserver.ServerImpl$DefaultExecutor.execute(Server
> >> Impl.java:158)
> >> >at
> >> > sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431)
> >> >at sun.net.httpserver.ServerImpl$
> Dispatcher.run(ServerImpl.java
> >> :396)
> >> >at java.lang.Thread.run(Thread.java:745)
> >> >
> >> >
> >> > ... except.. there is only one instance.. running locally.
> >>
> >>
> >
>


Re: Streams RocksDBException with no message?

2017-03-27 Thread Michael Noll
We're talking about `ulimit` (CLI tool) and the `nofile` limit (number of
open files), which you can access via `ulimit -n`.

Examples:
https://access.redhat.com/solutions/61334
https://stackoverflow.com/questions/21515463/how-to-increase-maximum-file-open-limit-ulimit-in-ubuntu

Depending on the operating system, the default setting is often pretty low
(e.g. 1024).  Bump this up to something higher, like 16k or 32k.  Of course, an
even better approach is to monitor this metric on your servers/brokers, and
-- with this collected information -- bump the setting to a reasonable
value for your environment.

-Michael
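
If it is useful, the JVM can also report this metric from inside the
application itself, so the Streams app can log or alert on descriptor usage
directly. A small sketch, assuming a HotSpot/OpenJDK JVM on a Unix-like OS
(the com.sun.management bean is not available everywhere):

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

import com.sun.management.UnixOperatingSystemMXBean;

public class FdMonitor {
    public static void logFileDescriptorUsage() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            UnixOperatingSystemMXBean unixOs = (UnixOperatingSystemMXBean) os;
            long open = unixOs.getOpenFileDescriptorCount();
            long max = unixOs.getMaxFileDescriptorCount();  // the effective `ulimit -n`
            System.out.printf("open file descriptors: %d / %d%n", open, max);
            if (open > max * 0.8) {
                System.err.println("WARNING: above 80% of the file descriptor limit");
            }
        }
    }
}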



On Sun, Mar 26, 2017 at 7:41 PM, Sachin Mittal  wrote:

> Hi,
> Could you please tell us what you changed for ulimit, and how?
>
> We also are seem to be facing same issue.
>
> Thanks
> Sachin
>
>
> On Tue, Mar 21, 2017 at 9:22 PM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Thanks Guozhang.
> >
> > For my part, turns out I was hitting ulimit on my open file descriptors.
> > Phew, easy to fix... once I figured it out. :-)
> >
> > Mathieu
> >
> >
> > On Fri, Mar 17, 2017 at 4:14 PM, Guozhang Wang 
> wrote:
> >
> > > Hi Mathieu,
> > >
> > > We are aware of that since long time ago and I have been looking into
> > this
> > > issue, turns out to be a known issue in RocksDB:
> > >
> > > https://github.com/facebook/rocksdb/issues/1688
> > >
> > > And the corresponding fix (https://github.com/facebook/
> rocksdb/pull/1714
> > )
> > > has been merged in master but marked for
> > >
> > >- v5.1.4 
> > >
> > > only while the latest release is 5.1.2.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Mar 17, 2017 at 10:27 AM, Mathieu Fenniak <
> > > mathieu.fenn...@replicon.com> wrote:
> > >
> > > > Hey all,
> > > >
> > > > So... what does it mean to have a RocksDBException with a message
> that
> > > just
> > > > has a single character?  "e", "q", "]"... I've seen a few.  Has
> anyone
> > > seen
> > > > this before?
> > > >
> > > > Two example exceptions:
> > > > https://gist.github.com/mfenniak/c56beb6d5058e2b21df0309aea224f12
> > > >
> > > > Kafka Streams 0.10.2.0.  Both of these errors occurred during state
> > store
> > > > initialization.  I'm running a single Kafka Streams thread per
> server,
> > > this
> > > > occurred on two servers about a half-hour apart.
> > > >
> > > > Mathieu
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>