Re: KafkaStream: punctuate() never called even when data is received by process()

2016-11-23 Thread shahab
Thanks for the comments.
@David: yes, I have a source which is reading data from two topics, and one
of them was empty while the second one was loaded with plenty of data.
So what do you suggest to solve this?
Here is snippet of my code:

StreamsConfig config = new StreamsConfig(configProperties);
TopologyBuilder builder = new TopologyBuilder();
AppSettingsFetcher appSettingsFetcher = initAppSettingsFetcher();

StateStoreSupplier company_bucket = Stores.create("CBS")
.withKeys(Serdes.String())
.withValues(Serdes.String())
.persistent()
.build();

StateStoreSupplier profiles = Stores.create("PR")
.withKeys(Serdes.String())
.withValues(Serdes.String())
.persistent()
.build();


builder
.addSource("deltaSource", topicName, LoaderListener.LoadedDeltaTopic)

.addProcessor("deltaProcess1", () -> new DeltaProcessor(

), "deltaSource")
.addProcessor("deltaProcess2", () -> new LoadProcessor(

), "deltaProcess1")
.addStateStore(profiles, "deltaProcess2", "deltaProcess1")
.addStateStore(company_bucket, "deltaProcess2", "deltaProcess1");

KafkaStreams streams = new KafkaStreams(builder, config);

streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
e.printStackTrace();
}
});

streams.start();
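
One possible workaround (a sketch, not from the thread; the class name and interval
below are hypothetical) is to drive the periodic work from wall-clock time inside
process() itself, so it does not depend on punctuate() firing, which in turn depends
on "stream time" advancing on every input partition:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class WallClockFlushProcessor implements Processor<String, String> {

    private static final long FLUSH_INTERVAL_MS = 10_000L; // hypothetical interval

    private ProcessorContext context;
    private long lastFlushMs;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.lastFlushMs = System.currentTimeMillis();
    }

    @Override
    public void process(String key, String value) {
        // ... normal per-record handling ...
        long now = System.currentTimeMillis();
        if (now - lastFlushMs >= FLUSH_INTERVAL_MS) {
            // do here what would otherwise go into punctuate()
            lastFlushMs = now;
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // may never fire while one input partition stays empty (see explanation below)
    }

    @Override
    public void close() {}
}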


On Wed, Nov 23, 2016 at 8:30 PM, David Garcia  wrote:

> If you are consuming from more than one topic/partition, punctuate is
> triggered when the “smallest” time-value changes.  So, if there is a
> partition that doesn’t have any more messages on it, it will always have
> the smallest time-value and that time value won’t change…hence punctuate
> never gets called.
>
> -David
>
> On 11/23/16, 1:01 PM, "Matthias J. Sax"  wrote:
>
> Your understanding is correct:
>
> Punctuate is not triggered based on wall-clock time, but based on
> internally tracked "stream time" that is derived from the
> TimestampExtractor.
> Even if you use WallclockTimestampExtractor, "stream time" only
> advances if there are input records.
>
> Not sure why punctuate() is not triggered as you say that you do have
> arriving data.
>
> Can you share your code?
>
>
>
> -Matthias
>
>
> On 11/23/16 4:48 AM, shahab wrote:
> > Hello,
> >
> > I am using the low-level processor and I set context.schedule(1),
> > assuming that the punctuate() method is invoked every 10 sec.
> > I have set
> > configProperties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > WallclockTimestampExtractor.class.getCanonicalName());
> >
> > Although data keeps coming into the topology (I have logged the incoming
> > tuples in process()), punctuate() is never executed.
> >
> > What am I missing?
> >
> > best,
> > Shahab
> >
>
>
>
>


Data (re)processing with Kafka (new wiki page)

2016-11-23 Thread Matthias J. Sax
Hi,

we added a new wiki page that is meant to collect data (re)processing
scenarios with Kafka:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+(Re)Processing+Scenarios

We have already added a couple of scenarios that we think might be common, and
we want to invite all of you to add more. This helps us get a better overview
of the requirements for enabling new use cases.

We are looking forward to your feedback!


-Matthias





Re: Oversized Message 40k

2016-11-23 Thread Felipe Santos
Thanks guys. For your information, I will do some performance tests.

On Wed, Nov 23, 2016 at 05:14, Ignacio Solis
wrote:

> At LinkedIn we have a number of use cases for large messages.  We stick to
> the 1MB message limit at the high end though.
>
> Nacho
>
> On Tue, Nov 22, 2016 at 6:11 PM, Gwen Shapira  wrote:
>
> > This has been our experience as well. I think the largest we've seen
> > in production is 50MB.
> >
> > If you have performance numbers you can share for the large messages,
> > I think we'll all appreciate :)
> >
> > On Tue, Nov 22, 2016 at 1:04 PM, Tauzell, Dave
> >  wrote:
> > > I ran tests with a mix of messages, some as large as 20MB.   These
> large
> > messages do slow down processing, but it still works.
> > >
> > > -Dave
> > >
> > > -Original Message-
> > > From: h...@confluent.io [mailto:h...@confluent.io]
> > > Sent: Tuesday, November 22, 2016 1:41 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: Oversized Message 40k
> > >
> > > The default config handles messages up to 1MB so you should be fine.
> > >
> > > -hans
> > >
> > >> On Nov 22, 2016, at 4:00 AM, Felipe Santos 
> wrote:
> > >>
> > >> I read in the documentation that Kafka is not optimized for big messages;
> > >> what is considered a big message?
> > >>
> > >> For us the messages will be on average from 20k ~ 40k. Is this a real
> > >> problem?
> > >>
> > >> Thanks
> > >> --
> > >> Felipe Santos
> >
> >
> >
> > --
> > Gwen Shapira
> > Product Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
> >
>
>
>
> --
> Nacho - Ignacio Solis - iso...@igso.net
>
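
For reference, the limits discussed above are controlled by a handful of settings.
For 20k~40k messages the defaults (roughly 1MB) are already sufficient; the sketch
below, with assumed values, only matters if you go well beyond that. Broker-side,
message.max.bytes and replica.fetch.max.bytes in server.properties must also be at
least as large as the biggest message you expect.

import java.util.Properties;

// Producer side: maximum size of a single request (and thus of a record batch).
Properties producerProps = new Properties();
producerProps.put("max.request.size", String.valueOf(2 * 1024 * 1024));

// New-consumer side: maximum bytes fetched per partition per request.
Properties consumerProps = new Properties();
consumerProps.put("max.partition.fetch.bytes", String.valueOf(2 * 1024 * 1024));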


Re: KafkaStream: punctuate() never called even when data is received by process()

2016-11-23 Thread David Garcia
If you are consuming from more than one topic/partition, punctuate is triggered 
when the “smallest” time-value changes.  So, if there is a partition that 
doesn’t have any more messages on it, it will always have the smallest 
time-value and that time value won’t change…hence punctuate never gets called.

-David

On 11/23/16, 1:01 PM, "Matthias J. Sax"  wrote:

Your understanding is correct:

Punctuate is not triggered based on wall-clock time, but based on
internally tracked "stream time" that is derived from the TimestampExtractor.
Even if you use WallclockTimestampExtractor, "stream time" only
advances if there are input records.

Not sure why punctuate() is not triggered as you say that you do have
arriving data.

Can you share your code?



-Matthias


On 11/23/16 4:48 AM, shahab wrote:
> Hello,
> 
> I am using the low-level processor and I set context.schedule(1),
> assuming that the punctuate() method is invoked every 10 sec.
> I have set
> configProperties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> WallclockTimestampExtractor.class.getCanonicalName());
> 
> Although data keeps coming into the topology (I have logged the incoming
> tuples in process()), punctuate() is never executed.
> 
> What am I missing?
> 
> best,
> Shahab
> 





Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-23 Thread Matthias J. Sax
CACHE_MAX_BYTES_BUFFERING_CONFIG does not have any impact if you query
the state. If you query it, you will always get the latest values.
CACHE_MAX_BYTES_BUFFERING_CONFIG only affects the downstream KTable
changelog stream (but you do not use this anyway).

If I understand you correctly, if you remove the map() you get three
count results: <"A-1":1>, <"A-2":1>, and <"B-1":1>.

Are you sure that your map is correct? I am not familiar with Scala, but
the key and value must both be non-null to be included in the count.
Could it be that `k.split("-")(0)` returns a null key?

-Matthias
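
A hedged illustration of that point in Java (the thread's code is Scala; the topic
and store names here are made up): a null key produced by map() means the record is
dropped before groupByKey().count(), so it never shows up in the result.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> input = builder.stream("kv-topic");   // hypothetical topic

input
    .map((key, value) -> {
        // e.g. "A-1" -> "A"; a null input key here yields a null output key
        String newKey = (key == null) ? null : key.split("-")[0];
        return KeyValue.pair(newKey, value);
    })
    .groupByKey()
    .count("kv-counts");   // hypothetical store name; only non-null keys/values are counted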

On 11/23/16 7:00 AM, Hamidreza Afzali wrote:
> Thanks Matthias.
> 
> Disabling the cache didn't solve the issue. Here's a sample code:
> 
> https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13
> 
> The topology doesn't produce any result but it works when commenting out 
> .map(...) in line 21.
> 
> Thanks,
> Hamid
> 





Re: KafkaStream: punctuate() never called even when data is received by process()

2016-11-23 Thread Matthias J. Sax
Your understanding is correct:

Punctuate is not triggered based on wall-clock time, but based on
internally tracked "stream time" that is derived from the TimestampExtractor.
Even if you use WallclockTimestampExtractor, "stream time" only
advances if there are input records.

Not sure why punctuate() is not triggered as you say that you do have
arriving data.

Can you share your code?



-Matthias


On 11/23/16 4:48 AM, shahab wrote:
> Hello,
> 
> I am using the low-level processor and I set context.schedule(1),
> assuming that the punctuate() method is invoked every 10 sec.
> I have set
> configProperties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> WallclockTimestampExtractor.class.getCanonicalName());
> 
> Although data keeps coming into the topology (I have logged the incoming
> tuples in process()), punctuate() is never executed.
> 
> What am I missing?
> 
> best,
> Shahab
> 





Re: KafkaStreams KTable#through not creating changelog topic

2016-11-23 Thread Matthias J. Sax
> 1) Create a state store AND the changelog
> topic 2) follow the Kafka Streams naming convention for changelog topics.
> Basically, I want to have a method that does what .through() is supposed to
> do according to the documentation, but without the "topic" parameter.

I understand what you are saying, but you can get this done right now,
too. If you use through(...) you will get the store. And you can just
specify the topic name as "applicationId-storeName-changelog" to follow
the naming convention Streams uses internally. What is the problem with
this approach (besides that you have to provide the topic name, which
does not seem like a big burden to me)?


-Matthias
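
A sketch of that suggestion (the application id, store name, and upstream table below
are assumptions, not taken from this thread): build the topic name yourself so that it
matches the "<application.id>-<storeName>-changelog" pattern used for internally
created changelogs.

import org.apache.kafka.streams.kstream.KTable;

String applicationId = "my-app-id";   // must match StreamsConfig.APPLICATION_ID_CONFIG
String storeName = "store-2";

// mappedTable is the KTable coming out of the upstream mapValues() step
KTable<String, String> materialized =
        mappedTable.through(applicationId + "-" + storeName + "-changelog", storeName);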


On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> Hi Michael,
> 
> thanks for the extensive explanation, and yes it definitely helps with my
> understanding of through(). :)
> 
> You guessed correctly that I'm doing some "shenanigans" where I'm trying to
> derive the changelog of a state store from the state store name. This works
> perfectly fine with a naming convention for the topics and by creating
> them in Kafka upfront.
> 
> My point is that it would help me (and maybe others), if the API of KTable
> was extended to have a new method that does two things that is not part of
> the implementation of .through(). 1) Create a state store AND the changelog
> topic 2) follow the Kafka Streams naming convention for changelog topics.
> Basically, I want to have a method that does what .through() is supposed to
> do according to the documentation, but without the "topic" parameter.
> 
> What do you think, would it be possible to extend the API with a method
> like that?
> 
> Thanks,
> Mikael
> 
> On Wed, Nov 23, 2016 at 4:16 PM Michael Noll  wrote:
> 
>> Mikael,
>>
>> regarding your second question:
>>
>>> 2) Regarding the use case, the topology looks like this:
>>>
>>> .stream(...)
>>> .aggregate(..., "store-1")
>>> .mapValues(...)
>>> .through(..., "store-2")
>>
>> The last operator above would, without "..." ellipsis, be sth like
>> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is the
>> changelog topic for both the KTable and the state store "store-2".  So this
>> is the changelog topic name that you want to know.
>>
>> - If you want the "through" topic to have a `-changelog` suffix, then you'd
>> need to add that yourself in the call to `through(...)`.
>>
>> - If you wonder why `through()` doesn't add a `-changelog` suffix
>> automatically:  That's because `through()` -- like `to()` or `stream()`,
>> `table()` -- require you to explicitly provide a topic name, and of course
>> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
>> added when Kafka creates internal changelog topics behind the scenes for
>> you.)   Unfortunately, the javadocs of `KTable#through()` is incorrect
>> because it refers to `-changelog`;  we'll fix that as mentioned above.
>>
>> - Also, in case you want to do some shenanigans (like for some tooling
>> you're building around state stores/changelogs/interactive queries) such
>> detecting all state store changelogs by doing the equivalent of `ls
>> *-changelog`, then this will miss changelogs of KTables that are created by
>> `through()` and `to()` (unless you come up with a naming convention that
>> your tooling can assume to be in place, e.g. by always adding `-changelog`
>> to topic names when you call `through()`).
>>
>> I hope this helps!
>> Michael
>>
>>
>>
>>
>> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist 
>> wrote:
>>
>>> Hi Eno,
>>>
>>> 1) Great :)
>>>
>>> 2) Yes, we are using the Interactive Queries to access the state stores.
>> In
>>> addition, we access the changelogs to subscribe to updates. For this
>> reason
>>> we need to know the changelog topic name.
>>>
>>> Thanks,
>>> Mikael
>>>
>>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska 
>>> wrote:
>>>
 HI Mikael,

 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
>> looking
 into fixing it. I agree that it can be confusing to have topic names
>> that
 are not what one would expect.

 2) If your goal is to query/read from the state stores, you can use
 Interactive Queries to do that (you don't need to worry about the
>>> changelog
 topic name and such). Interactive Queries is a new feature in 0.10.1
>>> (blog
 here:
 https://www.confluent.io/blog/unifying-stream-processing-
>>> and-interactive-queries-in-apache-kafka/
 <
 https://www.confluent.io/blog/unifying-stream-processing-
>>> and-interactive-queries-in-apache-kafka/
> ).

 Thanks
 Eno


> On 22 Nov 2016, at 19:27, Mikael Högqvist 
>> wrote:
>
> Sorry for being unclear, i'll try again :)
>
> 1) The JavaDoc for through is not correct, it states that a changelog
 topic
> will be created for the state store. That is, if I would call it with
> 

Re: KafkaStreams KTable#through not creating changelog topic

2016-11-23 Thread Mikael Högqvist
Hi Michael,

thanks for the extensive explanation, and yes it definitely helps with my
understanding of through(). :)

You guessed correctly that I'm doing some "shenanigans" where I'm trying to
derive the changelog of a state store from the state store name. This works
perfectly fine with a naming convention for the topics and by creating
them in Kafka upfront.

My point is that it would help me (and maybe others), if the API of KTable
was extended to have a new method that does two things that is not part of
the implementation of .through(). 1) Create a state store AND the changelog
topic 2) follow the Kafka Streams naming convention for changelog topics.
Basically, I want to have a method that does what .through() is supposed to
do according to the documentation, but without the "topic" parameter.

What do you think, would it be possible to extend the API with a method
like that?

Thanks,
Mikael

On Wed, Nov 23, 2016 at 4:16 PM Michael Noll  wrote:

> Mikael,
>
> regarding your second question:
>
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
>
> The last operator above would, without "..." ellipsis, be sth like
> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is the
> changelog topic for both the KTable and the state store "store-2".  So this
> is the changelog topic name that you want to know.
>
> - If you want the "through" topic to have a `-changelog` suffix, then you'd
> need to add that yourself in the call to `through(...)`.
>
> - If you wonder why `through()` doesn't add a `-changelog` suffix
> automatically:  That's because `through()` -- like `to()` or `stream()`,
> `table()` -- require you to explicitly provide a topic name, and of course
> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
> added when Kafka creates internal changelog topics behind the scenes for
> you.)   Unfortunately, the javadocs of `KTable#through()` is incorrect
> because it refers to `-changelog`;  we'll fix that as mentioned above.
>
> - Also, in case you want to do some shenanigans (like for some tooling
> you're building around state stores/changelogs/interactive queries) such
> detecting all state store changelogs by doing the equivalent of `ls
> *-changelog`, then this will miss changelogs of KTables that are created by
> `through()` and `to()` (unless you come up with a naming convention that
> your tooling can assume to be in place, e.g. by always adding `-changelog`
> to topic names when you call `through()`).
>
> I hope this helps!
> Michael
>
>
>
>
> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist 
> wrote:
>
> > Hi Eno,
> >
> > 1) Great :)
> >
> > 2) Yes, we are using the Interactive Queries to access the state stores.
> In
> > addition, we access the changelogs to subscribe to updates. For this
> reason
> > we need to know the changelog topic name.
> >
> > Thanks,
> > Mikael
> >
> > On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska 
> > wrote:
> >
> > > HI Mikael,
> > >
> > > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
> looking
> > > into fixing it. I agree that it can be confusing to have topic names
> that
> > > are not what one would expect.
> > >
> > > 2) If your goal is to query/read from the state stores, you can use
> > > Interactive Queries to do that (you don't need to worry about the
> > changelog
> > > topic name and such). Interactive Queries is a new feature in 0.10.1
> > (blog
> > > here:
> > > https://www.confluent.io/blog/unifying-stream-processing-
> > and-interactive-queries-in-apache-kafka/
> > > <
> > > https://www.confluent.io/blog/unifying-stream-processing-
> > and-interactive-queries-in-apache-kafka/
> > > >).
> > >
> > > Thanks
> > > Eno
> > >
> > >
> > > > On 22 Nov 2016, at 19:27, Mikael Högqvist 
> wrote:
> > > >
> > > > Sorry for being unclear, i'll try again :)
> > > >
> > > > 1) The JavaDoc for through is not correct, it states that a changelog
> > > topic
> > > > will be created for the state store. That is, if I would call it with
> > > > through("topic", "a-store"), I would expect a kafka topic
> > > > "my-app-id-a-store-changelog" to be created.
> > > >
> > > > 2) Regarding the use case, the topology looks like this:
> > > >
> > > > .stream(...)
> > > > .aggregate(..., "store-1")
> > > > .mapValues(...)
> > > > .through(..., "store-2")
> > > >
> > > > Basically, I want to materialize both the result from the aggregate
> > > method
> > > > and the result from mapValues, which is materialized using
> .through().
> > > > Later, I will access both the tables (store-1 and store-2) to a) get
> > the
> > > > current state of the aggregate, b) subscribe to future updates. This
> > > works
> > > > just fine. The only issue is that I assumed to have a changelog topic
> > for
> > > > store-2 created automatically, 

A strange controller log in Kafka 0.9.0.1

2016-11-23 Thread Json Tu
Hi,
We have a cluster of kafka 0.9.0.1 with 3 nodes, and we found a strange 
controller log as below.

[2016-11-07 03:14:48,575] INFO [SessionExpirationListener on 100], ZK expired; 
shut down all controller components and try to re-elect 
(kafka.controller.KafkaController$SessionExpirationListener)
[2016-11-07 03:14:48,578] DEBUG [Controller 100]: Controller resigning, broker 
id 100 (kafka.controller.KafkaController)
[2016-11-07 03:14:48,579] DEBUG [Controller 100]: De-registering 
IsrChangeNotificationListener (kafka.controller.KafkaController)
[2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutting down 
(kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Stopped  
(kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutdown completed 
(kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2016-11-07 03:14:48,580] INFO [Partition state machine on Controller 100]: 
Stopped partition state machine (kafka.controller.PartitionStateMachine)
[2016-11-07 03:14:48,580] INFO [Replica state machine on controller 100]: 
Stopped replica state machine (kafka.controller.ReplicaStateMachine)
[2016-11-07 03:14:48,583] INFO [Controller-100-to-broker-101-send-thread], 
Shutting down (kafka.controller.RequestSendThread)
[2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread], 
Stopped  (kafka.controller.RequestSendThread)
[2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread], 
Shutdown completed (kafka.controller.RequestSendThread)
[2016-11-07 03:14:48,586] INFO [Controller-100-to-broker-100-send-thread], 
Shutting down (kafka.controller.RequestSendThread)
[2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread], 
Stopped  (kafka.controller.RequestSendThread)
[2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread], 
Shutdown completed (kafka.controller.RequestSendThread)
[2016-11-07 03:14:48,587] INFO [Controller 100]: Broker 100 resigned as the 
controller (kafka.controller.KafkaController)
[2016-11-07 03:14:48,652] DEBUG [IsrChangeNotificationListener] Fired!!! 
(kafka.controller.IsrChangeNotificationListener)
[2016-11-07 03:14:48,668] INFO [BrokerChangeListener on Controller 100]: Broker 
change listener fired for path /brokers/ids with children 101,100 
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2016-11-07 03:14:48,683] DEBUG [DeleteTopicsListener on 100]: Delete topics 
listener fired for topics  to be deleted 
(kafka.controller.PartitionStateMachine$DeleteTopicsListener)
[2016-11-07 03:14:48,687] INFO [AddPartitionsListener on 100]: Add Partition 
triggered 
{"version":1,"partitions":{"4":[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
 for path /brokers/topics/movie.gateway.merselllog.syncCinema 
(kafka.controller.PartitionStateMachine$AddPartitionsListener)
[2016-11-07 03:14:48,694] INFO [AddPartitionsListener on 100]: Add Partition 
triggered 
{"version":1,"partitions":{"4":[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
 for path /brokers/topics/push_3rdparty_high 
(kafka.controller.PartitionStateMachine$AddPartitionsListener)
[2016-11-07 03:14:48,707] INFO [AddPartitionsListener on 100]: Add Partition 
triggered 
{"version":1,"partitions":{"4":[101,102],"5":[102,100],"1":[101,100],"0":[100,102],"2":[102,101],"3":[100,101]}}
 for path /brokers/topics/icb_msg_push_high_02 
(kafka.controller.PartitionStateMachine$AddPartitionsListener)
[2016-11-07 03:14:48,715] INFO [AddPartitionsListener on 100]: Add Partition 
triggered 
{"version":1,"partitions":{"4":[102,100],"5":[100,101],"1":[102,101],"0":[101,100],"2":[100,102],"3":[101,102]}}
 for path /brokers/topics/movie.gateway.merselllog.unlockSeat 
(kafka.controller.PartitionStateMachine$AddPartitionsListener)


From the log we can see that the old controller 100 resigned as the
controller successfully, but what confuses me is that it could still receive
"Fired!!!" from the IsrChangeNotificationListener, which had been de-registered before,
and broker 100 was not elected as the new controller afterwards. Still, the
IsrChangeNotificationListener, DeleteTopicsListener and AddPartitionsListener all
fired after the resignation. Does this mean something is wrong with ZooKeeper?
Any suggestion is appreciated, thanks in advance.





Re: KafkaStreams KTable#through not creating changelog topic

2016-11-23 Thread Michael Noll
> - Also, in case you want to do some shenanigans (like for some tooling
you're building
> around state stores/changelogs/interactive queries) such detecting all
state store changelogs
> by doing the equivalent of `ls *-changelog`, then this will miss
changelogs of KTables that are
> created by `through()` and `to()` [...]

Addendum: And that's because the topic that is created by
`KTable#through()` and `KTable#to()` is, by definition, a changelog of that
KTable and the associated state store.



On Wed, Nov 23, 2016 at 4:15 PM, Michael Noll  wrote:

> Mikael,
>
> regarding your second question:
>
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
>
> The last operator above would, without "..." ellipsis, be sth like
> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is
> the changelog topic for both the KTable and the state store "store-2".  So
> this is the changelog topic name that you want to know.
>
> - If you want the "through" topic to have a `-changelog` suffix, then
> you'd need to add that yourself in the call to `through(...)`.
>
> - If you wonder why `through()` doesn't add a `-changelog` suffix
> automatically:  That's because `through()` -- like `to()` or `stream()`,
> `table()` -- require you to explicitly provide a topic name, and of course
> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
> added when Kafka creates internal changelog topics behind the scenes for
> you.)   Unfortunately, the javadocs of `KTable#through()` is incorrect
> because it refers to `-changelog`;  we'll fix that as mentioned above.
>
> - Also, in case you want to do some shenanigans (like for some tooling
> you're building around state stores/changelogs/interactive queries) such
> detecting all state store changelogs by doing the equivalent of `ls
> *-changelog`, then this will miss changelogs of KTables that are created by
> `through()` and `to()` (unless you come up with a naming convention that
> your tooling can assume to be in place, e.g. by always adding `-changelog`
> to topic names when you call `through()`).
>
> I hope this helps!
> Michael
>
>
>
>
> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist 
> wrote:
>
>> Hi Eno,
>>
>> 1) Great :)
>>
>> 2) Yes, we are using the Interactive Queries to access the state stores.
>> In
>> addition, we access the changelogs to subscribe to updates. For this
>> reason
>> we need to know the changelog topic name.
>>
>> Thanks,
>> Mikael
>>
>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska 
>> wrote:
>>
>> > HI Mikael,
>> >
>> > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
>> looking
>> > into fixing it. I agree that it can be confusing to have topic names
>> that
>> > are not what one would expect.
>> >
>> > 2) If your goal is to query/read from the state stores, you can use
>> > Interactive Queries to do that (you don't need to worry about the
>> changelog
>> > topic name and such). Interactive Queries is a new feature in 0.10.1
>> (blog
>> > here:
>> > https://www.confluent.io/blog/unifying-stream-processing-and
>> -interactive-queries-in-apache-kafka/
>> > <
>> > https://www.confluent.io/blog/unifying-stream-processing-and
>> -interactive-queries-in-apache-kafka/
>> > >).
>> >
>> > Thanks
>> > Eno
>> >
>> >
>> > > On 22 Nov 2016, at 19:27, Mikael Högqvist 
>> wrote:
>> > >
>> > > Sorry for being unclear, i'll try again :)
>> > >
>> > > 1) The JavaDoc for through is not correct, it states that a changelog
>> > topic
>> > > will be created for the state store. That is, if I would call it with
>> > > through("topic", "a-store"), I would expect a kafka topic
>> > > "my-app-id-a-store-changelog" to be created.
>> > >
>> > > 2) Regarding the use case, the topology looks like this:
>> > >
>> > > .stream(...)
>> > > .aggregate(..., "store-1")
>> > > .mapValues(...)
>> > > .through(..., "store-2")
>> > >
>> > > Basically, I want to materialize both the result from the aggregate
>> > method
>> > > and the result from mapValues, which is materialized using .through().
>> > > Later, I will access both the tables (store-1 and store-2) to a) get
>> the
>> > > current state of the aggregate, b) subscribe to future updates. This
>> > works
>> > > just fine. The only issue is that I assumed to have a changelog topic
>> for
>> > > store-2 created automatically, which didnt happen.
>> > >
>> > > Since I want to access the changelog topic, it helps if the naming is
>> > > consistent. So either we enforce the same naming pattern as kafka when
>> > > calling .through() or alternatively the Kafka Streams API can provide
>> a
>> > > method to materialize tables which creates a topic name according to
>> the
>> > > naming pattern. E.g. .through() without the topic parameter.
>> > >
>> > > What do you think?
>> > >
>> > > Best,
>> > > 

Re: KafkaStreams KTable#through not creating changelog topic

2016-11-23 Thread Michael Noll
Mikael,

regarding your second question:

> 2) Regarding the use case, the topology looks like this:
>
> .stream(...)
> .aggregate(..., "store-1")
> .mapValues(...)
> .through(..., "store-2")

The last operator above would, without "..." ellipsis, be sth like
`KTable#through("through-topic", "store-2")`.  Here, "through-topic" is the
changelog topic for both the KTable and the state store "store-2".  So this
is the changelog topic name that you want to know.

- If you want the "through" topic to have a `-changelog` suffix, then you'd
need to add that yourself in the call to `through(...)`.

- If you wonder why `through()` doesn't add a `-changelog` suffix
automatically:  That's because `through()` -- like `to()` or `stream()`,
`table()` -- require you to explicitly provide a topic name, and of course
Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
added when Kafka creates internal changelog topics behind the scenes for
you.)   Unfortunately, the javadocs of `KTable#through()` are incorrect
because they refer to `-changelog`; we'll fix that as mentioned above.

- Also, in case you want to do some shenanigans (like for some tooling
you're building around state stores/changelogs/interactive queries) such
detecting all state store changelogs by doing the equivalent of `ls
*-changelog`, then this will miss changelogs of KTables that are created by
`through()` and `to()` (unless you come up with a naming convention that
your tooling can assume to be in place, e.g. by always adding `-changelog`
to topic names when you call `through()`).

I hope this helps!
Michael




On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist 
wrote:

> Hi Eno,
>
> 1) Great :)
>
> 2) Yes, we are using the Interactive Queries to access the state stores. In
> addition, we access the changelogs to subscribe to updates. For this reason
> we need to know the changelog topic name.
>
> Thanks,
> Mikael
>
> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska 
> wrote:
>
> > HI Mikael,
> >
> > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is looking
> > into fixing it. I agree that it can be confusing to have topic names that
> > are not what one would expect.
> >
> > 2) If your goal is to query/read from the state stores, you can use
> > Interactive Queries to do that (you don't need to worry about the
> changelog
> > topic name and such). Interactive Queries is a new feature in 0.10.1
> (blog
> > here:
> > https://www.confluent.io/blog/unifying-stream-processing-
> and-interactive-queries-in-apache-kafka/
> > <
> > https://www.confluent.io/blog/unifying-stream-processing-
> and-interactive-queries-in-apache-kafka/
> > >).
> >
> > Thanks
> > Eno
> >
> >
> > > On 22 Nov 2016, at 19:27, Mikael Högqvist  wrote:
> > >
> > > Sorry for being unclear, i'll try again :)
> > >
> > > 1) The JavaDoc for through is not correct, it states that a changelog
> > topic
> > > will be created for the state store. That is, if I would call it with
> > > through("topic", "a-store"), I would expect a kafka topic
> > > "my-app-id-a-store-changelog" to be created.
> > >
> > > 2) Regarding the use case, the topology looks like this:
> > >
> > > .stream(...)
> > > .aggregate(..., "store-1")
> > > .mapValues(...)
> > > .through(..., "store-2")
> > >
> > > Basically, I want to materialize both the result from the aggregate
> > method
> > > and the result from mapValues, which is materialized using .through().
> > > Later, I will access both the tables (store-1 and store-2) to a) get
> the
> > > current state of the aggregate, b) subscribe to future updates. This
> > works
> > > just fine. The only issue is that I assumed to have a changelog topic
> for
> > > store-2 created automatically, which didnt happen.
> > >
> > > Since I want to access the changelog topic, it helps if the naming is
> > > consistent. So either we enforce the same naming pattern as kafka when
> > > calling .through() or alternatively the Kafka Streams API can provide a
> > > method to materialize tables which creates a topic name according to
> the
> > > naming pattern. E.g. .through() without the topic parameter.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Mikael
> > >
> > > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax  >
> > > wrote:
> > >
> > >> I cannot completely follow what you want to achieve.
> > >>
> > >> However, the JavaDoc for through() seems not to be correct to me.
> Using
> > >> through() will not create an extra internal changelog topic with the
> > >> described naming schema, because the topic specified in through() can
> be
> > >> used for this (there is no point in duplicating the data).
> > >>
> > >> If you have a KTable and apply a mapValues(), this will not write data
> > >> to any topic. The derived KTable is in-memory because you can easily
> > >> recreate it from its base KTable.
> > >>
> > >> What is the missing part you want to get?
> > >>
> > 

Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-23 Thread Hamidreza Afzali
Thanks Matthias.

Disabling the cache didn't solve the issue. Here's a sample code:

https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

The topology doesn't produce any result but it works when commenting out 
.map(...) in line 21.

Thanks,
Hamid



RE: Kafka consumers are not equally distributed

2016-11-23 Thread Ghosh, Achintya (Contractor)
No, that is not the reason. Initially all the partitions were assigned messages,
and those were processed very fast; those consumers now sit idle even though other
partitions still have a lot of messages to be processed.
So I was under the impression that a rebalance should be triggered and the messages
would be redistributed equally again.

Thanks
Achintya 

-Original Message-
From: Sharninder [mailto:sharnin...@gmail.com] 
Sent: Wednesday, November 23, 2016 12:33 AM
To: users@kafka.apache.org
Cc: d...@kafka.apache.org
Subject: Re: Kafka consumers are not equally distributed

Could it be because of the partition key ?

On Wed, Nov 23, 2016 at 12:33 AM, Ghosh, Achintya (Contractor) < 
achintya_gh...@comcast.com> wrote:

> Hi there,
>
> We are doing a load test in Kafka at 25 tps, and for the first 9 hours it
> went fine: almost 80K messages/hr were processed. After that we saw a
> lot of lag and we stopped the incoming load.
>
> Currently we see 15K messages/hr being processed. We have 40 consumer
> instances with concurrency 4 and 2 topics, both having 160
> partitions, so there is one consumer per partition.
>
> What we found is that some of the partitions are sitting idle and some
> are overloaded, and it is really slowing down consumer message processing.
>
> Why is rebalancing not happening, and why are the existing messages not
> distributed equally among the instances? We tried to restart the app;
> still the same pace. Any idea what could be the reason?
>
> Thanks
> Achintya
>
>


--
--
Sharninder
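
As background (not from the thread): with the new consumer, a rebalance is only
triggered by group membership or subscription changes, not by uneven lag, so idle
consumers will not take over partitions from busy ones. A sketch with assumed
topic, group, and broker names for logging what each instance is actually assigned:

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // assumed
props.put("group.id", "load-test-group");           // assumed
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic-a", "topic-b"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Revoked:  " + partitions);
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned: " + partitions);   // check whether the assignment is balanced
    }
});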


KafkaStream: punctuate() never called even when data is received by process()

2016-11-23 Thread shahab
Hello,

I am using the low-level processor and I set context.schedule(1),
assuming that the punctuate() method is invoked every 10 sec.
I have set
configProperties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
WallclockTimestampExtractor.class.getCanonicalName());

Although data keeps coming into the topology (I have logged the incoming
tuples in process()), punctuate() is never executed.

What am I missing?

best,
Shahab


Re: Error "Unknown magic byte" occurred while deserializing Avro message

2016-11-23 Thread Dayong
As I remember, this error complains that the first byte of the message is not 0x00.
I think the console producer does not support JSON, since it uses a string schema.

Thanks,
Dayong
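
For context, a sketch of the check behind that error (assuming the Confluent Avro
serde wire format): the deserializer expects a magic byte 0x00 followed by a 4-byte
schema registry ID, then the Avro-encoded payload. A message produced as a plain
string or JSON fails that first check.

import java.nio.ByteBuffer;

// Returns true only if the payload at least looks like it was written by the
// Confluent Avro serializer (magic byte 0x00 + 4-byte schema ID + Avro data).
static boolean looksLikeConfluentAvro(byte[] payload) {
    if (payload == null || payload.length < 5 || payload[0] != 0x00) {
        return false;   // this is the "Unknown magic byte!" case
    }
    int schemaId = ByteBuffer.wrap(payload, 1, 4).getInt();   // big-endian schema ID
    return schemaId >= 0;
}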

> On Nov 23, 2016, at 4:28 AM, ZHU Hua B  wrote:
> 
> Hi,
> 
> 
> We tried to produce and consume an Avro message (ZooKeeper, broker and schema
> registry have been launched), but the error "Unknown magic byte" occurred while
> deserializing the Avro message. Did I miss anything? Thanks!
> 
> 
> From producer:
> # bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test 
> --property 
> value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/root/confluent/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/root/confluent/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/root/confluent/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 
> {"f1": "value1"}
> 
> 
> From consumer:
> # bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 
> --from-beginning
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/root/confluent/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/root/confluent/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/root/confluent/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 
> [2016-11-23 16:15:11,108] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$:103)
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> Avro message for id -1
> Caused by: org.apache.kafka.common.errors.SerializationException: Unknown 
> magic byte!
> [2016-11-23 16:15:11,108] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$:103)
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> Avro message for id -1
> Caused by: org.apache.kafka.common.errors.SerializationException: Unknown 
> magic byte!
> 
> 
> Best Regards
> 
> Johnny
> 


Messages intermittently get lost

2016-11-23 Thread Zac Harvey
I am playing around with Kafka and have a simple setup:


* 1-node Kafka (Ubuntu) server

* 3-node ZK cluster (each on their own Ubuntu server)


I have a consumer written in Scala and am using the kafka-console-producer 
(v0.10) that ships with the distribution.


I'd say about 20% of the messages I send via the producer never get consumed by 
the Scala process (which is running continuously). No errors on either side 
(producer or consumer): the producer sends, and, nothing...


Any ideas as to what might be going on here, or how I could start 
troubleshooting?


Thanks!


Error "Unknown magic byte" occurred while deserializing Avro message

2016-11-23 Thread ZHU Hua B
Hi,


We tried to produce and consume an Avro message (ZooKeeper, broker and schema
registry have been launched), but the error "Unknown magic byte" occurred while
deserializing the Avro message. Did I miss anything? Thanks!


From producer:
# bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test 
--property 
value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/root/confluent/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/root/confluent/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/root/confluent/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

{"f1": "value1"}


From consumer:
# bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 
--from-beginning
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/root/confluent/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/root/confluent/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/root/confluent/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

[2016-11-23 16:15:11,108] ERROR Unknown error when running consumer:  
(kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro 
message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic 
byte!
[2016-11-23 16:15:11,108] ERROR Unknown error when running consumer:  
(kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro 
message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic 
byte!


Best Regards

Johnny



Re: Delete "kafka-logs"

2016-11-23 Thread Sachin Mittal
Maybe because of these settings
log.retention.check.interval.ms=1
log.retention.ms=1

Try setting the check interval to 1000 ms; I just feel these values are too low.

Or try setting retention to 1 hour and keep the check interval at the default,
which is 300 seconds.

Also check the permissions on that dir or file. Maybe create a new logs
dir within your workspace where you have complete access.


On Wed, Nov 23, 2016 at 12:37 PM, Eranga Heshan 
wrote:

> The reason I set retention time to 10 seconds is because I run kafka for
> about 4 minutes each time. What I am running is a microbenchmark and I take
> throughput and latency values for 4mins of runtime.
>
> However when I run I get these printed on terminal.
>
> [2016-11-23 12:21:54,517] INFO Deleting index
> /tmp/kafka-logs-3/incrementStream1-1/35778556.index.deleted
> (kafka.log.OffsetIndex)
> [2016-11-23 12:21:54,516] INFO Deleting index
> /tmp/kafka-logs-3/incrementStream1-1/35550774.index.deleted
> (kafka.log.OffsetIndex)
> [2016-11-23 12:21:54,517] INFO Deleting index
> /tmp/kafka-logs-3/incrementStream1-1/36690192.index.deleted
> (kafka.log.OffsetIndex)
> [2016-11-23 12:21:54,517] INFO Deleting index
> /tmp/kafka-logs-3/incrementStream1-1/36918223.index.deleted
> (kafka.log.OffsetIndex)
> [2016-11-23 12:21:54,582] INFO Rolled new log segment for
> 'incrementStream1-1' in 0 ms. (kafka.log.Log)
> [2016-11-23 12:21:54,797] INFO Rolled new log segment for
> 'incrementStream3-0' in 1 ms. (kafka.log.Log)
> [2016-11-23 12:21:55,493] INFO Rolled new log segment for
> 'incrementStream1-1' in 1 ms. (kafka.log.Log)
> [2016-11-23 12:21:55,787] INFO Rolled new log segment for
> 'incrementStream3-0' in 0 ms. (kafka.log.Log)
> [2016-11-23 12:21:56,439] INFO Rolled new log segment for
> 'incrementStream1-1' in 0 ms. (kafka.log.Log)
>
> When I go to kafka-logs directory I can see the segments of 10MB are
> created with an index file. But after the retention time what happens is
> that the log files change their names from "09117624.log" to
> "09117624.log.deleted". And the index files change their names
> from "09117624.index" to "09117624.index.deleted".
> Nothing gets removed from the directory.
>
> Is there any particular reason for this to happen?
>
> Thanks,
> Regards,
>
>
> Eranga Heshan
> *Undergraduate*
> Computer Science & Engineering
> University of Moratuwa
> Mobile: +94 71 138 2686
> Email: era...@wso2.com
>
>
> On Wed, Nov 23, 2016 at 11:50 AM, Sachin Mittal 
> wrote:
>
> > try bin/kafka-server-start.sh ./config/server.properties
> >
> > Also check how many segments have got created in logs dir and size of
> each.
> > May be set the retention to 1 hour or 15 minutes. 10 sec seems very less.
> >
> >
> >
> > On Wed, Nov 23, 2016 at 11:21 AM, Eranga Heshan 
> > wrote:
> >
> > > As you suggested I did these changes to the server.properties file on
> all
> > > nodes.
> > >
> > > log.retention.check.interval.ms=1
> > > log.retention.ms=1
> > > log.segment.bytes=1049
> > >
> > > How I start each server is by,
> > >
> > > /home/ubuntu/eranga/software/kafka_2.11-0.10.0.1/bin/kafka-
> > server-start.sh
> > > -daemon
> > > /home/ubuntu/eranga/software/kafka_2.11-0.10.0.1/config/
> > server.properties
> > >
> > > However, I still get about 4GB log files. Did I miss anything?
> > >
> > > Thanks,
> > > Regards,
> > >
> > >
> > > Eranga Heshan
> > > *Undergraduate*
> > > Computer Science & Engineering
> > > University of Moratuwa
> > > Mobile: +94 71 138 2686
> > > Email: era...@wso2.com
> > >
> > >
> > > On Wed, Nov 23, 2016 at 10:14 AM, Sachin Mittal 
> > > wrote:
> > >
> > > > Check these in server.properties
> > > > # Log Retention Policy
> > > > #
> > > >
> > > > # The following configurations control the disposal of log segments.
> > The
> > > > policy can
> > > > # be set to delete segments after a period of time, or after a given
> > size
> > > > has accumulated.
> > > > # A segment will be deleted whenever *either* of these criteria are
> > met.
> > > > Deletion always happens
> > > > # from the end of the log.
> > > >
> > > > # The minimum age of a log file to be eligible for deletion
> > > > log.retention.hours=168
> > > >
> > > > # A size-based retention policy for logs. Segments are pruned from
> the
> > > log
> > > > as long as the remaining
> > > > # segments don't drop below log.retention.bytes.
> > > > #log.retention.bytes=1073741824
> > > >
>