Re: Problem with kafka-streams aggregate windowedBy
Hi Pavel, did you understand why you see such strange behaviour?

On Tue, Oct 30, 2018 at 12:22 PM Pavel Koroliov wrote:
> I'm sorry, guys. Aggregation works fine, but I've found a new problem with
> *groupByKey()*. After restarting the application, some aggregations start
> from the beginning, although their keys already have aggregated data, while
> other aggregations continue to accumulate data. This is very strange; I did
> not expect such behavior.
>
> On Tue, Oct 30, 2018 at 0:43, Matthias J. Sax wrote:
> >
> > Make sure to call `KafkaStreams#close()` to get the latest offsets
> > committed.
> >
> > Besides this, you can check the consumer and Streams logs in DEBUG mode
> > to see which offset is picked up (or not).
> >
> > -Matthias
> >
> > On 10/29/18 11:43 AM, Patrik Kleindl wrote:
> > > Hi
> > > How long does your application run? More than the 60 seconds you set
> > > for the commit interval?
> > > Have a look at
> > > https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+
> > > and check if your offsets are really committed.
> > > Best regards
> > > Patrik
> > >
> > >> On Oct 29, 2018, at 18:20, Pavel Koroliov wrote:
> > >>
> > >> Hi
> > >> No, my application id doesn't change
> > >>
> > >> On Mon, Oct 29, 2018 at 19:11, Patrik Kleindl wrote:
> > >>
> > >>> Hi
> > >>> Does your applicationId change?
> > >>> Best regards
> > >>> Patrik
> > >>>
> > >>>> On Oct 29, 2018, at 13:28, Pavel Koroliov wrote:
> > >>>>
> > >>>> Hi everyone! I use kafka-streams, and I have a problem when I use
> > >>>> windowedBy. Everything works well until I restart the application.
> > >>>> After restarting, my aggregation starts from the beginning.
> > >>>> Code below:
> > >>>>
> > >>>> StreamsBuilder builder = new StreamsBuilder()
> > >>>> KStream stream = builder.stream(topic,
> > >>>>         Consumed.with(Serdes.String(), Serdes.String()))
> > >>>>
> > >>>> KTable table = stream.groupByKey()
> > >>>>         .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
> > >>>>         .aggregate(
> > >>>>                 { new AggregatorModel() },
> > >>>>                 { key, value, aggregate ->
> > >>>>                     return aggregate.add(value)
> > >>>>                 }
> > >>>>         )
> > >>>>         .toStream()
> > >>>>         .map({ k, v ->
> > >>>>             new KeyValue<>(k.window().end(), v)
> > >>>>         })
> > >>>>         .to('output')
> > >>>>
> > >>>> def config = new Properties()
> > >>>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
> > >>>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
> > >>>> config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> > >>>>         TimeUnit.SECONDS.toMillis(60))
> > >>>>
> > >>>> KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
> > >>>> kafkaStreams.start()
> > >>>>
> > >>>> I've tried adding ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to the
> > >>>> config, set to 'latest' and 'earliest', but it didn't help.
> > >>>> Can you help me understand what I'm doing wrong?
> > >>>> Thank you.

--
Vincenzo D'Amore
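The tumbling windows in Pavel's snippet bucket each record purely by its timestamp, so replaying uncommitted records after a restart puts them back into the same windows and re-emits those aggregates from the beginning - which is why committing offsets via KafkaStreams#close() matters. A minimal sketch of that bucketing (illustrative Python, not Kafka Streams' actual implementation):

```python
# Sketch of how tumbling windows bucket records by timestamp, mirroring
# TimeWindows.of(15 minutes) from the thread above. Illustrative only:
# Kafka Streams' actual windowing code differs in detail.

WINDOW_SIZE_MS = 15 * 60 * 1000  # 15 minutes, as in the Groovy snippet

def window_bounds(timestamp_ms: int) -> tuple:
    """Return (start, end) of the tumbling window containing the record."""
    start = timestamp_ms - (timestamp_ms % WINDOW_SIZE_MS)
    return (start, start + WINDOW_SIZE_MS)

# Two records 10 minutes apart fall into the same 15-minute window, so
# replaying them after a restart re-aggregates into the same bucket.
a = window_bounds(1_000_000)
b = window_bounds(1_000_000 + 10 * 60 * 1000)
print(a == b)  # -> True
```

Because the bucketing is deterministic, a restart with uncommitted offsets does not corrupt windows; it just re-reads old records and emits their aggregates again from offset zero.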
Re: Kafka streams messages duplicates with non-overlapping gap-less windows
Hi Matthias, thanks for your reply. Let me explain better what I'm trying to say. In the meantime I've played with this problem and I think I now have a clearer view, though I still don't have a solution.

I have an input topic A which is a stream of messages where each message contains just an ID. There can be thousands or even millions of these IDs (messages), but in my test (proof of concept) they are all different. In order to process them, for each one I have to retrieve some data stored in a NoSQL database. As you can understand, querying one ID at a time is not a good solution, I mean for performance reasons, so I need to aggregate them, and here comes the problem.

So from the source topic A I have created a new topic B where each message now has a key, which is a number that changes every X milliseconds (say 500 ms). Now I can do a group by key and an aggregate. I assume that each list returned by aggregate() contains no duplicates. The output of this aggregate process is saved in topic C. Topic C contains arrays of IDs of different sizes, and the key is the number created to group them.

And here is my big surprise: in topic C there are a lot of IDs that are present at the same time in different messages. Those messages have the same key but arrays of IDs with different sizes, and each array partially contains IDs present in other messages. I assumed this should be impossible.

So, for example, if I have a stream with the following list of messages:

key - value
0 - 1
0 - 2
0 - 3
0 - 4
0 - 5
1 - 6
1 - 7
1 - 8
1 - 9
1 - 10

I would expect groupByKey() and aggregate() to return:

key - value
0 - [1,2,3,4,5]
1 - [6,7,8,9,10]

But instead I found something like:

key - value
0 - [1,2,3,4,5]
0 - [2,3,4,5]
1 - [6,7,8,9]
1 - [6,7,8,9,10]

So the question is: did I do something wrong trying to aggregate them? How can I avoid those duplicates?

On Sat, Jan 26, 2019 at 9:01 PM Matthias J.
Sax wrote:
> I am not 100% sure what you mean by
>
> >> I've an input topic where I'm 100% sure there are no duplicate keys or
> >> messages
>
> If this is the case (i.e., each key is unique), it would imply that each
> window contains exactly one record per key. Hence, why do you aggregate?
> Each aggregate would consist of only one message, making an aggregation
> step unnecessary.
>
> Can you be a little bit more specific and provide a sample input
> (key, value, timestamp), observed output, and expected output?
>
> I suspect (but I am not sure) that you might "struggle" with Kafka
> Streams' continuous output model. Maybe this blog post sheds some light:
> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
>
> -Matthias
>
> On 1/25/19 9:31 AM, Vincenzo D'Amore wrote:
> > Hi all,
> >
> > I write here because I've been struggling for a couple of days trying to
> > understand why I get so many duplicates while processing messages with
> > Kafka Streams.
> >
> > I have an input topic where I'm 100% sure there are no duplicate keys or
> > messages.
> >
> > During the process I have to aggregate the messages using
> > groupByKey, windowedBy and aggregate:
> >
> >     .map((v1, v2) -> {
> >         Long currentSecond = System.currentTimeMillis() / 500;
> >         return new KeyValue<>(currentSecond.toString(), v2);
> >     })
> >     .groupByKey(Serialized.with(Serdes.String(), new JsonSerde()))
> >     .windowedBy(TimeWindows.of(500))
> >     .aggregate(() -> new ArrayList<StreamEntry<String, JsonNode>>(),
> >         (aggKey, newValue, aggValue) -> {
> >             final StreamEntry<String, JsonNode> kvSimpleEntry =
> >                 new StreamEntry<>(aggKey, newValue);
> >             aggValue.add(kvSimpleEntry);
> >             return aggValue;
> >         }, Materialized.with(Serdes.String(), new ListKVJsonNodeSerde()))
> >
> > Even during this process I'm 100% sure there are no duplicates, but
> > surprisingly after this I see that mapValues can be called with the same
> > messages more than once. Even hundreds of times.
> >
> >     .mapValues(vv -> {
> >         // here the list vv contains the many duplicates
> >     })
> >
> > Looking around I've found this project that seems to reproduce the
> > problem:
> > https://github.com/westec/ks-aggregate-debug
> >
> > Given that I am using non-overlapping gap-less windows in KStreams,
> > shouldn't the correct output contain no duplicate messages between
> > windows? Any ideas why the duplicates?

--
Vincenzo D'Amore
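The output Vincenzo observes is consistent with the continuous output model Matthias points to: a windowed aggregation may emit an updated result for the same window every time the aggregate changes (subject to caching and the commit interval), so downstream operators see several partial lists per window rather than one final one. A minimal sketch of that emission pattern (illustrative Python, not Kafka Streams itself):

```python
# Sketch of Kafka Streams' continuous output model: every update to a
# windowed aggregate may be emitted downstream, so a single window can
# produce several (partial) results rather than one final one.
# Illustrative only; real Streams behavior also depends on caching.

def aggregate_with_updates(records):
    """Yield (key, aggregate-so-far) after each record, like an
    un-cached KTable changelog stream would."""
    aggregates = {}
    for key, value in records:
        aggregates.setdefault(key, []).append(value)
        yield key, list(aggregates[key])  # emit the updated aggregate

records = [(0, 1), (0, 2), (0, 3)]
for key, agg in aggregate_with_updates(records):
    print(key, agg)
# Downstream sees (0, [1]), (0, [1, 2]), (0, [1, 2, 3]) -- three
# messages for one window, each a prefix of the final list.
```

These are not duplicates in the strict sense but successive refinements of the same window; a downstream consumer that treats each message as final will appear to see the same IDs many times.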
Re: How to balance messages in kafka topics with newly added partitions?
Yes, but I find this even easier to do with KSQL:

CREATE STREAM OUTPUTTOPIC AS SELECT * FROM INPUTTOPIC;

There are similar examples like this that also filter messages while copying, or change the message format while copying, on the KSQL Recipes page here:
https://www.confluent.io/stream-processing-cookbook/

There is even an example for repartitioning topics using the PARTITIONS parameter:

CREATE STREAM clickstream_new WITH (PARTITIONS=5) AS SELECT * FROM clickstream_raw;

-hans

> On Jan 27, 2019, at 9:24 AM, Ryanne Dolan wrote:
>
> You can use MirrorMaker to copy data between topics.
>
> Ryanne
>
>> On Sun, Jan 27, 2019, 7:12 AM jaaz jozz wrote:
>>
>> Thanks, Sönke.
>> Is there any available Kafka tool to move messages between topics?
>>
>> On Sun, Jan 27, 2019 at 2:40 PM Sönke Liebau wrote:
>>
>>> Hi Jazz,
>>>
>>> I'm afraid the only way of rebalancing old messages is indeed to
>>> rewrite them to the topic - thus creating duplication.
>>> Once a message has been written to a partition by Kafka this
>>> assignment is final, there is no way of moving it to another
>>> partition.
>>>
>>> Changing the partition count of topics at a later time can be a huge
>>> headache, if you depend on partitioning. For this exact reason the
>>> general recommendation is to overpartition your topics a little when
>>> creating them, so that you can add consumers as the data volume
>>> increases.
>>>
>>> In your case the best solution might be to delete and then recreate
>>> the topic with more partitions. Now you can rewrite all your data and
>>> it will result in a clean partitioning.
>>>
>>> Hope this helps a little, feel free to get back to us if you have more
>>> questions!
>>>
>>> Best regards,
>>> Sönke
>>>
>>> On Sun, Jan 27, 2019 at 1:21 PM jaaz jozz wrote:
>>>>
>>>> Hello,
>>>>
>>>> I have a Kafka cluster with a certain topic that had too few
>>>> partitions, so a large backlog of messages was collected. After I
>>>> added additional partitions, only the new messages were balanced
>>>> across all the new partitions.
>>>>
>>>> What is the preferred way to balance the "old" backlog of messages
>>>> inside the original partitions across all the new partitions?
>>>>
>>>> I thought of reading and writing again all the messages backlog to
>>>> this topic and updating the offsets accordingly, but it will make
>>>> duplication of messages if a new consumer group starts consuming from
>>>> the beginning of this topic.
>>>>
>>>> How can I solve this?
>>>>
>>>> Thanks.
>>>
>>> --
>>> Sönke Liebau
>>> Partner
>>> Tel. +49 179 7940878
>>> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
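Sönke's point that a record's partition assignment is final follows from how the default partitioner handles keyed records: the partition is a pure function of the key hash and the current partition count, so raising the count silently remaps keys. A rough sketch (CRC-32 stands in for Kafka's murmur2 hash here, so the concrete partition numbers differ from a real cluster; the remapping effect is the point):

```python
# Rough sketch of Kafka's default partitioner for keyed records:
# partition = hash(key) mod partition_count. zlib.crc32 stands in for
# Kafka's murmur2, so the numbers differ from a real cluster.
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    # Kafka hashes the serialized key bytes; we hash the UTF-8 bytes too.
    return zlib.crc32(key.encode()) % num_partitions

keys = ["order-1", "order-2", "order-3", "order-4"]
before = {k: partition_for(k, 3) for k in keys}  # topic created with 3 partitions
after = {k: partition_for(k, 5) for k in keys}   # partition count raised to 5

# Some keys now map to different partitions, so records written before the
# change and records for the same key written after it can live in
# different partitions -- breaking per-key ordering, and explaining why
# old records cannot simply be "rebalanced" in place.
print(before)
print(after)
```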
[VOTE] 2.1.1 RC1
Hi all,

This is the second candidate for the release of Apache Kafka 2.1.1. This release includes many bug fixes for Apache Kafka 2.1.

Compared to rc0, this release includes the following changes:
* MINOR: Upgrade ducktape to 0.7.5 (#6197)
* KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR
* tests/kafkatest/__init__.py now contains __version__ = '2.1.1' rather than '2.1.1.dev0'
* Maven artifacts should be properly staged this time
* I have added my GPG key to https://kafka.apache.org/KEYS

Check out the release notes here:
http://home.apache.org/~cmccabe/kafka-2.1.1-rc1/RELEASE_NOTES.html

The vote will go until Friday, February 1st.

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~cmccabe/kafka-2.1.1-rc1/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~cmccabe/kafka-2.1.1-rc1/javadoc/

* Tag to be voted upon (off the 2.1 branch) is the 2.1.1-rc1 tag:
https://github.com/apache/kafka/releases/tag/2.1.1-rc1

* Successful Jenkins builds for the 2.1 branch:
Unit/integration tests: https://builds.apache.org/job/kafka-2.1-jdk8/118/

thanks,
Colin
Re: Drawbacks for configuring many partitions for a topic
Hi Jazz,

you probably already found this, but [1] is a good read and starting point around this topic!

Best regards,
Sönke

[1] https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster

On Sun, Jan 27, 2019 at 6:22 PM Ryanne Dolan wrote:
>
> Jazz, the number of partitions isn't necessarily related to message
> volume. The biggest factors for max message volume would be the number of
> brokers and their write speed. You should choose the number of partitions
> based on the number of brokers and the number of consumers you expect to
> have.
>
> Ryanne
>
> On Sun, Jan 27, 2019, 8:55 AM jaaz jozz wrote:
> >
> > Hello,
> >
> > In order to be prepared for a large volume of messages I want to
> > configure my topic with a large number of partitions (>1000).
> > What are the drawbacks of this?
> > Should I expect any pitfalls?
> >
> > Best regards,
> > jazz

--
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
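The Confluent post linked above also gives a simple rule of thumb for sizing: to sustain a target throughput t with a measured per-partition produce throughput p and consume throughput c, you need at least max(t/p, t/c) partitions. A sketch with made-up numbers:

```python
# Back-of-the-envelope partition sizing, following the rule of thumb from
# the Confluent blog post linked in the thread: at least max(t/p, t/c)
# partitions. All throughput figures below are made-up examples.
import math

def min_partitions(target_mb_s: float, produce_mb_s: float,
                   consume_mb_s: float) -> int:
    """Lower bound on partition count for a desired throughput."""
    return math.ceil(max(target_mb_s / produce_mb_s,
                         target_mb_s / consume_mb_s))

# e.g. 100 MB/s target, 10 MB/s per partition producing, 20 MB/s consuming:
print(min_partitions(100, 10, 20))  # -> 10
```

This cuts the other way too: if the formula says 10 partitions, creating >1000 "just in case" buys nothing for throughput while adding per-partition overhead on the brokers.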
Re: How to balance messages in kafka topics with newly added partitions?
You can use MirrorMaker to copy data between topics.

Ryanne

On Sun, Jan 27, 2019, 7:12 AM jaaz jozz wrote:
>
> Thanks, Sönke.
> Is there any available Kafka tool to move messages between topics?
>
> On Sun, Jan 27, 2019 at 2:40 PM Sönke Liebau wrote:
>
> > Hi Jazz,
> >
> > I'm afraid the only way of rebalancing old messages is indeed to
> > rewrite them to the topic - thus creating duplication.
> > Once a message has been written to a partition by Kafka this
> > assignment is final, there is no way of moving it to another
> > partition.
> >
> > Changing the partition count of topics at a later time can be a huge
> > headache, if you depend on partitioning. For this exact reason the
> > general recommendation is to overpartition your topics a little when
> > creating them, so that you can add consumers as the data volume
> > increases.
> >
> > In your case the best solution might be to delete and then recreate
> > the topic with more partitions. Now you can rewrite all your data and
> > it will result in a clean partitioning.
> >
> > Hope this helps a little, feel free to get back to us if you have more
> > questions!
> >
> > Best regards,
> > Sönke
> >
> > On Sun, Jan 27, 2019 at 1:21 PM jaaz jozz wrote:
> > >
> > > Hello,
> > >
> > > I have a Kafka cluster with a certain topic that had too few
> > > partitions, so a large backlog of messages was collected. After I
> > > added additional partitions, only the new messages were balanced
> > > across all the new partitions.
> > >
> > > What is the preferred way to balance the "old" backlog of messages
> > > inside the original partitions across all the new partitions?
> > >
> > > I thought of reading and writing again all the messages backlog to
> > > this topic and updating the offsets accordingly, but it will make
> > > duplication of messages if a new consumer group starts consuming
> > > from the beginning of this topic.
> > >
> > > How can I solve this?
> > >
> > > Thanks.
> >
> > --
> > Sönke Liebau
> > Partner
> > Tel. +49 179 7940878
> > OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
Re: Drawbacks for configuring many partitions for a topic
Jazz, the number of partitions isn't necessarily related to message volume. The biggest factors for max message volume would be the number of brokers and their write speed. You should choose the number of partitions based on the number of brokers and the number of consumers you expect to have.

Ryanne

On Sun, Jan 27, 2019, 8:55 AM jaaz jozz wrote:
>
> Hello,
>
> In order to be prepared for a large volume of messages I want to configure
> my topic with a large number of partitions (>1000).
> What are the drawbacks of this?
> Should I expect any pitfalls?
>
> Best regards,
> jazz
Re: Broker continuously expand and shrinks to itself
Hi Harsha,

Thanks for the reply. The issue is resolved as of now; the root cause was a runaway application spawning many instances of kafkacat and hammering the Kafka brokers. I am still wondering why a client hammering a broker could cause the ISR to shrink and expand.

--Ashish

On Thursday, January 24, 2019, 8:53:10 AM PST, Harsha Chintalapani wrote:

Hi Ashish,
What's your replica.lag.time.max.ms set to, and do you see any network issues between brokers?
-Harsha

On Jan 22, 2019, 10:09 PM -0800, Ashish Karalkar wrote:
> Hi All,
> We just upgraded from 0.10.x to 1.1 and enabled rack awareness on an
> existing cluster which has about 20 nodes in 4 racks. After this we see
> that a few brokers go through a continuous expand-and-shrink-ISR-to-itself
> cycle; it is also causing high latency for serving metadata requests.
> What is the impact of enabling rack awareness on an existing cluster,
> assuming the replication factor is 3 and existing replicas may or may not
> be in different racks when rack awareness was enabled, after which a
> rolling bounce was done?
> The symptoms we are having are replica lag and slow metadata requests.
> Also in the broker logs we continuously see disconnections from the
> broker where it is trying to expand.
> Thanks for helping
> --A
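On Harsha's question about replica.lag.time.max.ms: the leader shrinks the ISR by dropping any follower that has not caught up to the log end within that timeout, which is how an overloaded broker (e.g. hammered by runaway clients) can end up in shrink/expand cycles. A simplified model of that check (the broker's real logic is more involved):

```python
# Simplified model of the leader's ISR-shrink check: drop any follower
# that hasn't caught up to the log end within replica.lag.time.max.ms.
# Illustrative only; the broker's actual logic is more involved.

REPLICA_LAG_TIME_MAX_MS = 10_000  # broker default for replica.lag.time.max.ms

def shrink_isr(isr, last_caught_up_ms, now_ms, leader_id):
    """Keep the leader plus every follower that caught up recently enough."""
    return [
        r for r in isr
        if r == leader_id
        or now_ms - last_caught_up_ms[r] <= REPLICA_LAG_TIME_MAX_MS
    ]

# Follower 3 hasn't caught up for 15 s (e.g. its broker is overwhelmed),
# so the ISR shrinks; once it catches up again, the ISR expands back.
isr = shrink_isr([1, 2, 3], {2: 99_000, 3: 90_000}, now_ms=105_000, leader_id=1)
print(isr)  # -> [1, 2]
```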
Drawbacks for configuring many partitions for a topic
Hello,

In order to be prepared for a large volume of messages I want to configure my topic with a large number of partitions (>1000).
What are the drawbacks of this?
Should I expect any pitfalls?

Best regards,
jazz
Re: How to balance messages in kafka topics with newly added partitions?
Thanks, Sönke.
Is there any available Kafka tool to move messages between topics?

On Sun, Jan 27, 2019 at 2:40 PM Sönke Liebau wrote:

> Hi Jazz,
>
> I'm afraid the only way of rebalancing old messages is indeed to
> rewrite them to the topic - thus creating duplication.
> Once a message has been written to a partition by Kafka this
> assignment is final, there is no way of moving it to another
> partition.
>
> Changing the partition count of topics at a later time can be a huge
> headache, if you depend on partitioning. For this exact reason the
> general recommendation is to overpartition your topics a little when
> creating them, so that you can add consumers as the data volume
> increases.
>
> In your case the best solution might be to delete and then recreate
> the topic with more partitions. Now you can rewrite all your data and
> it will result in a clean partitioning.
>
> Hope this helps a little, feel free to get back to us if you have more
> questions!
>
> Best regards,
> Sönke
>
> On Sun, Jan 27, 2019 at 1:21 PM jaaz jozz wrote:
> >
> > Hello,
> >
> > I have a Kafka cluster with a certain topic that had too few
> > partitions, so a large backlog of messages was collected. After I
> > added additional partitions, only the new messages were balanced
> > across all the new partitions.
> >
> > What is the preferred way to balance the "old" backlog of messages
> > inside the original partitions across all the new partitions?
> >
> > I thought of reading and writing again all the messages backlog to
> > this topic and updating the offsets accordingly, but it will make
> > duplication of messages if a new consumer group starts consuming from
> > the beginning of this topic.
> >
> > How can I solve this?
> >
> > Thanks.
>
> --
> Sönke Liebau
> Partner
> Tel. +49 179 7940878
> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
Re: How to balance messages in kafka topics with newly added partitions?
Hi Jazz,

I'm afraid the only way of rebalancing old messages is indeed to rewrite them to the topic - thus creating duplication. Once a message has been written to a partition by Kafka this assignment is final; there is no way of moving it to another partition.

Changing the partition count of topics at a later time can be a huge headache if you depend on partitioning. For this exact reason the general recommendation is to overpartition your topics a little when creating them, so that you can add consumers as the data volume increases.

In your case the best solution might be to delete and then recreate the topic with more partitions. Then you can rewrite all your data and it will result in a clean partitioning.

Hope this helps a little, feel free to get back to us if you have more questions!

Best regards,
Sönke

On Sun, Jan 27, 2019 at 1:21 PM jaaz jozz wrote:
>
> Hello,
>
> I have a Kafka cluster with a certain topic that had too few partitions,
> so a large backlog of messages was collected. After I added additional
> partitions, only the new messages were balanced across all the new
> partitions.
>
> What is the preferred way to balance the "old" backlog of messages
> inside the original partitions across all the new partitions?
>
> I thought of reading and writing again all the messages backlog to this
> topic and updating the offsets accordingly, but it will make duplication
> of messages if a new consumer group starts consuming from the beginning
> of this topic.
>
> How can I solve this?
>
> Thanks.

--
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
How to balance messages in kafka topics with newly added partitions?
Hello,

I have a Kafka cluster with a certain topic that had too few partitions, so a large backlog of messages was collected. After I added additional partitions, only the new messages were balanced across all the new partitions.

What is the preferred way to balance the "old" backlog of messages inside the original partitions across all the new partitions?

I thought of reading and writing again all the messages backlog to this topic and updating the offsets accordingly, but it will make duplication of messages if a new consumer group starts consuming from the beginning of this topic.

How can I solve this?

Thanks.