Re: Problem with kafka-streams aggregate windowedBy

2019-01-27 Thread Vincenzo D'Amore
Hi Pavel, did you ever find out why you were seeing this strange behaviour?

On Tue, Oct 30, 2018 at 12:22 PM Pavel Koroliov 
wrote:

> I'm sorry, guys. Aggregation works fine, but I've found a new problem with
> *groupByKey()*. After restarting the application, some aggregations start
> from the beginning, although the key already has aggregated data, while
> other aggregations continue to accumulate data. This is very strange; I did
> not expect such behavior.
>
> On Tue, Oct 30, 2018 at 0:43, Matthias J. Sax wrote:
>
> > Make sure to call `KafkaStreams#close()` to get the latest offsets
> > committed.
> >
> > Besides this, you can check the consumer and Streams logs in DEBUG mode
> > to see which offset is picked up (or not).
> >
> >
> > -Matthias
> >
> > On 10/29/18 11:43 AM, Patrik Kleindl wrote:
> > > Hi
> > > How long does your application run? More than the 60 seconds you set for
> > > the commit interval?
> > > Have a look at
> > > https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+
> > > and check whether your offsets are really committed
> > > Best regards
> > > Patrik
> > >
> > >> On 29.10.2018 at 18:20, Pavel Koroliov wrote:
> > >>
> > >> Hi
> > >> No, my application id doesn't change
> > >>
> > >> On Mon, Oct 29, 2018 at 19:11, Patrik Kleindl wrote:
> > >>
> > >>> Hi
> > >>> Does your applicationId change?
> > >>> Best regards
> > >>> Patrik
> > >>>
> > >>>> On 29.10.2018 at 13:28, Pavel Koroliov wrote:
> > >>>>
> > >>>> Hi everyone! I use kafka-streams, and I have a problem when I use
> > >>>> windowedBy. Everything works well until I restart the application.
> > >>>> After restarting, my aggregation starts from the beginning.
> > >>>> Code below:
> > >>>>
> > >>>>   StreamsBuilder builder = new StreamsBuilder()
> > >>>>   KStream stream = builder.stream(topic,
> > >>>>       Consumed.with(Serdes.String(), Serdes.String()))
> > >>>>
> > >>>>   KTable table = stream.groupByKey()
> > >>>>       .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
> > >>>>       .aggregate(
> > >>>>           { new AggregatorModel() },
> > >>>>           { key, value, aggregate ->
> > >>>>               return aggregate.add(value)
> > >>>>           }
> > >>>>       )
> > >>>>       .toStream()
> > >>>>       .map({ k, v ->
> > >>>>           new KeyValue<>(k.window().end(), v)
> > >>>>       })
> > >>>>       .to('output')
> > >>>>
> > >>>>   def config = new Properties()
> > >>>>   config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
> > >>>>   config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
> > >>>>   config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> > >>>>       TimeUnit.SECONDS.toMillis(60))
> > >>>>
> > >>>>   KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
> > >>>>   kafkaStreams.start()
> > >>>>
> > >>>> I've tried adding ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to the config,
> > >>>> set to 'latest' and to 'earliest', but it didn't help.
> > >>>> Can you help me understand what I'm doing wrong?
> > >>>> Thank you.
> > >>>
> > >
> >
> >
>


-- 
Vincenzo D'Amore


Re: Kafka streams messages duplicates with non-overlapping gap-less windows

2019-01-27 Thread Vincenzo D'Amore
Hi Matthias, thanks for your reply. Let me explain better what I'm
trying to say. In the meantime I've played with this problem and I think
I now have a clearer view, though I still don't have a solution.

I have an input topic A, which is a stream of messages where each message
contains just an ID. There can be thousands or even millions of those IDs
(messages), but in my test (a proof of concept) they are all different.

In order to process them, for each one I have to retrieve some data that is
stored in a NoSQL database. As you can understand, querying one ID at a time
is not a good solution for performance reasons, so I need to
aggregate them, and here comes the problem.

So from the source topic A I have created a new topic B, where each
message now has a key, which is a number that changes every X milliseconds
(say 500ms).
Now I can do a group-by-key and an aggregate. I expect that the lists
returned by aggregate() do not contain duplicates.
The output of this aggregation is saved in topic C.
Topic C contains arrays of IDs of different sizes, and the key is the number
created to group them.

And here is my big surprise: in topic C there are a lot of IDs that
are present in several different messages at the same time.
Those messages have the same key but ID arrays of different sizes, and
each array partially contains IDs that are present in other messages.

I suppose this should be impossible.

So, for example, if I have a stream with the following list of messages:

key - value
--
0 - 1
0 - 2
0 - 3
0 - 4
0 - 5
1 - 6
1 - 7
1 - 8
1 - 9
1 - 10

I would expect groupByKey() and aggregate() to return

key - value

0 - [1,2,3,4,5]
1 - [6,7,8,9,10]

But instead I found something like:

key - value

0 - [1,2,3,4,5]
0 - [2,3,4,5]
1 - [6,7,8,9]
1 - [6,7,8,9,10]

So the question is: did I do something wrong when trying to aggregate them?
How can I avoid those duplicates?
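
For what it's worth, here is a minimal sketch, in plain Java with no Kafka dependency, of what a per-record (continuous) update model would emit for the sample input above. It is only a simplified model and an assumption about the cause, not a description of Kafka Streams internals: the names ContinuousAggregateSketch, aggregate and latestPerKey are made up for illustration, and windowing and caching are ignored. Under this model every input record produces a new, larger version of the list for its key, so a downstream reader sees the same IDs many times unless it keeps only the latest list per key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of an aggregate that emits one update per input record.
class ContinuousAggregateSketch {

    /** One downstream record: the key and a snapshot of the aggregate so far. */
    static final class Update {
        final int key;
        final List<Integer> values;

        Update(int key, List<Integer> values) {
            this.key = key;
            this.values = values;
        }
    }

    /** Emits one updated aggregate per input record; each record is {key, value}. */
    static List<Update> aggregate(int[][] records) {
        Map<Integer, List<Integer>> state = new LinkedHashMap<>();
        List<Update> out = new ArrayList<>();
        for (int[] r : records) {
            List<Integer> agg = state.computeIfAbsent(r[0], k -> new ArrayList<>());
            agg.add(r[1]);
            // Copy the list so that later additions do not change earlier updates.
            out.add(new Update(r[0], new ArrayList<>(agg)));
        }
        return out;
    }

    /** Keeps only the last update seen for each key. */
    static Map<Integer, List<Integer>> latestPerKey(List<Update> updates) {
        Map<Integer, List<Integer>> latest = new LinkedHashMap<>();
        for (Update u : updates) {
            latest.put(u.key, u.values);
        }
        return latest;
    }

    public static void main(String[] args) {
        int[][] input = {
            {0, 1}, {0, 2}, {0, 3}, {0, 4}, {0, 5},
            {1, 6}, {1, 7}, {1, 8}, {1, 9}, {1, 10}
        };
        List<Update> updates = aggregate(input);
        System.out.println(updates.size() + " updates emitted");
        System.out.println(latestPerKey(updates));
    }
}
```

Run as-is, this prints `10 updates emitted` followed by `{0=[1, 2, 3, 4, 5], 1=[6, 7, 8, 9, 10]}`: ten intermediate lists, of which only the last one per key matches the output I expected.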


On Sat, Jan 26, 2019 at 9:01 PM Matthias J. Sax 
wrote:

> I am not 100% sure, what you mean by
>
> >> I've a input topic where I'm 100% sure there are no duplicate keys or
> >> messages
>
> If this is the case (ie, each key is unique), it would imply that each
> window contains exactly one record per key. Hence, why do you aggregate?
> Each aggregate would consist of only one message making an aggregation
> step unnecessary.
>
> Can you be a little bit more specific and provide a sample input
> (key,value,timestamp), observed output, and expected output?
>
> I suspect (but I am not sure), that you might "struggle" with Kafka
> Streams' continuous output model. Maybe this blog post sheds some light:
> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
>
>
> -Matthias
>
> On 1/25/19 9:31 AM, Vincenzo D'Amore wrote:
> > Hi all,
> >
> > I'm writing here because for a couple of days I've been struggling to
> > understand why I get so many duplicates while processing messages with
> > Kafka Streams.
> >
> > I have an input topic where I'm 100% sure there are no duplicate keys or
> > messages.
> >
> > During the process I've to aggregate the messages using
> > groupByKey, windowedBy and aggregate:
> >
> > .map((v1, v2) -> {
> >     Long currentSecond = System.currentTimeMillis() / 500;
> >     return new KeyValue<>(currentSecond.toString(), v2);
> > })
> > .groupByKey(Serialized.with(Serdes.String(), new JsonSerde()))
> > .windowedBy(TimeWindows.of(500))
> > .aggregate(() -> new ArrayList<StreamEntry<String, JsonNode>>(),
> >     (aggKey, newValue, aggValue) -> {
> >         final StreamEntry<String, JsonNode> kvSimpleEntry =
> >             new StreamEntry<>(aggKey, newValue);
> >         aggValue.add(kvSimpleEntry);
> >         return aggValue;
> >     }, Materialized.with(Serdes.String(), new ListKVJsonNodeSerde()))
> >
> > Even during this process I'm 100% sure there are no duplicates, but
> > surprisingly, after this I see that mapValues can be called with the same
> > messages more than once, even hundreds of times.
> >
> >.mapValues(vv -> {
> >// here the list vv contains the many
> >
> >})
> >
> > Looking around, I've found this project that seems to reproduce the problem:
> > https://github.com/westec/ks-aggregate-debug
> >
> > Given that I am using non-overlapping gap-less windows in kstream, the
> > correct output should NOT contain duplicate messages between windows?
> > Any ideas why the duplicates?
> >
> >
>
>

-- 
Vincenzo D'Amore


Re: How to balance messages in kafka topics with newly added partitions?

2019-01-27 Thread Hans Jespersen
Yes but I find this even easier to do with KSQL. 

CREATE STREAM OUTPUTTOPIC AS SELECT * FROM INPUTTOPIC;

There are similar examples that also filter messages while copying,
or change the message format while copying, on the KSQL Recipe page here:
https://www.confluent.io/stream-processing-cookbook/

There is even an example for repartitioning topics using the PARTITIONS
parameter:
CREATE STREAM clickstream_new WITH (PARTITIONS=5) AS SELECT * FROM clickstream_raw;
-hans

> On Jan 27, 2019, at 9:24 AM, Ryanne Dolan  wrote:
> 
> You can use MirrorMaker to copy data between topics.
> 
> Ryanne
> 
>> On Sun, Jan 27, 2019, 7:12 AM jaaz jozz wrote:
>>
>> Thanks, Sönke
>> Is there any available kafka tool to move messages between topics?
>> 
>> On Sun, Jan 27, 2019 at 2:40 PM Sönke Liebau
>>  wrote:
>> 
>>> Hi Jazz,
>>> 
>>> I'm afraid the only way of rebalancing old messages is indeed to
>>> rewrite them to the topic - thus creating duplication.
>>> Once a message has been written to a partition by Kafka this
>>> assignment is final, there is no way of moving it to another
>>> partition.
>>> 
>>> Changing the partition count of topics at a later time can be a huge
>>> headache, if you depend on partitioning. For this exact reason the
>>> general recommendation is to overpartition your topics a little when
>>> creating them, so that you can add consumers as the data volume
>>> increases.
>>> 
>>> In your case the best solution might be to delete and then recreate
>>> the topic with more partitions. Now you can rewrite all your data and
>>> it will result in a clean partitioning.
>>> 
>>> Hope this helps a little, feel free to get back to us if you have more
>>> questions!
>>> 
>>> Best regards,
>>> Sönke
>>> 
>>>> On Sun, Jan 27, 2019 at 1:21 PM jaaz jozz wrote:
>>>>
>>>> Hello,
>>>>
>>>> I have kafka cluster with certain topic that had too few partitions, so a
>>>> large backlog of messages was collected. After i added additional
>>>> partitions, only the newly messages balanced between all the new partitions.
>>>>
>>>> What is the preferred way to balance the "old" backlog of messages inside
>>>> the original partitions across all the new partitions?
>>>>
>>>> I thought of reading and writing again all the messages backlog to this
>>>> topic and update the offsets accordingly, but it will make duplication of
>>>> messages if a new consumer group will start consuming from the beginning of
>>>> this topic.
>>>>
>>>> How can i solve this?
>>>>
>>>> Thanks.
>>> 
>>> 
>>> 
>>> --
>>> Sönke Liebau
>>> Partner
>>> Tel. +49 179 7940878
>>> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>>> 
>> 


[VOTE] 2.1.1 RC1

2019-01-27 Thread Colin McCabe
Hi all,

This is the second candidate for release of Apache Kafka 2.1.1.  This release 
includes many bug fixes for Apache Kafka 2.1.

Compared to rc0, this release includes the following changes:
* MINOR: Upgrade ducktape to 0.7.5 (#6197)
* KAFKA-7837: Ensure offline partitions are picked up as soon as possible when 
shrinking ISR
* tests/kafkatest/__init__.py now contains __version__ = '2.1.1' rather than 
'2.1.1.dev0'
* Maven artifacts should be properly staged this time
* I have added my GPG key to https://kafka.apache.org/KEYS

Check out the release notes here:
http://home.apache.org/~cmccabe/kafka-2.1.1-rc1/RELEASE_NOTES.html

The vote will go until Friday, February 1st.

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~cmccabe/kafka-2.1.1-rc1/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~cmccabe/kafka-2.1.1-rc1/javadoc/

* Tag to be voted upon (off 2.1 branch) is the 2.1.1 tag:
https://github.com/apache/kafka/releases/tag/2.1.1-rc1

* Successful Jenkins builds for the 2.1 branch:
Unit/integration tests: https://builds.apache.org/job/kafka-2.1-jdk8/118/

thanks,
Colin


Re: Drawbacks for configuring many partitions for a topic

2019-01-27 Thread Sönke Liebau
Hi Jazz,

you probably already found this, but [1] is a good read and starting
point around this topic!

Best regards,
Sönke

[1] https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster

On Sun, Jan 27, 2019 at 6:22 PM Ryanne Dolan  wrote:
>
> Jazz, the number of partitions isn't necessarily related to message volume.
> The biggest factors for max message volume would be the number of brokers
> and their write speed. You should choose the number of partitions based on
> the number of brokers and the number of consumers you expect to have.
>
> Ryanne
>
> On Sun, Jan 27, 2019, 8:55 AM jaaz jozz wrote:
>
> > Hello,
> > In order to be prepared for large volume of messages i want to configure my
> > topic with large amount of partitions (>1000).
> > What are the drawbacks of this?
> > Should I expect any pitfalls?
> >
> > Best regards,
> > jazz
> >



-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany


Re: How to balance messages in kafka topics with newly added partitions?

2019-01-27 Thread Ryanne Dolan
You can use MirrorMaker to copy data between topics.

Ryanne

On Sun, Jan 27, 2019, 7:12 AM jaaz jozz wrote:

> Thanks, Sönke
> Is there any available kafka tool to move messages between topics?
>
> On Sun, Jan 27, 2019 at 2:40 PM Sönke Liebau
>  wrote:
>
> > Hi Jazz,
> >
> > I'm afraid the only way of rebalancing old messages is indeed to
> > rewrite them to the topic - thus creating duplication.
> > Once a message has been written to a partition by Kafka this
> > assignment is final, there is no way of moving it to another
> > partition.
> >
> > Changing the partition count of topics at a later time can be a huge
> > headache, if you depend on partitioning. For this exact reason the
> > general recommendation is to overpartition your topics a little when
> > creating them, so that you can add consumers as the data volume
> > increases.
> >
> > In your case the best solution might be to delete and then recreate
> > the topic with more partitions. Now you can rewrite all your data and
> > it will result in a clean partitioning.
> >
> > Hope this helps a little, feel free to get back to us if you have more
> > questions!
> >
> > Best regards,
> > Sönke
> >
> > On Sun, Jan 27, 2019 at 1:21 PM jaaz jozz  wrote:
> > >
> > > Hello,
> > >
> > > I have kafka cluster with certain topic that had too few partitions, so a
> > > large backlog of messages was collected. After i added additional
> > > partitions, only the newly messages balanced between all the new partitions.
> > >
> > > What is the preferred way to balance the "old" backlog of messages inside
> > > the original partitions across all the new partitions?
> > >
> > > I thought of reading and writing again all the messages backlog to this
> > > topic and update the offsets accordingly, but it will make duplication of
> > > messages if a new consumer group will start consuming from the beginning of
> > > this topic.
> > >
> > > How can i solve this?
> > >
> > > Thanks.
> >
> >
> >
> > --
> > Sönke Liebau
> > Partner
> > Tel. +49 179 7940878
> > OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
> >
>


Re: Drawbacks for configuring many partitions for a topic

2019-01-27 Thread Ryanne Dolan
Jazz, the number of partitions isn't necessarily related to message volume.
The biggest factors for max message volume would be the number of brokers
and their write speed. You should choose the number of partitions based on
the number of brokers and the number of consumers you expect to have.

Ryanne

On Sun, Jan 27, 2019, 8:55 AM jaaz jozz wrote:

> Hello,
> In order to be prepared for large volume of messages i want to configure my
> topic with large amount of partitions (>1000).
> What are the drawbacks of this?
> Should I expect any pitfalls?
>
> Best regards,
> jazz
>


Re: Broker continuously expand and shrinks to itself

2019-01-27 Thread Ashish Karalkar
Hi Harsha,
Thanks for the reply.
The issue is resolved for now; the root cause was a runaway application
spawning many instances of kafkacat and hammering the Kafka brokers. I am still
wondering what could cause the ISR to shrink and expand when a client hammers a
broker.
--Ashish
On Thursday, January 24, 2019, 8:53:10 AM PST, Harsha Chintalapani wrote:

Hi Ashish,
What's your replica.lag.time.max.ms set to, and do you see any network
issues between brokers?
-Harsha



On Jan 22, 2019, 10:09 PM -0800, Ashish Karalkar wrote:
> Hi All,
> We just upgraded from 0.10.x to 1.1 and enabled rack awareness on an existing
> cluster, which has about 20 nodes in 4 racks. After this we see that a few
> brokers go into a continuous cycle of expanding and shrinking the ISR to
> themselves; it is also causing high latency for serving metadata requests.
> What is the impact of enabling rack awareness on an existing cluster, assuming
> the replication factor is 3 and the existing replicas may or may not have been
> in different racks when rack awareness was enabled, after which a rolling
> bounce was done?
> The symptoms we are seeing are replica lag and slow metadata requests. Also, in
> the broker logs we continuously see disconnections from the broker where it is
> trying to expand.
> Thanks for helping
> --A

Drawbacks for configuring many partitions for a topic

2019-01-27 Thread jaaz jozz
Hello,
In order to be prepared for a large volume of messages, I want to configure my
topic with a large number of partitions (>1000).
What are the drawbacks of this?
Should I expect any pitfalls?

Best regards,
jazz


Re: How to balance messages in kafka topics with newly added partitions?

2019-01-27 Thread jaaz jozz
Thanks, Sönke
Is there any available kafka tool to move messages between topics?

On Sun, Jan 27, 2019 at 2:40 PM Sönke Liebau
 wrote:

> Hi Jazz,
>
> I'm afraid the only way of rebalancing old messages is indeed to
> rewrite them to the topic - thus creating duplication.
> Once a message has been written to a partition by Kafka this
> assignment is final, there is no way of moving it to another
> partition.
>
> Changing the partition count of topics at a later time can be a huge
> headache, if you depend on partitioning. For this exact reason the
> general recommendation is to overpartition your topics a little when
> creating them, so that you can add consumers as the data volume
> increases.
>
> In your case the best solution might be to delete and then recreate
> the topic with more partitions. Now you can rewrite all your data and
> it will result in a clean partitioning.
>
> Hope this helps a little, feel free to get back to us if you have more
> questions!
>
> Best regards,
> Sönke
>
> On Sun, Jan 27, 2019 at 1:21 PM jaaz jozz  wrote:
> >
> > Hello,
> >
> > I have kafka cluster with certain topic that had too few partitions, so a
> > large backlog of messages was collected. After i added additional
> > partitions, only the newly messages balanced between all the new partitions.
> >
> > What is the preferred way to balance the "old" backlog of messages inside
> > the original partitions across all the new partitions?
> >
> > I thought of reading and writing again all the messages backlog to this
> > topic and update the offsets accordingly, but it will make duplication of
> > messages if a new consumer group will start consuming from the beginning of
> > this topic.
> >
> > How can i solve this?
> >
> > Thanks.
>
>
>
> --
> Sönke Liebau
> Partner
> Tel. +49 179 7940878
> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>


Re: How to balance messages in kafka topics with newly added partitions?

2019-01-27 Thread Sönke Liebau
Hi Jazz,

I'm afraid the only way of rebalancing old messages is indeed to
rewrite them to the topic - thus creating duplication.
Once a message has been written to a partition by Kafka this
assignment is final, there is no way of moving it to another
partition.

Changing the partition count of topics at a later time can be a huge
headache, if you depend on partitioning. For this exact reason the
general recommendation is to overpartition your topics a little when
creating them, so that you can add consumers as the data volume
increases.
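
To illustrate why a changed partition count hurts when you depend on partitioning, here is a minimal sketch in plain Java. It assumes a generic "hash of the key modulo the number of partitions" scheme; the class name PartitionCountSketch, the helper methods and the use of String.hashCode() are stand-ins chosen for illustration and are not Kafka's actual partitioner code. The point is only that the same key can map to a different partition once the count changes, while messages already written stay where they are.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of key-to-partition mapping under a changing partition count.
class PartitionCountSketch {

    /** Stand-in for a hash partitioner: NOT Kafka's real hash function. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Counts the keys whose partition differs between the two partition counts. */
    static long movedKeys(List<String> keys, int before, int after) {
        return keys.stream()
                .filter(k -> partitionFor(k, before) != partitionFor(k, after))
                .count();
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
                "user-1", "user-2", "user-3", "user-4", "user-5", "user-6");
        // Print where each key lands with 3 partitions and with 6 partitions.
        for (String k : keys) {
            System.out.println(k + ": " + partitionFor(k, 3) + " -> " + partitionFor(k, 6));
        }
        System.out.println("keys that moved: " + movedKeys(keys, 3, 6));
    }
}
```

Any key that "moves" would have its old messages in one partition and its new messages in another, which is why per-key ordering and co-location can no longer be relied on after adding partitions.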

In your case the best solution might be to delete and then recreate
the topic with more partitions. Now you can rewrite all your data and
it will result in a clean partitioning.

Hope this helps a little, feel free to get back to us if you have more
questions!

Best regards,
Sönke

On Sun, Jan 27, 2019 at 1:21 PM jaaz jozz  wrote:
>
> Hello,
>
> I have kafka cluster with certain topic that had too few partitions, so a
> large backlog of messages was collected. After i added additional
> partitions, only the newly messages balanced between all the new partitions.
>
> What is the preferred way to balance the "old" backlog of messages inside
> the original partitions across all the new partitions?
>
> I thought of reading and writing again all the messages backlog to this
> topic and update the offsets accordingly, but it will make duplication of
> messages if a new consumer group will start consuming from the beginning of
> this topic.
>
> How can i solve this?
>
> Thanks.



-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany


How to balance messages in kafka topics with newly added partitions?

2019-01-27 Thread jaaz jozz
Hello,

I have a Kafka cluster with a topic that had too few partitions, so a
large backlog of messages built up. After I added additional
partitions, only the new messages were balanced across all the partitions.

What is the preferred way to balance the "old" backlog of messages inside
the original partitions across all the new partitions?

I thought of reading all the backlog messages and writing them again to this
topic and updating the offsets accordingly, but that will duplicate
messages if a new consumer group starts consuming from the beginning of
this topic.

How can I solve this?

Thanks.