Re: How to set concrete names for state stores and internal topics backed by these

2019-12-06 Thread Patrik Kleindl
Hi Sachin We are using a small helper method to keep this readable: private Materialized materializedWith(String name, Serde keySerde, Serde valueSerde) { Materialized materialized = Materialized.as(name); return materialized.withKeySerde(keySerde).withValueSerde(valueSerde); } So the

Re: Case of joining multiple streams/tables

2019-12-06 Thread Patrik Kleindl
Hi https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup might be worth a look. best regards Patrik On Fri, 6 Dec 2019 at 06:44, Sachin Mittal wrote: > I was thinking more of a builder api at DSL level. > Something like this: > StreamsBuilder.joineBuilder() >

Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Patrik Kleindl
Congratulations John! Well deserved and thanks for all your help Best regards Patrik > On 13.11.2019 at 06:10, Kamal Chandraprakash wrote: > > Congrats John! > >> On Wed, Nov 13, 2019 at 7:57 AM Dong Lin wrote: >> >> Congratulations John! >> >>> On Tue, Nov 12, 2019 at 1:56 PM

Re: How to start a stream from only new records?

2019-08-13 Thread Patrik Kleindl
Hi Our requirement is related, we want our streams application to only process messages from the last x weeks. On new deployments this requires starting the application first, stopping the application and then resetting the offsets. I have created https://issues.apache.org/jira/browse/KAFKA-8766

Re: Kafka Streams - unbounded memory growth - stateful processing (rocksdb)

2019-07-16 Thread Patrik Kleindl
Hello Ashok Adding to what Sophie wrote, if you use a custom RocksDBConfigSetter then override the BlockBasedTableConfig like following and call options.setTableFormatConfig(tableConfig) at the end. BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

Re: Streams reprocessing whole topic when deployed but not locally

2019-07-10 Thread Patrik Kleindl
Hi Regarding the I/O, RocksDB has something called write amplification which writes the data to multiple levels internally to enable better optimization at the cost of storage and I/O. This is also the reason the stores can get larger than the topics themselves. This can be modified by RocksDB

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-03 Thread Patrik Kleindl
(#global_state_stores + >>>>>> sum(#partitions_of_topic_per_local_state_store)) . The number of >>>>>> stream threads isn't relevant here. >>>>>> >>>>>> You can also figure it out empirically: the first level of
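
The expression quoted in the snippet above counts RocksDB instances: one per global state store plus one per partition of each local state store. Other replies in this thread indicate that maxOpenFiles applies per RocksDB instance, so multiplying the two gives a rough ceiling on open files. The sketch below only does that arithmetic; the class name, method names and store names are hypothetical, and the product is an assumption drawn from those replies rather than a documented guarantee.

```java
import java.util.Map;

public class OpenFilesEstimate {

    // #global_state_stores + sum(#partitions_of_topic_per_local_state_store)
    static int rocksDbInstances(int globalStateStores,
                                Map<String, Integer> partitionsPerLocalStore) {
        int sum = 0;
        for (int partitions : partitionsPerLocalStore.values()) {
            sum += partitions;
        }
        return globalStateStores + sum;
    }

    // Assumed ceiling: the per-instance limit multiplied by the instance count.
    static long openFilesCeiling(int maxOpenFilesPerInstance, int instances) {
        return (long) maxOpenFilesPerInstance * instances;
    }

    public static void main(String[] args) {
        // Hypothetical application: 2 global stores, two local stores.
        Map<String, Integer> localStores = Map.of("orders-store", 12, "users-store", 6);
        int instances = rocksDbInstances(2, localStores);
        System.out.println("RocksDB instances: " + instances);
        System.out.println("Open files ceiling: " + openFilesCeiling(100, instances));
    }
}
```

The number of stream threads does not appear in the calculation, which matches the remark in the quoted message.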

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-06-27 Thread Patrik Kleindl
store you might not have closed. br, Patrik On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com < emailtokir...@gmail.com> wrote: > > > On 2019/06/27 09:02:39, Patrik Kleindl wrote: > > Hello Kiran > > > > First, the value for maxOpenFiles is per RocksDB in

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-06-27 Thread Patrik Kleindl
that helps best regards Patrik On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com < emailtokir...@gmail.com> wrote: > > > On 2019/06/26 21:58:02, Patrik Kleindl wrote: > > Hi Kiran > > You can use the RocksDBConfigSetter and pass > > > > options.setMax

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-06-26 Thread Patrik Kleindl
Hi Kiran You can use the RocksDBConfigSetter and pass options.setMaxOpenFiles(100); to all RocksDBs for the Streams application which limits how many are kept open at the same time. best regards Patrik On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com < emailtokir...@gmail.com> wrote: >

Re: Offsets of deleted consumer groups do not get deleted correctly

2019-04-01 Thread Patrik Kleindl
Hi Claudia Just a sidenote, there is a combined policy for "compact, delete" which deletes messages older than retention.ms and compacts newer ones if I remember correctly. It's still not really in the docs as it seems https://kafka.apache.org/documentation/#topicconfigs best regards Patrik On

Re: KafkaStreams backoff for non-existing topic

2019-03-25 Thread Patrik Kleindl
Hi Guozhang Just a small question, why can't this be checked when trying to instantiate KafkaStreams? The Topology should know all topics and the existence of the topics could be verified with the AdminClient. This would allow to fail fast similar to when the state directory is not available. Or

Re: No checkpoint found

2019-03-20 Thread Patrik Kleindl
Hi Claudia Probably https://issues.apache.org/jira/browse/KAFKA-5998, welcome to the club ;-) best regards Patrik On Wed, 20 Mar 2019 at 10:25, Claudia Wegmann wrote: > Hi kafka users, > > since upgrading to kafka 2.1.1 version I get the following log message at > every startup of streaming

Re: Operationalizing Zookeeper and common gotchas

2019-03-18 Thread Patrik Kleindl
Hi Eno Thanks too, this is indeed helpful Best regards Patrik > On 18.03.2019 at 18:16, Eno Thereska wrote: > > Hi folks, > > The team here has come up with a couple of clarifying tips for > operationalizing Zookeeper for Kafka that we found missing from the > official documentation, and

Re: [VOTE] 2.2.0 RC0

2019-02-25 Thread Patrik Kleindl
Hi Matthias Minor issue, if the locale is not English (German in my case) then org.apache.kafka.common.utils.UtilsTest > testFormatBytes FAILED org.junit.ComparisonFailure: expected:<1[.]1 MB> but was:<1[,]1 MB> at org.junit.Assert.assertEquals(Assert.java:115) at
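
The failure above is the usual symptom of number formatting that picks up the JVM's default locale. The following is a minimal illustration of the effect using only the JDK; it is not the Kafka Utils implementation, and the class and method names are hypothetical.

```java
import java.util.Locale;

public class LocaleFormatDemo {

    // Formats a megabyte value with one decimal digit in the given locale.
    static String formatMegabytes(double megabytes, Locale locale) {
        return String.format(locale, "%.1f MB", megabytes);
    }

    public static void main(String[] args) {
        // German uses a comma as the decimal separator.
        System.out.println(formatMegabytes(1.1, Locale.GERMAN));
        // English uses a dot, which is what the test expects.
        System.out.println(formatMegabytes(1.1, Locale.ENGLISH));
    }
}
```

Passing an explicit locale when formatting, or pinning the default locale in the test, makes the output independent of the machine the build runs on.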

Re: Minimizing global store restoration time

2019-02-21 Thread Patrik Kleindl
possibly exceed it (default is 50Mb). > > > Guozhang > > > > On Fri, Feb 8, 2019 at 12:43 AM Patrik Kleindl wrote: > > > Hi Taylor > > You are right, the parallel processing is not mentioned in this issue, if > > I remember correctly it was in the thread tha

Re: Accessing Kafka stream's KTable underlying RocksDB memory usage

2019-02-17 Thread Patrik Kleindl
Hi How many partitions do your topics have? As far as I understand there is a RocksDB for every partition of every KTable and this can add up quickly. Depending on how many instances you are using one of them might have to handle the complete load temporarily which will use more memory. Also,

Re: Minimizing global store restoration time

2019-02-08 Thread Patrik Kleindl
> amounts of data in global stores and whether there are any inherent > limitations to the size of global stores. > > Our topic is already using compaction. > > Taylor > >> On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl wrote: >> >> Hi Taylo

Re: Warning when adding GlobalKTable to topology

2019-01-19 Thread Patrik Kleindl
Hi That is because the global tables are handled separately by the GlobalStreamThread as far as I understand. You also don't see their offsets like for regular consumers. Best regards Patrik > On 19.01.2019 at 18:19, Dmitry Minkovsky wrote: > > When I add a GlobalKTable for topic >

User Activity Tracking

2019-01-10 Thread Patrik Kleindl
Hi everyone, we are planning to add some user activity tracking to an application and I wanted to ask around for your general experiences and best practices. Do you use one topic per application or more granular? Do you write directly from the application to Kafka for tracking purposes? How to

Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-11-20 Thread Patrik Kleindl
ould not be materialized if users do not specify a >> materialized store name, only the value-transformed KTable will be >> materialized: >> >> https://github.com/apache/kafka/pull/5779 >> >> >> Would that work for you? >> >> Guozhang

Re: Stream Metrics - Memory Analysis

2018-11-20 Thread Patrik Kleindl
provide some more elaborations on what you did the JVM analysis, > so that I can try to re-produce the observations. > > > Guozhang > > On Thu, Oct 25, 2018 at 2:50 AM Patrik Kleindl wrote: > > > Hello > > > > During the analysis of JVM memory two possible

Re: Offsets/Lags for global state stores not shown

2018-11-18 Thread Patrik Kleindl
obalKTable will be fully populated > from the topic. For the KTable case, you can query from the very > beginning on, while data is put into the table. > > Also, for this approach, if you add other processing, this processing > would not be parallelized but duplicated. > >

Re: Using GlobalKTable/KeyValueStore for topic cache

2018-11-13 Thread Patrik Kleindl
Hi Chris We are using them like you described. Performance is very good compared to the database used before. Beware that until https://issues.apache.org/jira/browse/KAFKA-7380 is done the startup will be blocked until all global stores are restored (sequentially). This can take a little for

Re: Offsets/Lags for global state stores not shown

2018-11-07 Thread Patrik Kleindl
nstead of subscription) and this consumer does not commit > any offset to Kafka. > > Note that global stores are bootstrapped before processing begins > though, and are expected to be low throughput topic anyway. > > > -Matthias > > On 11/6/18 2:03 AM, Patrik Kleindl wr

Offsets/Lags for global state stores not shown

2018-11-06 Thread Patrik Kleindl
Hello Am I doing something wrong or is it by design that global state stores and their consumers do not show up under the consumer-groups? With the consumer group command (and in control center as well) I don't get any output for the group: ./kafka-consumer-groups --bootstrap-server broker:9092

Re: Deduplicating a topic in the face of producer crashes over a time window?

2018-11-02 Thread Patrik Kleindl
Hi Andrew Did you take a look at https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java ? We are using this for a case like you described. Growth should be limited with this approach. Best

Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-10-29 Thread Patrik Kleindl
" version, > it's > > > an internal changelog topic, and for the "topic-to-table" version, the > > > store can use the intermediate topic as its changelog. > > > > > > This doesn't address your ergonomic concern, but it seemed worth > poi

Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-10-26 Thread Patrik Kleindl
rigger reduce() to > delete, you will need to use a surrogate value for this, i.e., do a > mapValues() before the groupByKey() call, and replace `null` values with > the surrogate-delete-marker that you can evaluate in `Reducer#apply()` > to return `null` for this case. > > Hope t
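
The surrogate-marker idea described in the quoted reply can be modelled in plain Java without any Kafka dependency. This is a simplified sketch, not Kafka Streams code: the map stands in for the table, toSurrogate stands in for the mapValues() step, and REDUCER stands in for Reducer#apply(). The marker value and all class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

public class SurrogateDeleteDemo {

    // Hypothetical marker; choose a value that cannot occur as real data.
    static final String DELETE_MARKER = "__DELETE__";

    // Stand-in for the mapValues() step: null becomes the surrogate marker.
    static String toSurrogate(String value) {
        return value == null ? DELETE_MARKER : value;
    }

    // Stand-in for Reducer#apply(): keep the newest value, return null on the marker.
    static final BinaryOperator<String> REDUCER =
            (aggregate, next) -> DELETE_MARKER.equals(next) ? null : next;

    // Simplified model of the table update; not the Kafka Streams implementation.
    static void update(Map<String, String> table, String key, String rawValue) {
        String value = toSurrogate(rawValue);
        String current = table.get(key);
        String result;
        if (current == null) {
            // Nothing stored yet, so a marker has nothing to delete.
            result = DELETE_MARKER.equals(value) ? null : value;
        } else {
            result = REDUCER.apply(current, value);
        }
        if (result == null) {
            table.remove(key);
        } else {
            table.put(key, result);
        }
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        update(table, "a", "v1");
        update(table, "a", "v2");
        update(table, "b", "x");
        update(table, "a", null); // the null input is turned into the marker and deletes "a"
        System.out.println(table); // prints {b=x}
    }
}
```

The point of the marker is that the null never reaches the reduce step as a null, so the record is not dropped before the reducer can decide to delete.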

Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-10-25 Thread Patrik Kleindl
Hello Recently we noticed a lot of warning messages in the logs which pointed to this method (we are running 2.0): KStreamReduce public void process(final K key, final V value) { // If the key or value is null we don't need to proceed if (key == null || value == null) {

Stream Metrics - Memory Analysis

2018-10-25 Thread Patrik Kleindl
Hello During the analysis of JVM memory two possible issues were shown which I would like to bring to your attention: 1) Duplicate strings Top findings: string_content="stream-processor-node-metrics" count="534,277" string_content="processor-node-id" count="148,437"

RocksDB not closed on error during CachingKeyValueStore.flush?

2018-10-23 Thread Patrik Kleindl
Hello Can someone please verify if my assumption is correct? In CachingKeyValueStore, if an exception happens during flush() the store will not be closed properly. @Override public void flush() { lock.writeLock().lock(); try { cache.flush(cacheName); underlying.flush();

Re: Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException

2018-10-04 Thread Patrik Kleindl
can do, because Streams hard codes to set > the policy to "none". Thus, a manual restart (that is gladly working as > you confirmed) is currently the way to go. > > Thanks for reporting this issue. > > > -Matthias > >> On 10/4/18 3:23 A

Re: Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException

2018-10-04 Thread Patrik Kleindl
> globalConsumer is lagging behind? Can you verify this? If yes, it seems > to make sense to stop processing to inform the user about this issue. > Would you rather prefer the application to just move on implying silent > data loss?? > > > -Matthias > > > On 10/3/18 12:

Re: Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException

2018-10-03 Thread Patrik Kleindl
d to recover from this exception? > > > -Matthias > > On 10/2/18 4:54 AM, Patrik Kleindl wrote: > > Hi > > > > We had several incidents where a streams application crashed while > > maintaining a global state

Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException

2018-10-02 Thread Patrik Kleindl
Hi We had several incidents where a streams application crashed while maintaining a global state store. Updating global state failed. You can restart KafkaStreams to recover from this error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured

Managing/Versioning Topic Configurations for CI/CD

2018-09-27 Thread Patrik Kleindl
Hello everyone, we are currently trying to improve the management of our topic configurations. At the moment we are managing the configurations on the producer side and checking/creating/changing topics on each application (instance) startup. This has worked fine for many cases, but does not

Re: GlobalKTable/KTable initialization differences

2018-09-06 Thread Patrik Kleindl
IN ADDITION, the global stores being restored as well. >> >> If you like, please feel free to create a JIRA requesting this improvement >> so someone can work on it someday. >> >> Guozhang >> >> >> >> >>> On Sat, Aug 25, 2018 at

Re: resetting consumer group offset to earliest and to-latest not working

2018-09-01 Thread Patrik Kleindl
Hello Did you add --execute to the command? Which command did you use? Best regards Patrik > On 01.09.2018 at 14:54, Joseph M'BIMBI-BENE wrote: > > Hello everyone, > > Hopefully this is the appropriate mailing list for my message. > When I am trying to reset the offset of some consumer

GlobalKTable/KTable initialization differences

2018-08-25 Thread Patrik Kleindl
Hello We are currently using GlobalKTables for interactive queries as well as for lookups inside stream applications but have come across some limitations/problems. The main problem was that our deployments including application start took longer with every new global state store we added which

Improve error message when trying to produce message without key for compacted topic

2018-08-21 Thread Patrik Kleindl
Hello Yesterday we had the following exception: Exception thrown when sending a message with key='null' and payload='...' to topic sometopic:: org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt. The

Re: Usage of cleanup.policy=compact,delete

2018-08-13 Thread Patrik Kleindl
edge of the code. I > > seem to recall we tried this for a repartition topic and it didn't do > quite > > what we expected. > > > > On Fri, Aug 10, 2018 at 3:02 AM Patrik Kleindl > wrote: > > > >> Hello > >> > >> In a discussion yesterday th

Usage of cleanup.policy=compact,delete

2018-08-10 Thread Patrik Kleindl
Hello In a discussion yesterday the question came up if an internal changelog topic can be enabled for compaction and deletion. https://stackoverflow.com/questions/50622369/kafka-streams-is-it-possible-to-have-compact-delete-policy-on-state-stores and