Re: How to set concrete names for state stores and internal topics backed by these

2019-12-06 Thread Patrik Kleindl
Hi Sachin

We are using a small helper method to keep this readable:

private <K, V, S extends StateStore> Materialized<K, V, S>
materializedWith(String name, Serde<K> keySerde, Serde<V> valueSerde)
{
    Materialized<K, V, S> materialized = Materialized.as(name);
    return materialized.withKeySerde(keySerde).withValueSerde(valueSerde);
}

So the Materialized.as just becomes a

materializedWith("storename", keySerde, valueSerde)
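For illustration, a hypothetical call site (the stream, store name and serdes
below are made up) could then look like:

KTable<String, Long> counts = stream
    .groupByKey()
    .count(materializedWith("count-store", Serdes.String(), Serdes.Long()));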

Hope that helps

Patrik


On Fri, 6 Dec 2019 at 18:32, John Roesler  wrote:

> Hi Sachin,
>
> The way that Java infers generic arguments makes that case particularly
> obnoxious.
>
> By the way, the problem you're facing is specifically addressed by these
> relatively new features:
> *
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> *
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping
>
> Since this behavior has been under development recently, I thought you
> might benefit from the context.
>
> To answer your question, what you have to do is explicitly mention the
> type arguments to "Materialized.as(name)" when you're using the
> withKeySerde, etc.
>
> It will look something like this:
>
> Materialized
>   .<KeyType, ValueType, KeyValueStore<Bytes, byte[]>>as("store-name")
>   .withKeySerde(new Serde...)
>   .withValueSerde(new Serde...));
>
> I can explain exactly why this is necessary if you want, but the short
> answer is that the Java type system only makes a rudimentary effort to
> infer types.
>
> FWIW, this "paper cut" makes me irrationally angry, and I'm hoping we can
> find a way to fix it, if we ever change the Materialized builder interface.
>
> Hope this helps,
> -John
>
> On Fri, Dec 6, 2019, at 11:15, Sachin Mittal wrote:
> > Hi,
> > In my application I have names of internal topics like this:
> >
> > ss-session-application-KSTREAM-JOINOTHER-59-store-changelog-0
> > ss-session-application-KSTREAM-JOINTHIS-49-store-changelog-0
> > ss-session-application-KSTREAM-OUTEROTHER-50-store-changelog-0
> > ss-session-application-KTABLE-MERGE-STATE-STORE-61-changelog-0
> >
> > Is it possible to set concrete names for these instead of say **
> > KSTREAM-JOINOTHER-59-store**
> >
> > This way I can identify what code in my DSL is responsible for the data
> > inside them.
> >
> > So far I have set names for:
> > Grouped.with
> > Materialized.as
> > Joined.with
> >
> > This has helped me get concrete names in many places; however, in some
> > places I still see arbitrary names.
> >
> > Also note that somehow this code works
> > Materialized.with(new JSONSerde(), new TupleJSONSerde>())
> >
> > But not:
> > Materialized.as("d-l-i-store").withKeySerde(new
> > JSONSerde()).withValueSerde(new TupleJSONSerde>())
> >
> > The error I get is:
> > Description Resource Path Location Type
> > The method withKeySerde(Serde) in the type
> > Materialized is not applicable for the
> arguments
> > (JSONSerde)
> >
> > I have my class
> >
> > class JSONSerde<T> implements Serializer<T>,
> > Deserializer<T>, Serde<T> {
> > ..
> > }
> >
> > This is pretty much the same as the Kafka Streams typed example.
> >
> > Thanks
> > Sachin
> >
>


Re: Case of joining multiple streams/tables

2019-12-06 Thread Patrik Kleindl
Hi
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
might be worth a look.
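For reference, a rough sketch of what the cogroup DSL from KIP-150 looks like
(it only shipped in a later release, 2.5.0; the streams, types and names below
are made up):

// uses KGroupedStream, Aggregator, KTable from org.apache.kafka.streams.kstream
// clickStream and viewStream are hypothetical KStream<String, String> instances
KGroupedStream<String, String> clicks = clickStream.groupByKey();
KGroupedStream<String, String> views = viewStream.groupByKey();

Aggregator<String, String, Long> countAgg = (key, value, agg) -> agg + 1;

KTable<String, Long> activity = clicks
    .cogroup(countAgg)            // first co-grouped stream
    .cogroup(views, countAgg)     // add further streams with their own aggregators
    .aggregate(() -> 0L);         // one result table instead of chained joins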
best regards
Patrik

On Fri, 6 Dec 2019 at 06:44, Sachin Mittal  wrote:

> I was thinking more of a builder api at DSL level.
> Something like this:
> StreamsBuilder.joineBuilder()
> .join(kstream1)
> .leftJoin(kstream2)
> .leftJoin(kstream3)
> 
> .joiner((k,v1,v2,v3...) -> ...)
> .window()
> .build();
>
>
> So when we need to join multiple streams, it can be done using one
> joiner.
> Internally this can be implemented at the processor level.
>
> So it's not so much about just adding another API call, but something that can make
> the code cleaner and more efficient by using a single joiner.
>
> Thanks
> Sachin
>
>
>
> On Thu, Dec 5, 2019 at 2:22 PM Bruno Cadonna  wrote:
>
> > Hi Sachin,
> >
> > I do not completely understand what you mean with one single
> > operation. Do you mean one call of a method in the DSL or the join is
> > processed on one processor node?
> >
> > If you mean the latter, the joins in the DSL are also not processed on
> > one single processor node.
> >
> > If you mean the former, the DSL does not have a single method call to
> > join multiple streams and it does not necessarily need it to process
> > an n-way join more efficiently, because the DSL is just the way you
> > declare the join. How the join is processed depends on how the
> > topology is built from the DSL code. Having a DSL call specific to an
> > n-way join would merely result in syntactic sugar (which can also make
> > sense).
> >
> > If you have specific requirements that are not fulfilled by the DSL
> > you can use the Processor API to implement your own join.
> >
> > See the following StackOverflow question for more details on joins.
> >
> >
> https://stackoverflow.com/questions/53485632/kafka-streams-implementing-joining-using-the-processor-api
> >
> > Best,
> > Bruno
> >
> > On Thu, Dec 5, 2019 at 7:08 AM Sachin Mittal  wrote:
> > >
> > > Hi,
> > > I have checked the documentation and what I see that we can join two
> > > streams or tables at a given time.
> > >
> > > I have a case where I have multiple streams which I need to join based
> on
> > > common key.
> > >
> > > As of now I am first joining two and the result of that with next and
> so
> > on.
> > >
> > > Is there a way or any case implemented anywhere that joins multiple
> > > streams/tables in a single operation?
> > >
> > > If not then is this something that is pipelined for future releases?
> > > Or does something like this make sense to be part of streams
> > functionality?
> > >
> > > Thanks
> > > Sachin
> >
>


Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Patrik Kleindl
Congratulations John!
Well deserved and thanks for all your help
Best regards 
Patrik

> Am 13.11.2019 um 06:10 schrieb Kamal Chandraprakash 
> :
> 
> Congrats John!
> 
>> On Wed, Nov 13, 2019 at 7:57 AM Dong Lin  wrote:
>> 
>> Congratulations John!
>> 
>>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang  wrote:
>>> 
>>> Hi Everyone,
>>> 
>>> The PMC of Apache Kafka is pleased to announce a new Kafka committer,
>> John
>>> Roesler.
>>> 
>>> John has been contributing to Apache Kafka since early 2018. His main
>>> contributions are primarily around Kafka Streams, but he has also helped
>>> improve our test coverage beyond Streams. Besides his own code
>>> contributions, John has also actively participated in community
>> discussions
>>> and reviews including several other contributors' big proposals like
>>> foreign-key join in Streams (KIP-213). He has also been writing,
>> presenting
>>> and evangelizing Apache Kafka in many venues.
>>> 
>>> Congratulations, John! And look forward to more collaborations with you
>> on
>>> Apache Kafka.
>>> 
>>> 
>>> Guozhang, on behalf of the Apache Kafka PMC
>>> 
>> 


Re: How to start a stream from only new records?

2019-08-13 Thread Patrik Kleindl
Hi

Our requirement is related: we want our streams application to only process
messages from the last x weeks.
On new deployments this currently requires starting the application first,
stopping it and then resetting the offsets.
I have created https://issues.apache.org/jira/browse/KAFKA-8766 with the
idea to allow providing a custom offset selection mechanism.
No KIP has been filed yet but maybe it helps to aggregate similar cases.
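As a side note, the workaround Matthias describes below (deleting the committed
offsets and relying on auto.offset.reset) can be sketched in the Streams
configuration roughly like this (application id and broker address are
placeholders):

// uses org.apache.kafka.streams.StreamsConfig and org.apache.kafka.clients.consumer.ConsumerConfig
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// only takes effect when no committed offsets exist for the application.id
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");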

best regards

Patrik

On Tue, 13 Aug 2019 at 09:26, Matthias J. Sax  wrote:

> You would need to delete committed offsets for the application.id and
> set `auto.offset.reset="latest" to get the behavior you want.
>
>
> -Matthias
>
> On 8/12/19 1:20 AM, Tim Ward wrote:
> > I believe not, because that only causes the application to start reading
> from latest when there is no recorded offset at application start, no?
> >
> > What I need is to be able to specify, by topic, that when the
> application starts it doesn't want to see anything other than new data,
> regardless  of what offset it committed last time it ran.
> >
> > Tim Ward
> >
> > -Original Message-
> > From: Boyang Chen 
> > Sent: 09 August 2019 17:23
> > To: users@kafka.apache.org
> > Subject: Re: How to start a stream from only new records?
> >
> > Hey Tim,
> >
> > if you are talking about avoiding re-processing data and starting consumption
> > from latest, you could set your `auto.offset.reset` policy to latest.
> >
> > Let me know if this answers your question.
> >
> > On Fri, Aug 9, 2019 at 7:09 AM Tim Ward 
> wrote:
> >
> >> With a real time application, nobody is interested in old data, and in
> >> particular they're not interested in paying to spend time processing it
> >> only to throw it away, thereby delaying up to date data.
> >>
> >> How do I tell StreamsBuilder.stream() to read only new data from its
> >> topic, and not process any backlog?
> >>
> >> Tim Ward
> >>
>
>


Re: Kafka Streams - unbounded memory growth - stateful processing (rocksdb)

2019-07-16 Thread Patrik Kleindl
Hello Ashok

Adding to what Sophie wrote, if you use a custom RocksDBConfigSetter then
override the BlockBasedTableConfig like following and call
options.setTableFormatConfig(tableConfig)
at the end.

BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
options.tableFormatConfig();
tableConfig.setBlockCacheSize(100*1024*1024L);
tableConfig.setBlockSize(8*1024L);
options.setTableFormatConfig(tableConfig);
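Putting it together, a minimal sketch of a complete config setter (class name
is made up; wire it in via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG):

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // reuse the table config Streams already created instead of replacing it
        final BlockBasedTableConfig tableConfig =
            (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCacheSize(100 * 1024 * 1024L); // 100 MB block cache
        tableConfig.setBlockSize(8 * 1024L);               // 8 KB block size
        options.setTableFormatConfig(tableConfig);
    }
}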

best regards

Patrik


On Tue, 16 Jul 2019 at 23:23, Sophie Blee-Goldman 
wrote:

> Hi Ashok,
>
> 1) RocksDB uses memory in four ways, one of which (iterators) *should* be
> negligible -- however if you have a very large number of them open at any
> one time, they can consume a lot of memory (until they are closed). If you
> are opening many iterators throughout the day, consider closing them more
> frequently than once a day.
>
> 2) The other three users of memory are: index & filter blocks, block cache,
> and memtable (write buffer). You can limit the memory usage of index/filter
> blocks by setting tableConfig.setCacheIndexAndFilterBlocks(true) -- this
> will cause these blocks to be stored in the block cache alongside data
> blocks (and be evicted to disk when full). If you do this I would suggest
> also setting tableConfig.setPinL0FilterAndIndexBlocksInCache(true). You can
> then control the off-heap memory usage by setting the block cache size, and
> write buffer size + write buffer number.
>
> 3) By disabling the RocksDBConfigSetter, you aren't disabling the RocksDB
> cache (or other users of memory), you are reverting to the defaults set by
> Streams (not sure if that's what you expect or not) -- Rocks will continue
> to use off-heap memory
>
> 4) RocksDB objects are backed by C++ objects, so you need to actually close
> some objects you construct to free the memory. Since you construct a
> BloomFilter in your config setter but never close it, you are leaking
> memory. Unfortunately we did not have a RocksDBConfigSetter#close method in
> 1.0.0, but there is one as of 2.3 -- you should either remove the
> BloomFilter from your config setter or consider upgrading to 2.3 (or 2.2,
> which already uses a BloomFilter -- you can just get/update the existing
> BlockBasedTableConfig instead of creating a new one to utilize the
> BloomFilter)
>
> 5) The settings from the config setter (or Streams defaults) are actually
> per rocksdb instance, not per Streams instance. If you have a very large
> number of stores, you may hit OOM even with the relatively conservative
> defaults Streams uses. If you have a large number of stores in your
> subtopology, or a large number of partitions all being read by the same
> instance, the total off-heap memory will be quite large. (If you're able to
> upgrade to 2.3, you can actually use the config setter to limit the total
> memory across all stores rather than on a per-store basis)
>
> If you don't have a large number of stores on an instance, don't open a
> large number of iterators at a time, and still hit OOM over 100GB even with
> the default rocks configs, there may be a memory leak. But I would first
> try setting the configs suggested in 2) above, with a smaller write
> buffer/block cache size (and no bloom filter)
>
> Cheers,
> Sophie
>
> On Tue, Jul 16, 2019 at 12:21 PM Jayaraman, AshokKumar (CCI-Atlanta-CON) <
> ashokkumar.jayara...@cox.com> wrote:
>
> > Hi,
> >
> > In our streaming instance, the internal caching has been disabled and
> > RocksDB caching has been enabled, with the override as shown below.
> > Although the heap is restricted to 36GB, the memory utilization is going
> > over 100GB in a week and eventually runs out of memory.  As part of the
> > profiling, we have confirmed that the garbage collection process is
> within
> > the within the limit 36GB (on-heap).  However, the additional memory
> > utilization is not appearing within the profiling and is the one we
> > suspect, growing unbounded (off-heap).
> >
> > We have also tried enabling the streams caching (5GB) and disabling the
> > RocksDB config setter (commented as below).  However, we are still seeing
> > a similar behaviour where the memory grows unbounded over time.
>  We
> > process 20 million records each 20 minutes (a message size - 1KB) on an
> > average.  Can you please review and advise what could cause this
> behavior?
> > We have ensured that the iterators are closed (which happens once a day).
> >
> > //streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> > RocksDBOverride.class)
> >
> > Kafka Broker / Kafka Stream version: 1.0.0
> > Rocks DB: 5.7.3
> >
> > Command:
> > java -Xms12g -Xmx36g -XX:MetaspaceSize=576m -XX:+UseG1GC
> > -XX:ParallelGCThreads=8 -XX:MaxGCPauseMillis=80
> > -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
> > -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -cp
> > /scripts/device_metrics.jar:/libs/kafka/*
> > -Dlog4j.configuration=file:/cfg/device_metrics_log4j.properties
> > org.ssd.devicemetrics /cfg

Re: Streams reprocessing whole topic when deployed but not locally

2019-07-10 Thread Patrik Kleindl
Hi
Regarding the I/O: RocksDB has something called write amplification, which
means data is written to multiple levels internally to enable better read
performance, at the cost of additional storage and I/O.
This is also the reason the stores can get larger than the topics themselves.
This can be modified by RocksDB settings, the simplest of them being 
compression.
Setting this to lz4 has helped us quite a lot.
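For reference, inside a RocksDBConfigSetter the compression switch is roughly a
one-liner (a sketch, using the org.rocksdb.CompressionType enum):

// inside RocksDBConfigSetter#setConfig(storeName, options, configs)
options.setCompressionType(org.rocksdb.CompressionType.LZ4_COMPRESSION);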

Regarding compacted topics: compaction does not happen immediately and has a
rather high threshold by default, so it can take a while.
This means your application might have to restore a lot more messages than you
assume.

Hope that helps
Best regards 
Patrik 

> Am 10.07.2019 um 14:22 schrieb Bruno Cadonna :
> 
> Hi Alessandro,
> 
>> - how do I specify the retention period of the data? Just by setting the
>> max retention time for the changelog topic?
> 
> For window and session stores, you can set retention time on a local
> state store by using Materialized.withRetention(...). Consult the
> javadocs for details. If the changelog topic on the broker does not
> yet exist, the retention time of the changelog topic will have the
> same retention time as the local store.
> 
> Additionally, you could try to set the retention time in the streams
> configuration by using `StreamsConfig.topicPrefix("retention.ms")`.
> 
>> - wouldn't be possible, for example for my LastValueStore to compact the
>> changelog and keep only the last value stored for each key? Because that's
>> all I would need for my use case
> 
> That is what changelogs do. They just keep the last value written.
> 
>> - why is it also writing that much?
> 
> Could it be that after restore of the state stores from the
> changelogs, your stream app just starts working and there is a peak in
> writes because the app has a large lag initially? Maybe somebody
> else has a better explanation.
> 
> Best,
> Bruno
> 
> On Wed, Jul 10, 2019 at 12:39 AM Alessandro Tagliapietra
>  wrote:
>> 
>> I've just noticed that the store topic created automatically by our streams
>> app have different cleanup.policy.
>> I think that's the main reason I'm seeing that big read/write IO, having a
>> compact policy instead of delete would make the topic much smaller.
>> I'll try that to also see the impact on our storage usage.
>> 
>> --
>> Alessandro Tagliapietra
>> 
>> 
>> On Tue, Jul 9, 2019 at 6:06 AM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>> 
>>> Hi Bruno,
>>> 
>>> Oh I see, I'll try to add a persistent disk where the local stores are.
>>> I've other questions then:
>>> - why is it also writing that much?
>>> - how do I specify the retention period of the data? Just by setting the
>>> max retention time for the changelog topic?
>>> - wouldn't be possible, for example for my LastValueStore to compact the
>>> changelog and keep only the last value stored for each key? Because that's
>>> all I would need for my use case
>>> 
>>> Thank you very much for your help
>>> 
 On Tue, Jul 9, 2019, 4:00 AM Bruno Cadonna  wrote:
 
 Hi Alessandro,
 
 I am not sure I understand your issue completely. If you start your
 streams app in a new container without any existing local state, then
 it is expected that the changelog topics are read from the beginning
 to restore the local state stores. Am I misunderstanding you?
 
 Best,
 Bruno
 
 On Tue, Jul 9, 2019 at 6:42 AM Alessandro Tagliapietra
  wrote:
> 
> I think I'm having again this issue, this time though it only happens on
> some state stores.
> 
> Here you can find the code and the logs
> https://gist.github.com/alex88/1d60f63f0eee9f1568d89d5e1900fffc
> We first seen that our confluent cloud bill went up 10x, then seen that
 our
> streams processor was restarted 12 times (kubernetes pod), checking
> confluent cloud usage it seems that the writes/reads went up from the
 usual
> 1-2 KB/s to 12-20 MB/s during app restarts.
> 
> I've then deployed a new version on a new container (no local
 store/state)
> to see what happened:
> - first, it logs everything up to line 460 of the log file in the gist
> - at this point confluent cloud reports high read usage and the
 consumer
> lag starts to increase, the app is accumulating messages behind
> - after a certain point, writes go up as well
> - when streams app transition to RUNNING state, the final aggregation
> function resumes back to where it stopped (without reprocessing old
 data)
> - consumer lag goes back to 0
> 
> What makes me think it's re-reading everything are these lines:
> 
> Resetting offset for partition
> myapp-id-KSTREAM-AGGREGATE-STATE-STORE-04-changelog-2 to offset
> 20202847
> Restoring task 0_2's state store
 KSTREAM-AGGREGATE-STATE-STORE-04
> from beginning of the changelog
> myapp-id-KSTREAM-AGGREGATE-STATE-STORE-04-chan

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-03 Thread Patrik Kleindl
Hi
Try to set it really low like Sophie suggested.
You can verify if the settings take effect by checking the files in the rocksdb 
directories, I think it should be somewhere in OPTIONS or LOG
br, Patrik 

> Am 03.07.2019 um 09:37 schrieb Thameem Ansari :
> 
> Tried setting the open files to 100 and 50 but the results are the same. I
> checked the total open files while the streaming application was busy running;
> just before getting the “too many open files” message it was around 41756,
> which is the same as what we got when we set it to -1.
> 
> VisualVM shows that there is no abnormality with the threads / memory or 
> heap. 
> 
> Thanks
> Thameem
> 
>> On Jul 3, 2019, at 11:50 AM, Sophie Blee-Goldman  wrote:
>> 
>> How sure are you that the open file count never goes beyond 50K? Are those
>> numbers just from a snapshot after it crashed? It's possible rocks is
>> creating a large number of files just for a short period of time (maybe
>> while compacting) that causes the open file count to spike and go back down.
>> 
>> For things to try, you should set the rocks config max.open.files to
>> something less than infinity...if your OS limit is 1 million and you have
>> (rounding up) ~5k rocks instances, set this to 1 million / 5k = 200. If you
>> set a lower limit and still hit this error, we can go from there
>> 
>> On Tue, Jul 2, 2019 at 11:10 PM emailtokir...@gmail.com <
>> emailtokir...@gmail.com> wrote:
>> 
>>> 
>>> 
>>>> On 2019/07/03 05:46:45, Sophie Blee-Goldman  wrote:
>>>> It sounds like rocksdb *is* honoring your configs -- the max.open.files
>>>> config is an internal restriction that tells rocksdb how many open files
>>> it
>>>> is allowed to have, so if that's set to -1 (infinite) it won't ever try
>>> to
>>>> limit its open files and you may hit the OS limit.
>>>> 
>>>> Think of it this way: if you have 100 rocksdb instances and a OS limit of
>>>> 500, you should set max.open.files to 5  to avoid hitting this limit
>>>> (assuming there are no other open files on the system, in reality you'd
>>>> want some extra room there)
>>>> 
>>>> On Tue, Jul 2, 2019 at 7:53 PM emailtokir...@gmail.com <
>>>> emailtokir...@gmail.com> wrote:
>>>> 
>>>>> 
>>>>> 
>>>>>> On 2019/06/28 23:29:16, John Roesler  wrote:
>>>>>> Hey all,
>>>>>> 
>>>>>> If you want to figure it out theoretically, if you print out the
>>>>>> topology description, you'll have some number of state stores listed
>>>>>> in there. The number of Rocks instances should just be
>>>>>> (#global_state_stores +
>>>>>> sum(#partitions_of_topic_per_local_state_store)) . The number of
>>>>>> stream threads isn't relevant here.
>>>>>> 
>>>>>> You can also figure it out empirically: the first level of
>>>>>> subdirectories in the state dir are Tasks, and then within that, the
>>>>>> next level is Stores. You should see the store directory names match
>>>>>> up with the stores listed in the topology description. The number of
>>>>>> Store directories is exactly the number of RocksDB instances you
>>> have.
>>>>>> 
>>>>>> There are also metrics corresponding to each of the state stores, so
>>>>>> you can compute it from what you find in the metrics.
>>>>>> 
>>>>>> Hope that helps,
>>>>>> -john
>>>>>> 
>>>>>> On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl 
>>>>> wrote:
>>>>>>> 
>>>>>>> Hi Kiran
>>>>>>> Without much research my guess would be "num_stream_threads *
>>>>>>> (#global_state_stores +
>>>>> sum(#partitions_of_topic_per_local_state_store))"
>>>>>>> So 10 stores (regardless if explicitly defined or implicitely
>>> because
>>>>> of
>>>>>>> some stateful operation) with 10 partitions each should result in
>>> 100
>>>>>>> Rocksdb instances if you are running at the default of
>>>>> num_stream_threads=1.
>>>>>>> 
>>>>>>> As I wrote before, start with 100.
>>>>>>> If the error persists, half the number, if not, double it ;-)
>>&g

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-06-27 Thread Patrik Kleindl
Hi Kiran
Without much research my guess would be "num_stream_threads *
(#global_state_stores + sum(#partitions_of_topic_per_local_state_store))"
So 10 stores (regardless of whether they are explicitly defined or implicitly
created by some stateful operation) with 10 partitions each should result in 100
RocksDB instances if you are running at the default of num_stream_threads=1.

As I wrote before, start with 100.
If the error persists, halve the number; if not, double it ;-) Repeat as
needed.

If you reach the single-digit-range and the error still shows up, start
searching for any iterators over a store you might not have closed.

br, Patrik

On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <
emailtokir...@gmail.com> wrote:

>
>
> On 2019/06/27 09:02:39, Patrik Kleindl  wrote:
> > Hello Kiran
> >
> > First, the value for maxOpenFiles is per RocksDB instance, and the number
> > of those can get high if you have a lot of topic partitions etc.
> > Check the directory (state dir) to see how many there are.
> > Start with a low value (100) and see if that has some effect.
> >
> > Second, because I just found out, you should use
> > BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
> > options.tableFormatConfig();
> > tableConfig.setBlockCacheSize(100*1024*1024L);
> > tableConfig.setBlockSize(8*1024L);
> > instead of creating a new object to prevent accidently messing up
> > references.
> >
> > Hope that helps
> > best regards
> > Patrik
> >
> > On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <
> > emailtokir...@gmail.com> wrote:
> >
> > >
> > >
> > > On 2019/06/26 21:58:02, Patrik Kleindl  wrote:
> > > > Hi Kiran
> > > > You can use the RocksDBConfigSetter and pass
> > > >
> > > > options.setMaxOpenFiles(100);
> > > >
> > > > to all RocksDBs for the Streams application which limits how many are
> > > > kept open at the same time.
> > > >
> > > > best regards
> > > >
> > > > Patrik
> > > >
> > > >
> > > > On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com <
> > > > emailtokir...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > We are using Kafka streams DSL APIs for doing some counter
> aggregations
> > > > > (running on OpenJDK 11.0.2). Our topology has some 400 sub
> topologies
> > > & we
> > > > > are using 8 partitions in source topic. When we start pumping more
> > > load, we
> > > > > start getting RockDBException stating "too many open files".
> > > > >
> > > > > Here are the stack trace samples:
> > > > >
> > > > >
> > >
> --
> > > > > Caused by: org.rocksdb.RocksDBException: while open a file for
> lock:
> > > > > PPP.151200/LOCK: Too many open files
> > > > > at org.rocksdb.RocksDB.open(Native Method)
> > > > > at org.rocksdb.RocksDB.open(RocksDB.java:235)
> > > > > at
> > > > >
> > >
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:156)
> > > > > ... 24 common frames omitted
> > > > >
> > > > >
> > > > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException:
> > > Error
> > > > > while executing flush from store XXX.151200
> > > > > at
> > > > >
> > >
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:397)
> > > > > at
> > > > >
> > >
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:388)
> > > > > at
> > > > >
> > >
> org.apache.kafka.streams.state.internals.Segments.flush(Segments.java:163)
> > > > > at
> > > > >
> > >
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.flush(RocksDBSegmentedBytesStore.java:178)
> > > > > at
> > > > >
> > >
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:85)
> > > > > at
> > > > >
> > >
> org.a

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-06-27 Thread Patrik Kleindl
Hello Kiran

First, the value for maxOpenFiles is per RocksDB instance, and the number
of those can get high if you have a lot of topic partitions etc.
Check the directory (state dir) to see how many there are.
Start with a low value (100) and see if that has some effect.

Second, because I just found out, you should use
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
options.tableFormatConfig();
tableConfig.setBlockCacheSize(100*1024*1024L);
tableConfig.setBlockSize(8*1024L);
instead of creating a new object to prevent accidentally messing up
references.

Hope that helps
best regards
Patrik

On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <
emailtokir...@gmail.com> wrote:

>
>
> On 2019/06/26 21:58:02, Patrik Kleindl  wrote:
> > Hi Kiran
> > You can use the RocksDBConfigSetter and pass
> >
> > options.setMaxOpenFiles(100);
> >
> > to all RocksDBs for the Streams application which limits how many are
> > kept open at the same time.
> >
> > best regards
> >
> > Patrik
> >
> >
> > On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com <
> > emailtokir...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > We are using Kafka streams DSL APIs for doing some counter aggregations
> > > (running on OpenJDK 11.0.2). Our topology has some 400 sub topologies
> & we
> > > are using 8 partitions in source topic. When we start pumping more
> load, we
> > > start getting RockDBException stating "too many open files".
> > >
> > > Here are the stack trace samples:
> > >
> > >
> --
> > > Caused by: org.rocksdb.RocksDBException: while open a file for lock:
> > > PPP.151200/LOCK: Too many open files
> > > at org.rocksdb.RocksDB.open(Native Method)
> > > at org.rocksdb.RocksDB.open(RocksDB.java:235)
> > > at
> > >
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:156)
> > > ... 24 common frames omitted
> > >
> > >
> > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException:
> Error
> > > while executing flush from store XXX.151200
> > > at
> > >
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:397)
> > > at
> > >
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:388)
> > > at
> > >
> org.apache.kafka.streams.state.internals.Segments.flush(Segments.java:163)
> > > at
> > >
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.flush(RocksDBSegmentedBytesStore.java:178)
> > > at
> > >
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:85)
> > > at
> > >
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:85)
> > > at
> > >
> org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130)
> > > at
> > >
> org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:177)
> > > at
> > >
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217)
> > > ... 10 more
> > > Caused by: org.rocksdb.RocksDBException: While open a file for
> appending:
> > > Y.151200/07.dbtmp: Too many open files
> > > at org.rocksdb.RocksDB.flush(Native Method)
> > > at org.rocksdb.RocksDB.flush(RocksDB.java:3401)
> > > at org.rocksdb.RocksDB.flush(RocksDB.java:3361)
> > > at
> > >
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:395)
> > >
> > >
> --
> > >
> > > We tried increasing the open files limit at OS level to some decent
> > > number.. but still no luck. Obviously we don't want to have boundless
> open
> > > files..
> > >
> > > We also tried to play with commit interval(kafka.commit.interval.ms)
> and
> > > cache size (kafka.cache.max.bytes.buffering) .. but no luck there
> either.
> > >
> > > KAFKA-3904 talks about it.. but it was resolved long back..
> > >
> > >

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-06-26 Thread Patrik Kleindl
Hi Kiran
You can use the RocksDBConfigSetter and pass

options.setMaxOpenFiles(100);

to all RocksDBs for the Streams application which limits how many are
kept open at the same time.
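A minimal sketch of a config setter doing that (class name is made up; register
it via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG):

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class LimitedOpenFilesConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // upper bound on open files per RocksDB instance
        options.setMaxOpenFiles(100);
    }
}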

best regards

Patrik


On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com <
emailtokir...@gmail.com> wrote:

> Hi,
>
> We are using Kafka streams DSL APIs for doing some counter aggregations
> (running on OpenJDK 11.0.2). Our topology has some 400 sub topologies & we
> are using 8 partitions in source topic. When we start pumping more load, we
> start getting RockDBException stating "too many open files".
>
> Here are the stack trace samples:
>
> --
> Caused by: org.rocksdb.RocksDBException: while open a file for lock:
> PPP.151200/LOCK: Too many open files
> at org.rocksdb.RocksDB.open(Native Method)
> at org.rocksdb.RocksDB.open(RocksDB.java:235)
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:156)
> ... 24 common frames omitted
>
>
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error
> while executing flush from store XXX.151200
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:397)
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:388)
> at
> org.apache.kafka.streams.state.internals.Segments.flush(Segments.java:163)
> at
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.flush(RocksDBSegmentedBytesStore.java:178)
> at
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:85)
> at
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:85)
> at
> org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130)
> at
> org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:177)
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217)
> ... 10 more
> Caused by: org.rocksdb.RocksDBException: While open a file for appending:
> Y.151200/07.dbtmp: Too many open files
> at org.rocksdb.RocksDB.flush(Native Method)
> at org.rocksdb.RocksDB.flush(RocksDB.java:3401)
> at org.rocksdb.RocksDB.flush(RocksDB.java:3361)
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:395)
>
> --
>
> We tried increasing the open files limit at OS level to some decent
> number.. but still no luck. Obviously we don't want to have boundless open
> files..
>
> We also tried to play with commit interval(kafka.commit.interval.ms) and
> cache size (kafka.cache.max.bytes.buffering) .. but no luck there either.
>
> KAFKA-3904 talks about it.. but it was resolved long back..
>
> Any other config tuning we have to do?
>
> Appreciate any help in this regard!
>
> Thanks,
> Kiran
>
>


Re: Offsets of deleted consumer groups do not get deleted correctly

2019-04-01 Thread Patrik Kleindl
Hi Claudia
Just a side note: there is a combined policy, "compact,delete", which
deletes messages older than retention.ms and compacts newer ones, if I
remember correctly.
It still doesn't really seem to be covered in the docs:
https://kafka.apache.org/documentation/#topicconfigs
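For example, the combined policy can be set with the AdminClient roughly like
this (topic name, broker address and retention value are placeholders; note
that alterConfigs replaces the topic's existing dynamic config, newer clients
offer incrementalAlterConfigs instead):

// uses org.apache.kafka.clients.admin.* and org.apache.kafka.common.config.*
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
try (AdminClient admin = AdminClient.create(props)) {
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // placeholder
    Config config = new Config(Arrays.asList(
            new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete"),
            new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "604800000")));  // 7 days, placeholder
    admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
}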
best regards
Patrik

On Mon, 1 Apr 2019 at 10:02, Claudia Wegmann  wrote:

> Hi,
>
> thanks for your reply.
>
> Although the groups were deleted months ago, there are still valid values
> there. So I guess deleting the group did not produce the tombstone record
> correctly. Your explanation made it clearer for me. I know I should keep the
> cleanup policy compact in general. I decided to switch the policy just
> long enough so that these old records get deleted. I guess I could also have
> produced the tombstone records myself to be on the safe side.
>
> Thanks,
> Claudia
>
> -Ursprüngliche Nachricht-
> Von: Vincent Maurin 
> Gesendet: Freitag, 29. März 2019 15:24
> An: users@kafka.apache.org
> Betreff: Re: Offsets of deleted consumer groups do not get deleted
> correctly
>
> Hi,
>
> You should keep the policy compact for the topic __consumer_offsets This
> topic stores for each group/topic/partition the offset consumed. As only
> the latest message for a group/topic/partition is relevant, the policy
> compact will keep only this message. When you delete a group, it will
> actually produce a tombstone to this topic (i.e. a NULL body). Then when the log
> compaction is running, it will definitively remove the tombstone.
> But to have an effective delete of the tombstones, keep in mind :
> * compaction runs only on rolled out segments
> * deletion of tombstone only occurs if the delete.retention.ms delay is
> expired
>
> Best regards
>
> On Fri, Mar 29, 2019 at 2:16 PM Claudia Wegmann 
> wrote:
>
> > Hey there,
> >
> > I've got the problem that the "__consumer_offsets" topic grows pretty
> > big over time. After some digging, I found offsets for consumer groups
> > that were deleted a long time ago still being present in the topic.
> > Many of them are offsets for console consumers, that have been deleted
> > with "kafka-consumer-groups.sh --delete --group ...".
> >
> > As far as I understand log cleaning, those offsets should have been
> > deleted a long time ago, because these consumers are no longer active.
> > When I query "kafka-consumer-groups.sh --bootstrap-server ...  --list"
> > I don't see those consumers either.
> >
> > Is there a bug in "kafka-consumer-groups.sh --delete --group ..." that
> > let's kafka hang on to those consumer groups?
> >
> > How can I get the log cleaner to delete these old offsets? Is there
> > another way than setting "cleanup.policy" to "delete"?
> >
> > Thanks for our help!
> >
> > Best,
> > Claudia
> >
>


Re: KafkaStreams backoff for non-existing topic

2019-03-25 Thread Patrik Kleindl
Hi Guozhang
Just a small question, why can't this be checked when trying to instantiate
KafkaStreams?
The Topology should know all topics and the existence of the topics could
be verified with the AdminClient.
This would allow to fail fast similar to when the state directory is not
available.
Or am I missing something?
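For illustration, a hypothetical pre-flight check before calling new
KafkaStreams(...) could look like this (bootstrapServers and requiredTopics are
assumed variables, the latter collected from the topology description):

// uses org.apache.kafka.clients.admin.AdminClient and AdminClientConfig
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient admin = AdminClient.create(props)) {
    Set<String> existing = admin.listTopics().names().get();
    for (String topic : requiredTopics) {
        if (!existing.contains(topic)) {
            throw new IllegalStateException("Missing source topic: " + topic); // fail fast
        }
    }
}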
best regards
Patrik

On Mon, 25 Mar 2019 at 23:15, Guozhang Wang  wrote:

> Hello Murilo,
>
> Just to give some more background to John's message and KAFKA-7970 here.
> The main reason of trickiness is around the scenario of "topics being
> partially available", e.g. say your application is joining to topics A and
> B, while topicA exists but topicB does not (it is surprisingly common due
> to either human errors, or topic creation race conditions, etc). Then you
> have a few options at hand:
>
> 1. Just start the app normally, which will only process data from topicA
> and none from topicB. When topicB is created later the app will
> auto-rebalance to get the data (this is guaranteed by Streams itself).
> However before this is true the join operator would see no data from topicB
> to join to while proceeding. This behavior is actually the case before
> Kafka version 2.0 and many users complained about it.
>
> 2. Does not start the app at all, notify the users that some topics are
> missing and stop. This is what we changed in KAFKA-5037.
>
> 3. We can also, let app to stay up and running, but does not process any
> data at all until all topics subscribed become available, etc etc.
>
>
> Now depending on user's motivation cases and preferences, they may prefer
> any of these options. The reason we chose to do 2) is to give users some
> control upon the event and do their wrapping logic on top of it (e.g. like
> John suggested). Hope this helps some clarifications for you.
>
>
> Guozhang
>
>
> On Mon, Mar 25, 2019 at 12:23 PM John Roesler  wrote:
>
> > Hi, Murlio,
> >
> > I found https://issues.apache.org/jira/browse/KAFKA-7970, which sounds
> > like
> > the answer is currently "yes". Unfortunately, it is still tricky to
> handle
> > this case, although the situation may improve soon.
> >
> > In the mean time, you can try to work around it with the StateListener.
> > When Streams has a successful start-up, you'll see it transition from
> > REBALANCING to RUNNING, so if you see it transition to PENDING_SHUTDOWN,
> > NOT_RUNNING, or ERROR before you see "oldState: REBALANCING && newState:
> > RUNNING", you know that Streams did not have a successful startup. It
> > sounds like you can't determine programmatically *why* this happened, but
> > you can log a warning or error and then create a new the KafkaStreams
> > object and try starting it again.
> >
> > I hope this helps, and feel free to comment on that ticket to add your
> own
> > perspective to the issue!
> >
> > Thanks,
> > -John
> >
> > On Fri, Mar 22, 2019 at 3:25 PM Murilo Tavares 
> > wrote:
> >
> > > Hi
> > > After some research, I've come to a few discussions, and they all tell
> me
> > > that Kafka Streams require the topics to be created before starting the
> > > application.
> > > Nevertheless, I'd like my application to keep retrying if a topic does
> > not
> > > exist.
> > > I've seen this thread:
> > > https://groups.google.com/forum/#!topic/confluent-platform/nmfrnAKCM3c
> ,
> > > which is pretty old, and I'd like to know if it's still hard to catch
> > that
> > > Exception in my app.
> > >
> > > Thanks
> > > Murilo
> > >
> >
>
>
> --
> -- Guozhang
>


Re: No checkpoint found

2019-03-20 Thread Patrik Kleindl
Hi Claudia
Probably https://issues.apache.org/jira/browse/KAFKA-5998, welcome to the
club ;-)
best regards
Patrik

On Wed, 20 Mar 2019 at 10:25, Claudia Wegmann  wrote:

> Hi kafka users,
>
> since upgrading to kafka 2.1.1 version I get the following log message at
> every startup of streaming services:
> "No checkpoint found for task 0_16 state store TestStore changelog
> test-service-TestStore-changelog-16 with EOS turned on. Reinitializing the
> task and restore its state from the beginning".
>
> Is there any configuration that I have to set so that checkpoints are
> created?
>
> Thanks in advance.
>
> Best,
> Claudi
>


Re: Operationalizing Zookeeper and common gotchas

2019-03-18 Thread Patrik Kleindl
Hi Eno
Thanks too, this is indeed helpful
Best regards 
Patrik 

> Am 18.03.2019 um 18:16 schrieb Eno Thereska :
> 
> Hi folks,
> 
> The team here has come up with a couple of clarifying tips for
> operationalizing Zookeeper for Kafka that we found missing from the
> official documentation, and passed them along to share. If you find them
> useful, I'm thinking of putting on
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ. Meanwhile any
> feedback is appreciated.
> 
> ---
> Operationalizing Zookeeper FAQ
> 
> The discussion below uses a 3-instance Zookeeper cluster as an example. The
> findings apply to a larger cluster as well, but you’ll need to adjust the
> numbers.
> 
> - Does it make sense to have a config with only 2 Zookeeper instances?
> I.e., in zookeeper.properties file have two entries for server 1 and server
> 2 only. A: No. A setup with 2 Zookeeper instances is not fault tolerant to
> even 1 failure. If one of the Zookeeper instances fails, the remaining one
> will not be functional since there is no quorum majority (1 out of 2 is not
> majority). If you do a “stat” command on that remaining instance you’ll see
> the output being “This ZooKeeper instance is not currently serving
> requests”.
> 
> - What if you end up with only 2 running Zookeeper instances, e.g., you
> started with 3 but one failed? Isn’t that the same as the case above? A: No
> it’s not the same scenario. First of all, the 3- instance setup did
> tolerate 1 instance down. The 2 remaining Zookeeper instances will continue
> to function because the quorum majority (2 out of 3) is there.
> 
> - I had a 3 Zookeeper instance setup and one instance just failed. How
> should I recover? A: Restart the failed instance with the same
> configuration it had before (i.e., same “myid” ID file, and same IP
> address). It is not important to recover the data volume of the failed
> instance, but it is a bonus if you do so. Once the instance comes up, it
> will sync with the other 2 Zookeeper instances and get all the data.
> 
> - I had a 3 Zookeeper instance setup and two instances failed. How should I
> recover? Is my Zookeeper cluster even running at that point? A: First of
> all, ZooKeeper is now unavailable and the remaining instance will show
> “This ZooKeeper instance is not currently serving requests” if probed.
> Second, you should make sure this situation is extremely rare. It should be
> possible to recover the first failed instance quickly before the second
> instance fails. Third, bring up the two failed instances one by one without
> changing anything in their config. Similarly to the case above, it is not
> important to recover the data volume of the failed instance, but it is a
> bonus if you do so. Once the instance comes up, it will sync with the other
> 1 ZooKeeper instance and get all the data.
> 
> - I had a 3 Zookeeper instance setup and two instances failed. I can’t
> recover the failed instances for whatever reason. What should I do? A: You
> will have to restart the remaining healthy ZooKeeper in “standalone” mode
> and restart all the brokers and point them to this standalone zookeeper
> (instead of all 3 ZooKeepers).
> 
> - The Zookeeper cluster is unavailable (for any of the reasons mentioned
> above, e.g., no quorum, all instances have failed). What is the impact on
> Kafka applications producing/consuming? What is the impact on admin tools to
> manage topics and the cluster? What is the impact on brokers? A: Applications will be able to
> continue producing and consuming, at least for a while. This is true if the
> ZooKeeper cluster is temporarily unavailable but eventually becomes
> available (after a few mins). On the other hand, if the ZooKeeper cluster
> is permanently unavailable, then applications will slowly start to see
> problems with producing/consuming especially if some brokers fail, because
> the partition leaders will not be distributed to other brokers. So taking
> one extreme, if the ZooKeeper cluster is down for a month, it is very
> likely that applications will get produce/consume errors. Admin tools
> (e.g., that create topics, set ACLs or change configs) will not work.
> Brokers will not be impacted from Zookeeper being unavailable. They will
> periodically try to reconnect to the ZooKeeper cluster. If you take care to
> use the same IP address for a recovered Zookeeper instance as it had before
> it failed, brokers will not need to be restarted.
> --
> 
> Cheers,
> Eno


Re: Minimizing global store restoration time

2019-03-01 Thread Patrik Kleindl
Hi Guozhang

I have created https://issues.apache.org/jira/browse/KAFKA-8023 and by
accident found https://issues.apache.org/jira/browse/KAFKA-6721 which was
what I was looking for at the beginning.
Does this need a KIP?
I can maybe help with the writeup but I am not sure I should help with the
code ;-)

6721 might indirectly cover point 1) from above. Currently (if I
understand Taylor correctly) it seems a bit inconsistent that normal
tables have separate configs for processing and restore while in the global
case both are shared, although that is understandably just a result of using
only one consumer for the global state stores.

best regards
Patrik

On Thu, 28 Feb 2019 at 23:46, Guozhang Wang  wrote:

> Hi Taylor,
>
> 1) Yes we do allow users to have separate config values for global
> consumers / restore consumers via StreamsConfig#restoreConsumerPrefix and
> StreamsConfig#globalConsumerPrefix, as Patrik pointed out.
>
> 2) I think I agree with you that for global consumer, it is worth while to
> allow one than one update thread (for restore consumer though we have the
> same stream thread for it by design, so that is much harder to
> re-architecture). Would you mind creating a JIRA ticket for it so we do not
> forget about this potential improvement?
>
> Guozhang
>
> On Wed, Feb 27, 2019 at 2:02 PM Taylor P  wrote:
>
> > Hi Guozhang, Patrik,
> >
> > Yes, the global consumer setting is what needs to be changed for these
> > settings. The restore consumer configs aren't used at all since a
> separate
> > restore consumer is not initialized for global state store restoration -
> > the global consumer is used. I think it would be an improvement to allow
> > for using different configs for the global consumer between restoration
> and
> > regular processing.
> >
> > I previously tried tweaking fetch.max.bytes and receive.buffer.bytes, but
> > if I recall correctly, I was still capped around 100K records/sec. I will
> > try tweaking them again when I get time.
> >
> > Is there anything major that would prevent parallelizing the restoration
> of
> > each partition of the global state store? It looks like that would be a
> > decent chunk of work to refactor, but I think that would have the biggest
> > impact in reducing global state restoration times for the scenario where
> > the keyspace of the global state store is very large.
> >
> > Taylor
> >
> >
> > On Thu, Feb 21, 2019 at 6:31 AM Patrik Kleindl 
> wrote:
> >
> > > Hello Guozhang,
> > >
> > > thanks, that might help us too.
> > > Just to confirm, this depends on KTable/GlobalKTable usage, right?
> > > I did a test with
> > >
> > >
> > >
> >
> streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG),
> > > 65536);
> > >
> > >
> >
> streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
> > > 52428800);
> > >
> > >
> >
> streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG),
> > > 65536);
> > >
> > >
> >
> streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
> > > 52428800);
> > >
> > > and only if I change the globalConsumerPrefix values I see a change for
> > the
> > > GlobalKTable restoration.
> > >
> > > br, Patrik
> > >
> > > PS: Logs from the test, seems to work fine and get faster/slower
> > depending
> > > on the change:
> > >
> > > Default Values:
> > >
> > > 743
> > >
> > >
> >
> [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
> > > INFO org.apache.kafka.clients.Metadata - Cluster ID:
> > s2467KdmTlKV5b2YGe831g
> > > 936
> > >
> > >
> >
> [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
> > > INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
> > >
> > >
> >
> clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer,
> > > groupId=] Resetting offset for partition topic-0 to offset 2480157.
> > > 13378
> > >
> > >
> >
> [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
> > > INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
> > >
> > >
> >
> clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global

Re: [VOTE] 2.2.0 RC0

2019-02-25 Thread Patrik Kleindl
Hi Matthias
Minor issue: if the locale is not English (German in my case) then
org.apache.kafka.common.utils.UtilsTest > testFormatBytes FAILED
org.junit.ComparisonFailure: expected:<1[.]1 MB> but was:<1[,]1 MB>
at org.junit.Assert.assertEquals(Assert.java:115)
at org.junit.Assert.assertEquals(Assert.java:144)
at
org.apache.kafka.common.utils.UtilsTest.testFormatBytes(UtilsTest.java:106)
fails.
Possible fix in clients/src/main/java/org/apache/kafka/common/utils/Utils.java:

DecimalFormat TWO_DIGIT_FORMAT = ((DecimalFormat)
NumberFormat.getInstance(Locale.ENGLISH));
TWO_DIGIT_FORMAT.applyPattern("0.##");

or fix the expectation of the test based on the locale.
If an English locale is required for the build, I might have missed that.

br, Patrik

On Sun, 24 Feb 2019 at 00:57, Matthias J. Sax  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the first candidate for the release of Apache Kafka 2.2.0.
>
> This is a minor release with the follow highlight:
>
>  - Added SSL support for custom principle name
>  - Allow SASL connections to periodically re-authenticate
>  - Improved consumer group management
>- default group.id is `null` instead of empty string
>  - Add --under-min-isr option to describe topics command
>  - Allow clients to suppress auto-topic-creation
>  - API improvement
>- Producer: introduce close(Duration)
>- AdminClient: introduce close(Duration)
>- Kafka Streams: new flatTransform() operator in Streams DSL
>- KafkaStreams (and other classed) now implement AutoClosable to
> support try-with-resource
>- New Serdes and default method implementations
>  - Kafka Streams exposed internal client.id via ThreadMetadata
>  - Metric improvements:  All `-min`, `-avg` and `-max` metrics will now
> output `NaN` as default value
>
>
> Release notes for the 2.2.0 release:
> http://home.apache.org/~mjsax/kafka-2.2.0-rc0/RELEASE_NOTES.html
>
> *** Please download, test and vote by Friday, March 1, 9am PST.
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~mjsax/kafka-2.2.0-rc0/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> http://home.apache.org/~mjsax/kafka-2.2.0-rc0/javadoc/
>
> * Tag to be voted upon (off 2.2 branch) is the 2.2.0 tag:
> https://github.com/apache/kafka/releases/tag/2.2.0-rc0
>
> * Documentation:
> https://kafka.apache.org/22/documentation.html
>
> * Protocol:
> https://kafka.apache.org/22/protocol.html
>
> * Successful Jenkins builds for the 2.2 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-2.2-jdk8/31/
>
> * System tests:
> https://jenkins.confluent.io/job/system-test-kafka/job/2.2/
>
>
>
>
> Thanks,
>
> -Matthias
>
>


Re: Minimizing global store restoration time

2019-02-21 Thread Patrik Kleindl
 org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer,
groupId=] Resetting offset for partition topic-10 to offset 2430617.

On Wed, 20 Feb 2019 at 19:16, Guozhang Wang  wrote:

> Hello Taylor,
>
> Sorry for the late reply! And thanks for the updated information.
>
> I'd recommend overriding some consumer configs via `StreamsConfig` (you can
> use the StreamsConfig#restoreConsumerPrefix for that) for the following
> props:
>
> 1) increase RECEIVE_BUFFER_CONFIG (64K may cause poll to return early than
> necessary)
> 2) increase FETCH_MAX_BYTES_CONFIG if you'd expect the total size of your
> maximum 2000 records to possibly exceed it (default is 50Mb).
>
>
> Guozhang
>
>
>
> On Fri, Feb 8, 2019 at 12:43 AM Patrik Kleindl  wrote:
>
> > Hi Taylor
> > You are right, the parallel processing is not mentioned in this issue, if
> > I remember correctly it was in the thread that lead to it as a
> possibility
> > when changing to the restoration listeners.
> > Best regards
> > Patrik
> >
> > > Am 07.02.2019 um 00:47 schrieb Taylor P :
> > >
> > > Hi Patrik,
> > >
> > > I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380
> will
> > > resolve this issue since our application is dependent on the global
> store
> > > being fully restored before the application can be considered healthy.
> It
> > > does not seem like KAFKA-7380 is aiming to address the nature of global
> > > stores restoring each partition sequentially - it is aiming to change
> the
> > > blocking nature of #start(). Restoring the global store partitions in
> > > parallel would definitely speed things up, though, and admittedly my
> > first
> > > thought when debugging this was "why isn't this restoring each
> partition
> > in
> > > parallel?".
> > >
> > > Changing our streams topology to avoid using a global store for such a
> > > large amount of data would be doable but it does seem like a
> significant
> > > amount of work. I am curious to know if anyone else is storing large
> > > amounts of data in global stores and whether there are any inherent
> > > limitations to the size of global stores.
> > >
> > > Our topic is already using compaction.
> > >
> > > Taylor
> > >
> > >> On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl 
> > wrote:
> > >>
> > >> Hi Taylor
> > >>
> > >> We are facing the same issue, although on a smaller scale.
> > >> The main problem as you found is that the restoration is running
> > >> sequentially, this should be addressed in
> > >> https://issues.apache.org/jira/browse/KAFKA-7380, although there has
> > been
> > >> no progress lately.
> > >>
> > >> On the other hand you could try re-evaluate if your problem can only
> be
> > >> solved with global state stores, in our case (both in streams as well
> as
> > >> for interactive queries) we could solve it with local state stores
> too,
> > >> although only with more changes and more complexity in the topology.
> > >>
> > >> Not sure if it is applicable for your case, but have you looked into
> > >> compression for the topics?
> > >>
> > >> best regards
> > >>
> > >> Patrik
> > >>
> > >>> On Tue, 5 Feb 2019 at 22:37, Taylor P  wrote:
> > >>>
> > >>> Hi,
> > >>>
> > >>> I am having issues with the global store taking a very long time to
> > >> restore
> > >>> during startup of a Kafka Streams 2.0.1 application. The global store
> > is
> > >>> backed by a RocksDB persistent store and is added to the Streams
> > topology
> > >>> in the following manner: https://pastebin.com/raw/VJutDyYe The
> global
> > >>> store
> > >>> topic has approximately 15 million records per partition and 18
> > >> partitions.
> > >>> The following global consumer settings are specified:
> > >>>
> > >>>poll.timeout.ms = 10
> > >>>max.poll.records = 2000
> > >>>max.partition.fetch.bytes = 1048576
> > >>>fetch.max.bytes = 52428800
> > >>>receive.buffer.bytes = 65536
> > >>>
> > >>> I have tried tweaking the settings above on the consumer side, such
> as
> > >>

Re: Accessing Kafka stream's KTable underlying RocksDB memory usage

2019-02-17 Thread Patrik Kleindl
Hi
How many partitions do your topics have?
As far as I understand there is a RocksDB instance for every partition of every KTable,
and this can add up quickly.
Depending on how many instances you are running, one of them might have to handle
the complete load temporarily, which will use more memory.
Also, RocksDB memory is allocated outside the JVM heap, so it is harder to monitor as
the metrics are not yet exposed through Kafka Streams.
Try to limit the size of the block cache and write buffers for RocksDB with a custom
RocksDBConfigSetter, as sketched below.
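
A minimal sketch of such a config setter (the numbers are only examples and have to be tuned; the setter is called per store instance):

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class BoundedRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(8 * 1024 * 1024L);  // smaller block cache per store
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(4 * 1024 * 1024L);     // smaller memtables
        options.setMaxWriteBufferNumber(2);
    }
}

// register it in the streams configuration:
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedRocksDBConfig.class);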

Best regards

Patrik

> Am 15.02.2019 um 06:24 schrieb P. won :
> 
> Hi,
> 
> I have a kafka stream app that currently takes 3 topics and aggregates
> them into a KTable. This app resides inside a microservice which has
> been allocated 512 MB memory to work with. After implementing this,
> I've noticed that the docker container running the microservice
> eventually runs out of memory and was trying to debug the cause.
> 
> My current theory (whilst reading the sizing guide
> https://docs.confluent.io/current/streams/sizing.html) is that over
> time, the increasing records stored in the KTable and by extension,
> the underlying RocksDB, is causing the OOM for the microservice. Does
> kafka provide any way to find out the memory used by the underlying
> default RocksDB implementation?


Re: Minimizing global store restoration time

2019-02-08 Thread Patrik Kleindl
Hi Taylor
You are right, the parallel processing is not mentioned in this issue; if I
remember correctly it was mentioned in the thread that led to it as a possibility when
changing to the restoration listeners.
Best regards
Patrik 

> Am 07.02.2019 um 00:47 schrieb Taylor P :
> 
> Hi Patrik,
> 
> I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380 will
> resolve this issue since our application is dependent on the global store
> being fully restored before the application can be considered healthy. It
> does not seem like KAFKA-7380 is aiming to address the nature of global
> stores restoring each partition sequentially - it is aiming to change the
> blocking nature of #start(). Restoring the global store partitions in
> parallel would definitely speed things up, though, and admittedly my first
> thought when debugging this was "why isn't this restoring each partition in
> parallel?".
> 
> Changing our streams topology to avoid using a global store for such a
> large amount of data would be doable but it does seem like a significant
> amount of work. I am curious to know if anyone else is storing large
> amounts of data in global stores and whether there are any inherent
> limitations to the size of global stores.
> 
> Our topic is already using compaction.
> 
> Taylor
> 
>> On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl  wrote:
>> 
>> Hi Taylor
>> 
>> We are facing the same issue, although on a smaller scale.
>> The main problem as you found is that the restoration is running
>> sequentially, this should be addressed in
>> https://issues.apache.org/jira/browse/KAFKA-7380, although there has been
>> no progress lately.
>> 
>> On the other hand you could try re-evaluate if your problem can only be
>> solved with global state stores, in our case (both in streams as well as
>> for interactive queries) we could solve it with local state stores too,
>> although only with more changes and more complexity in the topology.
>> 
>> Not sure if it is applicable for your case, but have you looked into
>> compression for the topics?
>> 
>> best regards
>> 
>> Patrik
>> 
>>> On Tue, 5 Feb 2019 at 22:37, Taylor P  wrote:
>>> 
>>> Hi,
>>> 
>>> I am having issues with the global store taking a very long time to
>> restore
>>> during startup of a Kafka Streams 2.0.1 application. The global store is
>>> backed by a RocksDB persistent store and is added to the Streams topology
>>> in the following manner: https://pastebin.com/raw/VJutDyYe The global
>>> store
>>> topic has approximately 15 million records per partition and 18
>> partitions.
>>> The following global consumer settings are specified:
>>> 
>>>poll.timeout.ms = 10
>>>max.poll.records = 2000
>>>max.partition.fetch.bytes = 1048576
>>>fetch.max.bytes = 52428800
>>>receive.buffer.bytes = 65536
>>> 
>>> I have tried tweaking the settings above on the consumer side, such as
>>> increasing poll.timeout.ms to 2000, max.poll.records to 1, and
>>> max.partition.fetch.bytes to 52428800, but it seems that I keep hitting a
>>> ceiling of restoring approximately 100,000 records per second. With 15
>>> million records per partition, it takes approximately 150 seconds to
>>> restore a single partition. With 18 partitions, it takes roughly 45
>> minutes
>>> to fully restore the global store.
>>> 
>>> Switching from HDDs to SSDs on the brokers' log directories made
>>> restoration roughly 25% faster overall, but this still feels slow. It
>> seems
>>> that I am hitting IOPS limits on the disks and am not even close to
>> hitting
>>> the throughput limits of the disks on either the broker or streams
>>> application side.
>>> 
>>> How can I minimize restoration time of a global store? Are there settings
>>> that can increase throughput with the same number of IOPS? Ideally
>>> restoration of each partition could be done in parallel but I recognize
>>> there is only a single global store thread. Bringing up a new instance of
>>> the Kafka Streams application occurs on a potentially daily basis, so the
>>> restoration time is becoming more and more of a hassle.
>>> 
>>> Thanks.
>>> 
>>> Taylor
>>> 
>> 


Re: Minimizing global store restoration time

2019-02-06 Thread Patrik Kleindl
Hi Taylor

We are facing the same issue, although on a smaller scale.
The main problem as you found is that the restoration is running
sequentially, this should be addressed in
https://issues.apache.org/jira/browse/KAFKA-7380, although there has been
no progress lately.

On the other hand you could try re-evaluate if your problem can only be
solved with global state stores, in our case (both in streams as well as
for interactive queries) we could solve it with local state stores too,
although only with more changes and more complexity in the topology.

Not sure if it is applicable for your case, but have you looked into
compression for the topics?

best regards

Patrik

On Tue, 5 Feb 2019 at 22:37, Taylor P  wrote:

> Hi,
>
> I am having issues with the global store taking a very long time to restore
> during startup of a Kafka Streams 2.0.1 application. The global store is
> backed by a RocksDB persistent store and is added to the Streams topology
> in the following manner: https://pastebin.com/raw/VJutDyYe The global
> store
> topic has approximately 15 million records per partition and 18 partitions.
> The following global consumer settings are specified:
>
> poll.timeout.ms = 10
> max.poll.records = 2000
> max.partition.fetch.bytes = 1048576
> fetch.max.bytes = 52428800
> receive.buffer.bytes = 65536
>
> I have tried tweaking the settings above on the consumer side, such as
> increasing poll.timeout.ms to 2000, max.poll.records to 1, and
> max.partition.fetch.bytes to 52428800, but it seems that I keep hitting a
> ceiling of restoring approximately 100,000 records per second. With 15
> million records per partition, it takes approximately 150 seconds to
> restore a single partition. With 18 partitions, it takes roughly 45 minutes
> to fully restore the global store.
>
> Switching from HDDs to SSDs on the brokers' log directories made
> restoration roughly 25% faster overall, but this still feels slow. It seems
> that I am hitting IOPS limits on the disks and am not even close to hitting
> the throughput limits of the disks on either the broker or streams
> application side.
>
> How can I minimize restoration time of a global store? Are there settings
> that can increase throughput with the same number of IOPS? Ideally
> restoration of each partition could be done in parallel but I recognize
> there is only a single global store thread. Bringing up a new instance of
> the Kafka Streams application occurs on a potentially daily basis, so the
> restoration time is becoming more and more of a hassle.
>
> Thanks.
>
> Taylor
>


Re: Warning when adding GlobalKTable to topology

2019-01-19 Thread Patrik Kleindl
Hi
That is because the global tables are handled separately by the 
GlobalStreamThread as far as I understand.
You also don't see their offsets like for regular consumers.
Best regards
Patrik

> Am 19.01.2019 um 18:19 schrieb Dmitry Minkovsky :
> 
> When I add a GlobalKTable for topic
> "message-write-service-user-ids-by-email" to my topology, I get this
> warning:
> 
> [2019-01-19 12:18:14,008] WARN
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:421)
> [Consumer
> clientId=message-write-service-55f2ca4d-0efc-4344-90d3-955f9f5a65fd-StreamThread-2-consumer,
> groupId=message-write-service] The following subscribed topics are not
> assigned to any members: [message-write-service-user-ids-by-email]
> 
> Everything seems to be working, but I am concerned about the warning. Am
> I missing something?
> 
> Best,
> Dmitry


User Activity Tracking

2019-01-10 Thread Patrik Kleindl
Hi everyone,

we are planning to add some user activity tracking to an application and I
wanted to ask around for your general experiences and best practices.

Do you use one topic per application or more granular?
Do you write directly from the application to Kafka for tracking purposes?
How to best avoid blocking anything in the application in case of broker
issues?
Is it usually tolerable to lose some tracking information or do you use any
caching and asynchronously produce the messages?

Any learnings are welcome, especially things you would now do differently
if you had to start again :-)

Thanks in advance and best regards

Patrik


Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-11-20 Thread Patrik Kleindl
Hi John and Guozhang
Thanks to both of you.
I will check with our developers if they want to adopt your suggestions.
Using the same ValueTransformer for deduplication on both streams and tables 
might simplify things.
We have eased the operational burden a bit by improving our topic provisioning 
so we can also hold out a bit.
KAFKA-7658 sounds great and made me chuckle because I just asked for this, now 
I see that there were some discussions/emotions regarding this a lot earlier ;-)
Best regards
Patrik
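
PS: For anyone else reading along, a rough sketch of the Optional workaround John describes below (this assumes Kafka Streams 2.1+ with Grouped, String keys and values, and a Serde<Optional<String>> called optionalSerde that you have to provide yourself):

KTable<String, Optional<String>> latest = stream
    // wrap the values so that deletes (null values) are not dropped before the reduce
    .mapValues(value -> Optional.ofNullable(value))
    .groupByKey(Grouped.with(Serdes.String(), optionalSerde))
    // keep the latest value; return null for an empty Optional so the entry is deleted
    .reduce((oldValue, newValue) -> newValue.isPresent() ? newValue : null);

Whether you unwrap the Optional again afterwards depends on what the downstream joins expect.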

> Am 20.11.2018 um 18:19 schrieb John Roesler :
> 
> Hi again, Patrik,
> 
> You'll probably be interested in this recent Jira:
> https://issues.apache.org/jira/browse/KAFKA-7658
> 
> You have a good point about the overhead of going through an intermediate
> topic... I can see how explicit topic management is an operational burden,
> and you're also right that the changelog topic only gets read on state
> restoration. That was an oversight on my part.
> 
> I think that with KAFKA-7658 and https://github.com/apache/kafka/pull/5779,
> you'll have two good options in the future.
> 
> To solve your problem *right now*, you can circumvent the null filtering by
> wrapping the values of your stream. For example, immediately before the
> reduce, you could mapValues and wrap the values with Optional. Then, your
> reduce function can unwrap the Optional and return null if it's empty. Does
> that make sense?
> 
> This comes with an important caveat, though, which is part of the
> motivation for this roadblock to begin with:
> if your incoming data gets repartitioned in your topology, then the order
> of records for the key is not deterministic. This would break the semantics
> of your reduce-to-latest function, and, indeed, any non-commutative reduce
> function.
> 
> For example, if you have a topic like:
> dummykey1: {realkey: A, value: 4}
> dummykey2: {realkey: A, value: 5}
> 
> and you do a groupBy( select realkey )
> and then reduce( keep latest value)
> 
> Then, if dummykey1 and dummykey2 are in different partitions, the result
> would be either A:4 or A:5, depending on which input partition processed
> first.
> 
> We have discussed several times solutions to resolve this issue, but it's
> quite complex in the details.
> 
> Nevertheless, if you're careful and ensure that you don't have multiple
> threads producing the same key into the input topic, and also that you
> don't have a repartition in the middle, then this should work for you.
> 
> Hope this helps!
> -john
> 
>> On Sun, Nov 18, 2018 at 7:04 PM Guozhang Wang  wrote:
>> 
>> Hi Patrik,
>> 
>> Thanks for explaining your use case to us. While we can still discuss how
>> KStream should interpret null-values in aggregations, one workaround atm:
>> if you deduplication logic can be written as a transformValues operation,
>> you can do the following:
>> 
>> 
>> builder.table("source-topic").transformValues(...
>> Materialized.as("store-name"))
>> 
>> Note that in a recent PR that we are merging, the source KTable from
>> builder.table() would not be materialized if users do not specify a
>> materialized store name, only the value-transformed KTable will be
>> materialized:
>> 
>> https://github.com/apache/kafka/pull/5779
>> 
>> 
>> Would that work for you?
>> 
>> Guozhang
>> 
>> 
>>> On Mon, Oct 29, 2018 at 2:08 AM Patrik Kleindl  wrote:
>>> 
>>> Hi John and Matthias
>>> thanks for the questions, maybe explaining our use case helps a bit:
>>> We are receiving CDC records (row-level insert/update/delete) in one
>> topic
>>> per table. The key is derived from the DB records, the value is null in
>>> case of deletes. Those would be the immutable facts I guess.
>>> These topics are first streamed through a deduplication Transformer to
>> drop
>>> changes on irrelevant fields.
>>> The results are translated to KTables and joined to each other to
>> represent
>>> the same result as the SQLs on the database, but faster. At this stage
>> the
>>> delete/null records matter because if a record gets deleted then we want
>> it
>>> to drop out of the join too. -> Our reduce-approach produced unexpected
>>> results here.
>>> We took the deduplication step separately because in some cases we only
>>> need the KStream for processing.
>>> If you see a simpler/cleaner approach here I'm open to suggestions, of
>>> course.
>>> 
>>> Regarding the overhead:
>>> 1) Named topics create management/ma

Re: Stream Metrics - Memory Analysis

2018-11-20 Thread Patrik Kleindl
Done.
https://issues.apache.org/jira/browse/KAFKA-7660
br, Patrik

On Mon, 19 Nov 2018 at 02:03, Guozhang Wang  wrote:

> Hello Patrik,
>
> Could you file a JIRA for your findings? Also what Kafka versions are you
> using (could you add that to the ticket as well)?
>
> Could you provide some more elaborations on what you did the JVM analysis,
> so that I can try to re-produce the observations.
>
>
> Guozhang
>
> On Thu, Oct 25, 2018 at 2:50 AM Patrik Kleindl  wrote:
>
> > Hello
> >
> > During the analysis of JVM memory two possible issues were shown which I
> > would like to bring to your attention:
> > 1) Duplicate strings
> > Top findings:
> > string_content="stream-processor-node-metrics" count="534,277"
> > string_content="processor-node-id" count="148,437"
> > string_content="stream-rocksdb-state-metrics" count="41,832"
> > string_content="punctuate-latency-avg" count="29,681"
> >
> > "stream-processor-node-metrics"  seems to be used in Sensors.java as a
> > literal and not interned.
> >
> > 2) The HashMap parentSensors
> > from
> >
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
> > was reported multiple times as suspicious for potentially keeping alive a
> > lot of objects. In our case the reported size was 40-50MB each.
> > I haven't looked too deep in the code but noticed that the class
> > Sensor.java which is used as a key in the HashMap does not implement
> equals
> > or hashCode method. Not sure this is a problem though.
> >
> > Maybe someone can shed some light on this
> >
> > best regards
> >
> > Patrik
> >
>
>
> --
> -- Guozhang
>


Re: Offsets/Lags for global state stores not shown

2018-11-18 Thread Patrik Kleindl
Thanks for the reply.
It would be interesting who else is using IQ with or without GlobalKTables and 
what problems and solutions they have come up with.
Best regards
Patrik

> Am 18.11.2018 um 20:21 schrieb Matthias J. Sax :
> 
> Because each instance needs to consume all data, it's limited by what a
> single instance can consume -- a hard bound is the network. Note,
> network is shared, so don't take the maximum network speed into account.
>> Also, it's not the number of unique messages, but the number of updates
> that is important for this.
> 
> 
>> Just to verify, for this IQ setup (streams app which only builds a single
>> table to be queried) we have tried the alternative approach to use a normal
>> KTable in combination with a unique application ID per application instance.
>> This seemed to work quite well, including faster (parallel) startup etc.
>> Is this approach valid or would you expect some pitfalls?
> 
> 
> I guess, for your use case, this might be ok. There is one difference on
> startup: if there is no local state build up, in the GlobalKTable case,
> before you can start querying, the GlobalKTable will be fully populated
> from the topic. For the KTable case, you can query from the very
> beginning on, while data is put into the table.
> 
> Also, for this approach, if you add other processing, this processing
> would not be parallelized but duplicated.
> 
> 
> -Matthias
> 
> 
> 
>> On 11/7/18 1:32 AM, Patrik Kleindl wrote:
>> Thanks for the response.
>> How "low" is the expected low throughput? We are using GlobalKTables
>> for IQ on several Topics, but with single-digit million unique messages and
>> usually fewer changes per day.
>> 
>> Just to verify, for this IQ setup (streams app which only builds a single
>> table to be queried) we have tried the alternative approach to use a normal
>> KTable in combination with a unique application ID per application instance.
>> This seemed to work quite well, including faster (parallel) startup etc.
>> Is this approach valid or would you expect some pitfalls?
>> 
>> We have not used this approach more because it does not work for global
>> stores inside a streams application, but it might be beneficial to split
>> that up again.
>> 
>> best regards
>> 
>> Patrik
>> 
>>> On Tue, 6 Nov 2018 at 20:07, Matthias J. Sax  wrote:
>>> 
>>> The topics of global stores are not included by design.
>>> 
>>> The "problem" is that each instance needs to consume *all*
>>> topic-partitions of these topics, and thus they cannot be included
>>> into the consumer group that would assign each partition to exactly one
>>> instance. Hence, an additional consumer is used that uses partition
>>> assignment (instead of subscription) and this consumer does not commit
>>> any offset to Kafka.
>>> 
>>> Note that global stores are bootstrapped before processing begins
>>> though, and are expected to be low throughput topic anyway.
>>> 
>>> 
>>> -Matthias
>>> 
>>>> On 11/6/18 2:03 AM, Patrik Kleindl wrote:
>>>> Hello
>>>> 
>>>> Am I doing something wrong or is it by design that global state stores
>>> and
>>>> their consumers do not show up under the consumer-groups?
>>>> With the consumer group command (and in control center as well) I don't
>>> get
>>>> any output for the group:
>>>> ./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup
>>>> --describe
>>>> Note: This will not show information about old Zookeeper-based consumers.
>>>> 
>>>> If I query for the state I get a response that members are present:
>>>> ./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup
>>>> --describe --state
>>>> Note: This will not show information about old Zookeeper-based consumers.
>>>> 
>>>> COORDINATOR (ID)   ASSIGNMENT-STRATEGY   STATE    #MEMBERS
>>>> broker:9092 (1)    stream                Stable   2
>>>> 
>>>> This is quite irritating as we cannot see if a global state store has
>>>> caught up with a backlog of messages.
>>>> 
>>>> Code to reproduce:
>>>>builder.globalTable(TOPIC_NAME, Materialized
>>>>.<String, String, KeyValueStore<Bytes, byte[]>>as(STORENAME)
>>>>.withKeySerde(Serdes.String())
>>>>

Re: Using GlobalKTable/KeyValueStore for topic cache

2018-11-13 Thread Patrik Kleindl
Hi Chris

We are using them like you described.
Performance is very good compared to the database used before.
Beware that until https://issues.apache.org/jira/browse/KAFKA-7380
is done the startup will be blocked until all global stores are restored 
(sequentially).
This can take a while for larger topics and/or multiple global stores.

We are blocking access until they are available although this is not ideal in 
terms of timeout tuning.

Any ideas are welcome.

Best regards

Patrik
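
PS: A rough sketch of the end-offset check Chris describes below (names are only illustrative). Keep in mind that on a compacted topic the end offsets are only an upper bound for the number of live keys, so comparing them to approximateNumEntries() is a heuristic at best:

try (KafkaConsumer<byte[], byte[]> consumer =
         new KafkaConsumer<>(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
    List<TopicPartition> partitions = consumer.partitionsFor("cache-topic").stream()
        .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
        .collect(Collectors.toList());
    long endOffsetSum = consumer.endOffsets(partitions).values().stream()
        .mapToLong(Long::longValue).sum();
    // compare endOffsetSum against store.approximateNumEntries() before accepting traffic
}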

> Am 14.11.2018 um 00:17 schrieb Chris Toomey :
> 
> We're considering using GlobalKTables / KeyValueStores for locally caching
> topic content in services. The topics would be compacted such that only the
> latest key/value pair would exist for a given key.
> 
> One question that's come up is how to determine, when bootstrapping the
> app, when the cache has been populated with the latest content from the
> topic (so we start with a "warm" cache). ReadOnlyKeyValueStore has
> an approximateNumEntries() method that we could use to see how much we've
> got, but trying to figure out how much there is in the topic looks much
> more difficult -- the only way I can see via the APIs / code is to use an
> AdminClient to get the topic partitions and then the KafkaConsumer to get
> the end offsets for those.
> 
> Does anyone have experience doing this kind of caching? How did you handle
> the bootstrapping issue?
> 
> Any thoughts on easier or better ways to determine when the cache is warm?
> 
> thx,
> Chris


Re: Offsets/Lags for global state stores not shown

2018-11-07 Thread Patrik Kleindl
Thanks for the response.
How "low" is the expected low throughput? We are using GlobalKTables
for IQ on several Topics, but with single-digit million unique messages and
usually fewer changes per day.

Just to verify, for this IQ setup (streams app which only builds a single
table to be queried) we have tried the alternative approach to use a normal
KTable in combination with a unique application ID per application instance.
This seemed to work quite well, including faster (parallel) startup etc.
Is this approach valid or would you expect some pitfalls?

We have not used this approach more because it does not work for global
stores inside a streams application, but it might be beneficial to split
that up again.

best regards

Patrik

On Tue, 6 Nov 2018 at 20:07, Matthias J. Sax  wrote:

> The topics of global stores are not included by design.
>
> The "problem" is that each instance needs to consume *all*
> topic-partitions of these topics, and thus they cannot be included
> into the consumer group that would assign each partition to exactly one
> instance. Hence, an additional consumer is used that uses partition
> assignment (instead of subscription) and this consumer does not commit
> any offset to Kafka.
>
> Note that global stores are bootstrapped before processing begins
> though, and are expected to be low throughput topic anyway.
>
>
> -Matthias
>
> On 11/6/18 2:03 AM, Patrik Kleindl wrote:
> > Hello
> >
> > Am I doing something wrong or is it by design that global state stores
> and
> > their consumers do not show up under the consumer-groups?
> > With the consumer group command (and in control center as well) I don't
> get
> > any output for the group:
> > ./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup
> > --describe
> > Note: This will not show information about old Zookeeper-based consumers.
> >
> > If I query for the state I get a response that members are present:
> > ./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup
> > --describe --state
> > Note: This will not show information about old Zookeeper-based consumers.
> >
> > COORDINATOR (ID)   ASSIGNMENT-STRATEGY   STATE    #MEMBERS
> > broker:9092 (1)    stream                Stable   2
> >
> > This is quite irritating as we cannot see if a global state store has
> > caught up with a backlog of messages.
> >
> > Code to reproduce:
> > builder.globalTable(TOPIC_NAME, Materialized
> > .<String, String, KeyValueStore<Bytes, byte[]>>as(STORENAME)
> > .withKeySerde(Serdes.String())
> > .withValueSerde(Serdes.String()));
> >
> > Nothing fancy.
> >
> > Logs:
> > 2018-11-05 21:25:56 INFO  AbstractCoordinator:442 - (Re-)joining group
> > 2018-11-05 21:25:56 INFO  StreamPartitionAssignor:481 - Assigned tasks to
> > clients as {e0250aa5-e1c6-4d33-a746-bc9357c66965=[activeTasks: ([])
> > standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([])
> > prevAssignedTasks: ([]) capacity: 1]}.
> > 2018-11-05 21:25:56 WARN  ConsumerCoordinator:376 - The following
> > subscribed topics are not assigned to any members: [storetopic]
> > 2018-11-05 21:25:56 INFO  AbstractCoordinator:409 - Successfully joined
> > group with generation 3
> > 2018-11-05 21:25:56 INFO  ConsumerCoordinator:256 - Setting newly
> assigned
> > partitions []
> >
> > The store works after this, but it is not shown.
> >
> > Any input is appreciated
> >
> > best regards
> >
> > Patrik
> >
> > PS: The customer will forward this to the Confluent support too, but I'm
> > asking here for public visibility
> >
>
>


Offsets/Lags for global state stores not shown

2018-11-06 Thread Patrik Kleindl
Hello

Am I doing something wrong or is it by design that global state stores and
their consumers do not show up under the consumer-groups?
With the consumer group command (and in control center as well) I don't get
any output for the group:
./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup
--describe
Note: This will not show information about old Zookeeper-based consumers.

If I query for the state I get a response that members are present:
./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup
--describe --state
Note: This will not show information about old Zookeeper-based consumers.

COORDINATOR (ID)   ASSIGNMENT-STRATEGY   STATE    #MEMBERS
broker:9092 (1)    stream                Stable   2

This is quite irritating as we cannot see if a global state store has
caught up with a backlog of messages.

Code to reproduce:
builder.globalTable(TOPIC_NAME, Materialized
.<String, String, KeyValueStore<Bytes, byte[]>>as(STORENAME)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));

Nothing fancy.

Logs:
2018-11-05 21:25:56 INFO  AbstractCoordinator:442 - (Re-)joining group
2018-11-05 21:25:56 INFO  StreamPartitionAssignor:481 - Assigned tasks to
clients as {e0250aa5-e1c6-4d33-a746-bc9357c66965=[activeTasks: ([])
standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([])
prevAssignedTasks: ([]) capacity: 1]}.
2018-11-05 21:25:56 WARN  ConsumerCoordinator:376 - The following
subscribed topics are not assigned to any members: [storetopic]
2018-11-05 21:25:56 INFO  AbstractCoordinator:409 - Successfully joined
group with generation 3
2018-11-05 21:25:56 INFO  ConsumerCoordinator:256 - Setting newly assigned
partitions []

The store works after this, but it is not shown.

Any input is appreciated

best regards

Patrik

PS: The customer will forward this to the Confluent support too, but I'm
asking here for public visibility


Re: Deduplicating a topic in the face of producer crashes over a time window?

2018-11-01 Thread Patrik Kleindl
Hi Andrew

Did you take a look at
https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
?
We are using this for a case like you described.
Growth should be limited with this approach.

Best regards 
Patrik
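
PS: The core idea of that example, roughly sketched (assumes Kafka Streams 2.1+; names like "input" and "dedup-store" are only illustrative, and this is not the exact code from the linked test). A persistent window store remembers which keys were seen within the retention period, so old keys expire automatically:

StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
        Stores.persistentWindowStore("dedup-store",
                Duration.ofDays(1),   // retention: how long keys are remembered
                Duration.ofDays(1),   // window size
                false),
        Serdes.String(), Serdes.Long());
builder.addStateStore(dedupStoreBuilder);

KStream<String, String> deduplicated = input.transform(() ->
    new Transformer<String, String, KeyValue<String, String>>() {
        private ProcessorContext context;
        private WindowStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            this.context = context;
            this.store = (WindowStore<String, Long>) context.getStateStore("dedup-store");
        }

        @Override
        public KeyValue<String, String> transform(final String key, final String value) {
            final long now = context.timestamp();
            // forward only if the key was not seen within the last day
            try (final WindowStoreIterator<Long> seen =
                     store.fetch(key, now - Duration.ofDays(1).toMillis(), now)) {
                if (seen.hasNext()) {
                    return null;              // duplicate within the window -> drop
                }
            }
            store.put(key, now, now);         // remember the key with its timestamp
            return KeyValue.pair(key, value); // first occurrence -> forward
        }

        @Override
        public void close() { }
    }, "dedup-store");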

> Am 02.11.2018 um 01:33 schrieb Andrew Wilcox :
> 
> Suppose I have a producer which is ingesting a data stream with unique keys
> from an external service and sending it to a Kafka topic.  In my producer I
> can set enable.idempotence and get exactly-once delivery in the presence of
> broker crashes.  However my producer might crash after it delivers a batch
> of messages to Kafka but before it records that the batch was delivered.
> After restarting the crashed producer it would re-deliver the same batch,
> resulting in duplicate messages in the topic.
> 
> With a streams transformer I can deduplicate the topic by using a state
> store to record previously seen keys and then only creating an output
> record if the key hasn't been seen before.  However without a mechanism to
> remove old keys the state store will grow without bound.
> 
> Say I only want to deduplicate over a time period such as one day.  (I'm
> confident that I'll be able to restart a crashed producer sooner).  Thus
> I'd like keys older than a day to expire out of the state store, so the
> store only needs to keep track of keys seen in the last day or so.
> 
> Is there a way to do this with Kafka streams?  Or is there another
> recommended mechanism to keep messages with unique keys unduplicated in the
> presence of producer crashes?
> 
> Thanks!
> 
> Andrew


Re: Problem with kafka-streams aggregate windowedBy

2018-10-29 Thread Patrik Kleindl
Hi
How long does your application run? More than the 60 seconds you set for commit 
interval?
Have a look at 
https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+
 
and check if your offsets are really committed
Best regards
Patrik
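
PS: You can check the committed offsets and lag for your application id (which is the group id) with the console tool, for example:

./kafka-consumer-groups --bootstrap-server localhost:9092 --group <your-application-id> --describe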

> Am 29.10.2018 um 18:20 schrieb Pavel Koroliov :
> 
> Hi
> No, my application id doesn't change
> 
On Mon, 29 Oct 2018 at 19:11, Patrik Kleindl  wrote:
> 
>> Hi
>> Does your applicationId change?
>> Best regards
>> Patrik
>> 
>>> Am 29.10.2018 um 13:28 schrieb Pavel Koroliov :
>>> 
>>> Hi everyone! I use kafka-streams, and i have a problem when i use
>>> windowedBy. Everything works well until I restart the application. After
>>> restarting my aggregation starts from beginning.
>>> Code bellow:
>>>> 
>>>>   StreamsBuilder builder = new StreamsBuilder()
>>>>   KStream stream = builder.stream(topic,
>> Consumed.with(Serdes.String(), Serdes.String()))
>>>> 
>>>>   KTable table =
>> stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
>>>>   .aggregate(
>>>>   { new AggregatorModel() },
>>>>   { key, value, aggregate ->
>>>>   return aggregate.add(value)
>>>>   }
>>>>   )
>>>>   .toStream()
>>>>   .map({ k, v ->
>>>>   new KeyValue<>(k.window().end(), v)
>>>>   })
>>>>   .to('output')
>>>> 
>>>>   def config = new Properties()
>>>>   config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
>>>>   config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
>>>>   config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>> TimeUnit.SECONDS.toMillis(60))
>>>> 
>>>>   KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
>>>>   kafkaStreams.start()
>>>> 
>>>> 
>>> I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to
>>> 'latest' and 'earliest' but it didn't help.
>>> Can you help me understand what I'm doing wrong?
>>> Thank you.
>> 


Re: Problem with kafka-streams aggregate windowedBy

2018-10-29 Thread Patrik Kleindl
Hi
Does your applicationId change?
Best regards 
Patrik

> Am 29.10.2018 um 13:28 schrieb Pavel Koroliov :
> 
> Hi everyone! I use kafka-streams, and i have a problem when i use
> windowedBy. Everything works well until I restart the application. After
> restarting my aggregation starts from beginning.
> Code bellow:
>> 
>>StreamsBuilder builder = new StreamsBuilder()
>>KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), 
>> Serdes.String()))
>> 
>>KTable table = 
>> stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
>>.aggregate(
>>{ new AggregatorModel() },
>>{ key, value, aggregate ->
>>return aggregate.add(value)
>>}
>>)
>>.toStream()
>>.map({ k, v ->
>>new KeyValue<>(k.window().end(), v)
>>})
>>.to('output')
>> 
>>def config = new Properties()
>>config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
>>config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
>>config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
>> TimeUnit.SECONDS.toMillis(60))
>> 
>>KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
>>kafkaStreams.start()
>> 
>> 
> I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to
> 'latest' and 'earliest' but it didn't help.
> Can you help me understand what I'm doing wrong?
> Thank you.


Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-10-29 Thread Patrik Kleindl
Hi John and Matthias
thanks for the questions, maybe explaining our use case helps a bit:
We are receiving CDC records (row-level insert/update/delete) in one topic
per table. The key is derived from the DB records, the value is null in
case of deletes. Those would be the immutable facts I guess.
These topics are first streamed through a deduplication Transformer to drop
changes on irrelevant fields.
The results are translated to KTables and joined to each other to represent
the same result as the SQLs on the database, but faster. At this stage the
delete/null records matter because if a record gets deleted then we want it
to drop out of the join too. -> Our reduce-approach produced unexpected
results here.
We took the deduplication step separately because in some cases we only
need the KStream for processing.
If you see a simpler/cleaner approach here I'm open to suggestions, of
course.

Regarding the overhead:
1) Named topics create management/maintenance overhead because they have to
be created/treated separately (auto-create is not an option) and be
considered in future changes, topology changes/resets and so on. The
internal topic removes most of those issues.
2) One of our developers asked whether the traffic to/from the broker is
actually the same in both scenarios. We expect that the same data is written
to the broker for the named topic as well as in the reduce case, but if the
KTable is maintained inside a streams topology, does it have to read back
everything it sends to the broker or can it keep the table internally? I hope
it is understandable what I mean, otherwise I can try to explain it more clearly.

best regards

Patrik


On Sat, 27 Oct 2018 at 23:50, John Roesler  wrote:

> Hi again Patrik,
>
> Actually, this is a good question... Can you share some context about why
> you need to convert a stream to a table (including nulls as retractions)?
>
> Thanks,
> -John
>
> On Fri, Oct 26, 2018 at 5:36 PM Matthias J. Sax 
> wrote:
>
> > I don't know your overall application setup. However, a KStream
> > semantically models immutable facts and there is not update semantic.
> > Thus, it seems semantically questionable, to allow changing the
> > semantics from facts to updates (the other way is easier IMHO, and thus
> > supported via KTable#toStream()).
> >
> > Does this make sense?
> >
> > Having said this: you _can_ write a KStream into a topic an read it back
> > as KTable. But it's semantically questionable to do so, IMHO. Maybe it
> > makes sense for your specific application, but in general I don't think
> > it does make sense.
> >
> >
> > -Matthias
> >
> > On 10/26/18 9:30 AM, John Roesler wrote:
> > > Hi Patrik,
> > >
> > > Just to drop one observation in... Streaming to a topic and then
> > consuming
> > > it as a table does create overhead, but so does reducing a stream to a
> > > table, and I think it's actually the same in either case.
> > >
> > > They both require a store to collect the table state, and in both
> cases,
> > > the stores need to have a changelog topic. For the "reduce" version,
> it's
> > > an internal changelog topic, and for the "topic-to-table" version, the
> > > store can use the intermediate topic as its changelog.
> > >
> > > This doesn't address your ergonomic concern, but it seemed worth
> pointing
> > > out that (as far as I can tell), there doesn't seem to be a difference
> in
> > > overhead.
> > >
> > > Hope this helps!
> > > -John
> > >
> > > On Fri, Oct 26, 2018 at 3:27 AM Patrik Kleindl 
> > wrote:
> > >
> > >> Hello Matthias,
> > >> thank you for the explanation.
> > >> Streaming back to a topic and consuming this as a KTable does respect
> > the
> > >> null values as deletes, correct? But at the price of some overhead.
> > >> Is there any (historical, technical or emotional;-)) reason that no
> > simple
> > >> one-step stream-to-table operation exists?
> > >> Best regards
> > >> Patrik
> > >>
> > >>> Am 26.10.2018 um 00:07 schrieb Matthias J. Sax <
> matth...@confluent.io
> > >:
> > >>>
> > >>> Patrik,
> > >>>
> > >>> `null` values in a KStream don't have delete semantics (it's not a
> > >>> changelog stream). That's why we drop them in the KStream#reduce
> > >>> implemenation.
> > >>>
> > >>> If you want to explicitly remove results for a key from the result
> &

Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-10-26 Thread Patrik Kleindl
Hello Matthias,
thank you for the explanation.
Streaming back to a topic and consuming this as a KTable does respect the null 
values as deletes, correct? But at the price of some overhead.
Is there any (historical, technical or emotional;-)) reason that no simple 
one-step stream-to-table operation exists?
Best regards
Patrik

> Am 26.10.2018 um 00:07 schrieb Matthias J. Sax :
> 
> Patrik,
> 
> `null` values in a KStream don't have delete semantics (it's not a
> changelog stream). That's why we drop them in the KStream#reduce
> implemenation.
> 
> If you want to explicitly remove results for a key from the result
> KTable, your `Reducer#apply()` implementation must return `null` -- the
> result of #apply() has changelog/KTable semantics and `null` is
> interpreted as delete for this case.
> 
> If you want to use `null` from your KStream to trigger reduce() to
> delete, you will need to use a surrogate value for this, ie, do a
> mapValues() before the groupByKey() call, an replace `null` values with
> the surrogate-delete-marker that you can evaluate in `Reducer#apply()`
> to return `null` for this case.
> 
> Hope this helps.
> 
> -Matthias
> 
>> On 10/25/18 10:36 AM, Patrik Kleindl wrote:
>> Hello
>> 
>> Recently we noticed a lot of warning messages in the logs which pointed to
>> this method (we are running 2.0):
>> 
>> KStreamReduce
>> public void process(final K key, final V value) {
>>// If the key or value is null we don't need to proceed
>>if (key == null || value == null) {
>>LOG.warn(
>>"Skipping record due to null key or value. key=[{}]
>> value=[{}] topic=[{}] partition=[{}] offset=[{}]",
>>key, value, context().topic(), context().partition(),
>> context().offset()
>>);
>>metrics.skippedRecordsSensor().record();
>>return;
>>}
>> 
>> This was triggered for every record from a stream with an existing key but
>> a null value which we put through groupBy/reduce to get a KTable.
>> My assumption was that this was the correct way inside a streams
>> application to get a KTable but this prevents deletion of records from
>> working.
>> 
>> Our alternative is to send the stream back to a named topic and build a new
>> table from it, but this is rather cumbersome and requires a separate topic
>> which also can't be cleaned up by the streams reset tool.
>> 
>> Did I miss anything relevant here?
>> Would it be possible to create a separate method for KStream to achieve
>> this directly?
>> 
>> best regards
>> 
>> Patrik
>> 
> 


Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-10-25 Thread Patrik Kleindl
Hello

Recently we noticed a lot of warning messages in the logs which pointed to
this method (we are running 2.0):

KStreamReduce
public void process(final K key, final V value) {
// If the key or value is null we don't need to proceed
if (key == null || value == null) {
LOG.warn(
"Skipping record due to null key or value. key=[{}]
value=[{}] topic=[{}] partition=[{}] offset=[{}]",
key, value, context().topic(), context().partition(),
context().offset()
);
metrics.skippedRecordsSensor().record();
return;
}

This was triggered for every record from a stream with an existing key but
a null value which we put through groupBy/reduce to get a KTable.
My assumption was that this was the correct way inside a streams
application to get a KTable but this prevents deletion of records from
working.

Our alternative is to send the stream back to a named topic and build a new
table from it, but this is rather cumbersome and requires a separate topic
which also can't be cleaned up by the streams reset tool.
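
In code the workaround looks roughly like this (topic name and serdes only illustrative):

stream.to("intermediate-topic", Produced.with(Serdes.String(), Serdes.String()));
KTable<String, String> table = builder.table("intermediate-topic",
        Consumed.with(Serdes.String(), Serdes.String()));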

Did I miss anything relevant here?
Would it be possible to create a separate method for KStream to achieve
this directly?

best regards

Patrik


Stream Metrics - Memory Analysis

2018-10-25 Thread Patrik Kleindl
Hello

During the analysis of JVM memory two possible issues were shown which I
would like to bring to your attention:
1) Duplicate strings
Top findings:
string_content="stream-processor-node-metrics" count="534,277"
string_content="processor-node-id" count="148,437"
string_content="stream-rocksdb-state-metrics" count="41,832"
string_content="punctuate-latency-avg" count="29,681"

"stream-processor-node-metrics"  seems to be used in Sensors.java as a
literal and not interned.

2) The HashMap parentSensors
from 
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
was reported multiple times as suspicious for potentially keeping alive a
lot of objects. In our case the reported size was 40-50MB each.
I haven't looked too deep in the code but noticed that the class
Sensor.java which is used as a key in the HashMap does not implement equals
or hashCode method. Not sure this is a problem though.

Maybe someone can shed some light on this

best regards

Patrik


RocksDB not closed on error during CachingKeyValueStore.flush?

2018-10-23 Thread Patrik Kleindl
Hello

Can someone please verify if my assumption is correct?
In CachingKeyValueStore, if an exception happens during flush() the store
will not be closed properly.

@Override
public void flush() {
lock.writeLock().lock();
try {
cache.flush(cacheName);
underlying.flush();
} finally {
lock.writeLock().unlock();
}
}

@Override
public void close() {
flush();
underlying.close();
cache.close(cacheName);

An exception leading to this; notice that another store is already closed
and therefore not available:
2018-10-04 12:18:44,961 ERROR
[org.apache.kafka.streams.processor.internals.ProcessorStateManager]
(...-StreamThread-8) - task [8_11] Failed to close state store
...-STATE-STORE-38: :
org.apache.kafka.streams.errors.InvalidStateStoreException: Store
KSTREAM-REDUCE-STATE-STORE-25 is currently closed.
at
org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:70)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
at
org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:186)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
at
org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:124)
at
org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.get(KTableFilter.java:132)
at
org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:89)
at
org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:58)
at
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at
org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
at
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:132)
at
org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:269)
at
org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:245)
at
org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:546)
at
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
at
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
at
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
at
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

If the store is not closed we have witnessed that the lock in RocksDB is
not removed properly, which can lead to:

2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -   Caused
by: org.rocksdb.RocksDBException: While lock file:
...-STATE-STORE-38/LOCK: No locks available
2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -   at
org.rocksdb.RocksDB.open(Native Method)
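
A guarded close() along these lines (only a sketch to illustrate the assumption, not a patch against the actual code) would still close the underlying store and the cache even if flush() throws:

@Override
public void close() {
    try {
        flush();
    } finally {
        underlying.close();
        cache.close(cacheName);
    }
}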

best regards

Patrik


Re: Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException

2018-10-04 Thread Patrik Kleindl
Saw it, thank you.
Best regards
Patrik

> Am 04.10.2018 um 17:11 schrieb Matthias J. Sax :
> 
> Thanks for the details.
> 
> Seems like a fair assumption. I created a jira to track it:
> https://issues.apache.org/jira/browse/KAFKA-7480
> 
> For now, there is not much you can do, because Streams hard codes to set
> the policy to "none". Thus, a manual restart (that is gladly working as
> you confirmed) it currently the way to go.
> 
> Thanks for reporting this issue.
> 
> 
> -Matthias
> 
>> On 10/4/18 3:23 AM, Patrik Kleindl wrote:
>> Hello Matthias
>> Thanks for looking into this.
>> A restart has worked, I can confirm that.
>> Before this problem happened we had some cluster issues which are still
>> being looked into, there were some leader changes and some offset commit
>> failures.
>> The consumer should not have lagged that much behind, but I can only check
>> that at the next occurrence.
>> 
>> Does the user have any other solution available than to restart?
>> I understand the intention to "notify" the user of a potential problem, but
>> if nothing can be changed about the data loss then a warning message and
>> automatic recovery should not make things worse.
>> This would make sense as an improvement; as I understand this is not a bug,
>> the case is closed for me at the moment.
>> 
>> Thanks again and best regards
>> Patrik
>> 
>>> On Thu, 4 Oct 2018 at 02:58, Matthias J. Sax  wrote:
>>> 
>>> I double checked the code and discussed with a colleague.
>>> 
>>> There are two places when we call `globalConsumer.poll()`
>>> 
>>> 1. On startup, when we need to bootstrap the store. In this case, we
>>> catch the exception and handle it.
>>> 2. During regular processing. In this case, we don't catch the exception.
>>> 
>>> The reasoning is the following: For case (1) the exception should only
>>> happen if you start a new application of if an application was offline
>>> for a long time. This is fine and we just make sure to bootstrap
>>> correctly. For case (2) the consumer is at the end of the log and thus,
>>> an InvalidOffsetException should never occur but indicate an issue the
>>> user should be notified about.
>>> 
>>> Does this reasoning make sense?
>>> 
>>> Question: if you restart your application, does it fail again? Or does
>>> it resume processing?
>>> 
>>> It would be good to understand the root cause. It seems, you
>>> globalConsumer is lagging behind? Can you verify this? If yes, it seems
>>> to make sense to stop processing to inform the user about this issue.
>>> Would you rather prefer the application to just move on implying silent
>>> data loss??
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>>> On 10/3/18 12:20 AM, Patrik Kleindl wrote:
>>>> Hello Matthias
>>>> Thank you for the explanation.
>>>> 
>>>> Version used is 2.0.0-cp1
>>>> 
>>>> The stacktrace:
>>>> 2018-10-02 10:51:52,575 ERROR
>>>> 
>>> [org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer]
>>>> (...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread) -
>>>> [short-component-name:; transaction-id:; user-id:; creation-time:]
>>>> global-stream-thread
>>>> [...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread] Updating
>>>> global state failed. You can restart KafkaStreams to recover from this
>>>> error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>>>> Offsets out of range with no configured reset policy for partitions:
>>>> {...=51247974}
>>>> at
>>>> 
>>> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
>>>> at
>>>> 
>>> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
>>>> at
>>>> 
>>> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
>>>> at
>>>> 
>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
>>>> at
>>>> 
>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
>>>> at
>>>> 
>>> org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
>>>> at
>&g

Re: Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException

2018-10-04 Thread Patrik Kleindl
Hello Matthias
Thanks for looking into this.
A restart has worked, I can confirm that.
Before this problem happened we had some cluster issues which are still
being looked into, there were some leader changes and some offset commit
failures.
The consumer should not have lagged that much behind, but I can only check
that at the next occurrence.

Does the user have any other solution available than to restart?
I understand the intention to "notify" the user of a potential problem, but
if nothing can be changed about the data loss then a warning message and
automatic recovery should not make things worse.
This would make sense as an improvement; as I understand this is not a bug,
the case is closed for me at the moment.

Thanks again and best regards
Patrik

On Thu, 4 Oct 2018 at 02:58, Matthias J. Sax  wrote:

> I double checked the code and discussed with a colleague.
>
> There are two places when we call `globalConsumer.poll()`
>
> 1. On startup, when we need to bootstrap the store. In this case, we
> catch the exception and handle it.
> 2. During regular processing. In this case, we don't catch the exception.
>
> The reasoning is the following: For case (1) the exception should only
> happen if you start a new application of if an application was offline
> for a long time. This is fine and we just make sure to bootstrap
> correctly. For case (2) the consumer is at the end of the log and thus,
> an InvalidOffsetException should never occur but indicate an issue the
> user should be notified about.
>
> Does this reasoning make sense?
>
> Question: if you restart your application, does it fail again? Or does
> it resume processing?
>
> It would be good to understand the root cause. It seems, you
> globalConsumer is lagging behind? Can you verify this? If yes, it seems
> to make sense to stop processing to inform the user about this issue.
> Would you rather prefer the application to just move on implying silent
> data loss??
>
>
> -Matthias
>
>
> On 10/3/18 12:20 AM, Patrik Kleindl wrote:
> > Hello Matthias
> > Thank you for the explanation.
> >
> > Version used is 2.0.0-cp1
> >
> > The stacktrace:
> > 2018-10-02 10:51:52,575 ERROR
> >
> [org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer]
> > (...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread) -
> > [short-component-name:; transaction-id:; user-id:; creation-time:]
> > global-stream-thread
> > [...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread] Updating
> > global state failed. You can restart KafkaStreams to recover from this
> > error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> > Offsets out of range with no configured reset policy for partitions:
> > {...=51247974}
> > at
> >
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
> > at
> >
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
> > at
> >
> org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
> > at
> >
> org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290)
> >
> > Fetcher.parseCompletedFetch:
> >
> > else if (error == Errors.OFFSET_OUT_OF_RANGE) {
> > if (fetchOffset != subscriptions.position(tp)) {
> > log.debug("Discarding stale fetch response for
> > partition {} since the fetched offset {} " +
> > "does not match the current offset {}", tp,
> > fetchOffset, subscriptions.position(tp));
> > } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
> > log.info("Fetch offset {} is out of range for
> partition
> > {}, resetting offset", fetchOffset, tp);
> > subscriptions.requestOffsetReset(tp);
> > } else {
> > throw new
> > OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
> > }
> >
> > So this means that for global/restore the exception will always be thrown
> > without some special handling?
> >
> > best regards
> >
> > Patrik
> >
> > On Tue, 2 Oct 2018 at 22:26, Matthias J. Sax 

Re: Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException

2018-10-03 Thread Patrik Kleindl
Hello Matthias
Thank you for the explanation.

Version used is 2.0.0-cp1

The stacktrace:
2018-10-02 10:51:52,575 ERROR
[org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer]
(...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread) -
[short-component-name:; transaction-id:; user-id:; creation-time:]
global-stream-thread
[...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread] Updating
global state failed. You can restart KafkaStreams to recover from this
error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Offsets out of range with no configured reset policy for partitions:
{...=51247974}
at
org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
at
org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
at
org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290)

Fetcher.parseCompletedFetch:

else if (error == Errors.OFFSET_OUT_OF_RANGE) {
    if (fetchOffset != subscriptions.position(tp)) {
        log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
                "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
    } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
        log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
        subscriptions.requestOffsetReset(tp);
    } else {
        throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
    }

So this means that for global/restore the exception will always be thrown
without some special handling?

best regards

Patrik

On Tue, 2 Oct 2018 at 22:26, Matthias J. Sax  wrote:

> It is by design to set the reset policy to "none"
> (https://issues.apache.org/jira/browse/KAFKA-6121), and not allowed by
> design to overwrite this (there might be a workaround for you though).
> However, Streams should not die but catch the exception and recover from
> it automatically.
>
> What version do you use? Can you share the full stack trace to see why
> Streams failed to recover from this exception?
>
>
> -Matthias
>
> On 10/2/18 4:54 AM, Patrik Kleindl wrote:
> > Hi
> >
> > We had several incidents where a streams application crashed while
> > maintaining a global state store.
> > Updating global state failed. You can restart KafkaStreams to recover
> from
> > this error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> > Offsets out of range with no configured reset policy for partitions: ...
> >
> > As we never set this to none I checked the code and found that
> > StreamsConfig getGlobalConsumerConfigs and getRestoreConsumerConfigs both
> > set this explicitly:
> > baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
> >
> > The logs confirm this:
> > 2018-10-02 11:07:06,057 INFO
> [org.apache.kafka.common.utils.AppInfoParser]
> > (ServerService Thread Pool -- 70) - [short-component-name:;
> > transaction-id:; user-id:; creation-time:]  Kafka version : 2.0.0-cp1
> > 2018-10-02 11:07:06,057 INFO
> [org.apache.kafka.common.utils.AppInfoParser]
> > (ServerService Thread Pool -- 70) - [short-component-name:;
> > transaction-id:; user-id:; creation-time:]  Kafka commitId :
> > a8c648ff08b9235d
> > 2018-10-02 11:07:06,104 INFO
> > [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread
> > Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
> > creation-time:]  ConsumerConfig values:
> > auto.commit.interval.ms = 5000
> > auto.offset.reset = none
> > bootstrap.servers = [...]
> > check.crcs = true
> > client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-global-consumer
> >
> > ...
> >
> > 2018-10-02 11:07:06,418 INFO
> > [org.apache.kafka.streams.processor.internals.StreamThread]
> (ServerService
> > Thread Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
> > creation-time:]  stream-thread
> > [...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1] Creating
> restore
> > consumer client
> > 2018-10-02 11:07:06,419 INFO
> > [org.apache.kafka.clients.consumer.

Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException

2018-10-02 Thread Patrik Kleindl
Hi

We had several incidents where a streams application crashed while
maintaining a global state store.
Updating global state failed. You can restart KafkaStreams to recover from
this error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Offsets out of range with no configured reset policy for partitions: ...

As we never set this to none I checked the code and found that
StreamsConfig getGlobalConsumerConfigs and getRestoreConsumerConfigs both
set this explicitly:
baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

The logs confirm this:
2018-10-02 11:07:06,057 INFO  [org.apache.kafka.common.utils.AppInfoParser]
(ServerService Thread Pool -- 70) - [short-component-name:;
transaction-id:; user-id:; creation-time:]  Kafka version : 2.0.0-cp1
2018-10-02 11:07:06,057 INFO  [org.apache.kafka.common.utils.AppInfoParser]
(ServerService Thread Pool -- 70) - [short-component-name:;
transaction-id:; user-id:; creation-time:]  Kafka commitId :
a8c648ff08b9235d
2018-10-02 11:07:06,104 INFO
[org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread
Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
creation-time:]  ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [...]
check.crcs = true
client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-global-consumer

...

2018-10-02 11:07:06,418 INFO
[org.apache.kafka.streams.processor.internals.StreamThread] (ServerService
Thread Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
creation-time:]  stream-thread
[...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1] Creating restore
consumer client
2018-10-02 11:07:06,419 INFO
[org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread
Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
creation-time:]  ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [...]
check.crcs = true
client.id =
...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1-restore-consumer

Is this intentional and if yes, why can this not use the default policy and
recover?

best regards

Patrik


Managing/Versioning Topic Configurations for CI/CD

2018-09-27 Thread Patrik Kleindl
Hello everyone,

we are currently trying to improve the management of our topic
configurations.
At the moment we are managing the configurations on the producer side and
checking/creating/changing topics on each application (instance) startup.
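
(A minimal sketch of such a startup check using the AdminClient; topic name,
partition count and settings below are placeholders, not a recommendation:)

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicStartupCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker0:9092"); // placeholder broker
        try (AdminClient admin = AdminClient.create(props)) {
            String topic = "example-topic"; // placeholder, a topic "owned" by this application
            if (!admin.listTopics().names().get().contains(topic)) {
                NewTopic newTopic = new NewTopic(topic, 10, (short) 3)
                        .configs(Collections.singletonMap("cleanup.policy", "compact"));
                admin.createTopics(Collections.singleton(newTopic)).all().get();
            }
            // changing the config of an existing topic would go through
            // describeConfigs/alterConfigs, which is where conflicting
            // definitions and concurrent startups become a problem
        }
    }
}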

This has worked fine for many cases, but does not really protect against
duplicate/conflicting configuration and also requires handling of
concurrency issues.
On the plus side, this makes it easy for different teams to handle the
configurations for "their" topics.

A more centralized approach would fix some of those problems but would
create other dependencies on the deployment side.

I'm very interested in any feedback you can provide on how you manage the
topic configuration and changes in your systems, especially regarding
versioning (of the configuration, not the content).

best regards

Patrik


Re: GlobalKTable/KTable initialization differences

2018-09-06 Thread Patrik Kleindl
Thank you, looking forward to testing it :-)
Best regards
Patrik

> On 06.09.2018 at 02:24, Matthias J. Sax wrote:
> 
> I created https://issues.apache.org/jira/browse/KAFKA-7380 to track this.
> 
> -Matthias
> 
>> On 8/27/18 12:07 PM, Guozhang Wang wrote:
>> Hello Patrik,
>> 
>> Thanks for the email and this is indeed a good question! :)
>> 
>> There are some historical reasons why we did the global state restoration in
>> a different way: the restoration logic refactoring came after
>> global KTables and global stores were introduced. But long story short, I
>> agree with you that we can indeed make the global store restoration logic
>> consistent with the local state stores.
>> The tricky part, though, is that global stores are shared among all
>> streaming threads within an instance (i.e. we do not keep one global task
>> per thread, but one global task per instance). So what we need to guarantee
>> is that during a rebalance in which any of the StreamThreads has participated
>> (note that not all threads may participate in a single rebalance), after
>> the thread has transited to the PARTITION_ASSIGNED state, we should make sure
>> the global state store is caught up to its changelog's log end offset
>> before moving that thread to the RUNNING state.
>> 
>> Some initial thoughts about how to do it:
>> 
>> 1. Whenever a thread transits to the PARTITION_ASSIGNED state, check if the
>> global store's restoration has completed; if not, halt further
>> transitions to RUNNING and try to start restoring the global stores.
>> 2. Only start a task after its corresponding store changelogs have been
>> restored and, IN ADDITION, the global stores have been restored as well.
>> 
>> If you like, please feel free to create a JIRA requesting this improvement
>> so someone can work on it someday.
>> 
>> Guozhang
>> 
>> 
>> 
>> 
>>> On Sat, Aug 25, 2018 at 10:44 AM, Patrik Kleindl  wrote:
>>> 
>>> Hello
>>> 
>>> We are currently using GlobalKTables for interactive queries as well as for
>>> lookups inside stream applications but have come across some
>>> limitations/problems.
>>> The main problem was that our deployments including application start took
>>> longer with every new global state store we added which caused some
>>> operational issues because a timeout of 10 minutes was reached.
>>> As this was not expected from the documentation I investigated a little:
>>> According to the documentation the call to the KafkaStreams start method
>>> should always return immediately and not block the calling thread.
>>> 
>>> *Start the KafkaStreams instance by starting all its threads. This function
>>> is expected to be called only once during the life cycle of the
>>> client.**Because
>>> threads are started in the background, this method does not block.*
>>> 
>>> But if one (or more) GlobalStateStores are initialized then this call will
>>> take a considerable amount of time and block.
>>> This happens because the GlobalStreamThread only changes to running after
>>> initialize() is done which does all the state restore etc in loops.
>>> An inquiry to the support yielded the answer that this was working as
>>> designed and the documentation will be updated.
>>> 
>>> While we have worked around the issue by asynchronously calling the start
>>> method, the question remains why this is intended.
>>> 
>>> If I understand https://issues.apache.org/jira/browse/KAFKA-6205 correctly
>>> the topology should not be initialized before the state stores are
>>> restored, so why is it necessary to handle global state stores differently
>>> than local ones in this aspect?
>>> Additionally, for global state stores all stores and partitions are
>>> initialized sequentially while local state stores are always handled in
>>> parallel (per store and for all partitions)
>>> 
>>> Any help is welcome, maybe it would be worth a KIP to improve this
>>> situation.
>>> A code sample as well as logs for both cases can be found below.
>>> 
>>> best regards
>>> 
>>> Patrik
>>> 
>>> PS: Sample code to reproduce, just switch between GlobalKTable and KTable,
>>> the topic used should have 10+ partitions and contain a few 100K records to
>>> show some meaningful results:
>>> 
>>> import org.apache.kafka.common.serialization.Serdes;
>>> import org.apache.k

Re: resetting consumer group offset to earliest and to-latest not working

2018-09-01 Thread Patrik Kleindl
Hello
Did you add --execute to the command?
Which command did you use?
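
For reference, a full reset to the earliest offset looks something like this
(group and topic names are placeholders); without --execute the tool only
prints the planned offsets, i.e. a dry run:

./kafka-consumer-groups --bootstrap-server broker0:9092 --group my-group --topic my-topic --reset-offsets --to-earliest --execute
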
Best regards
Patrik

> On 01.09.2018 at 14:54, Joseph M'BIMBI-BENE wrote:
> 
> Hello everyone,
> 
> Hopefully this is the appropriate mailing list for my message. 
> When I am trying to reset the offset of some consumer group, I get some echo 
> telling me that the offset has indeed been reset to earliest or latest, but 
> checking right after, the offset is still at its previous position, and 
> after restarting the consumers in the group, they indeed continue to consume 
> messages even after issuing the command to reset the offsets of the partitions 
> to the latest offset.
> 
> I am using Kafka 1.1.1 for Scala 2.11, and I will put a screenshot of the 
> terminal if that could help you help me.
> 
> Thank you in advance. Best regards


GlobalKTable/KTable initialization differences

2018-08-25 Thread Patrik Kleindl
Hello

We are currently using GlobalKTables for interactive queries as well as for
lookups inside stream applications but have come across some
limitations/problems.
The main problem was that our deployments including application start took
longer with every new global state store we added which caused some
operational issues because a timeout of 10 minutes was reached.
As this was not expected from the documentation I investigated a little:
According to the documentation the call to the KafkaStreams start method
should always return immediately and not block the calling thread.

*Start the KafkaStreams instance by starting all its threads. This function
is expected to be called only once during the life cycle of the
client.**Because
threads are started in the background, this method does not block.*

But if one (or more) GlobalStateStores are initialized then this call will
take a considerable amount of time and block.
This happens because the GlobalStreamThread only changes to running after
initialize() is done which does all the state restore etc in loops.
An inquiry to the support yielded the answer that this was working as
designed and the documentation will be updated.

While we have worked around the issue by asynchronously calling the start
method, the question remains why this is intended.
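
(A minimal sketch of that workaround, simply moving the blocking start() call
off the deploying thread; the error handling shown is only indicative:)

import java.util.concurrent.CompletableFuture;
import org.apache.kafka.streams.KafkaStreams;

public class AsyncStart {
    // Start the Streams client off the calling thread so a long global store
    // bootstrap does not block the deployment/startup sequence.
    public static CompletableFuture<Void> startAsync(KafkaStreams streams) {
        return CompletableFuture.runAsync(streams::start)
                .exceptionally(t -> {
                    t.printStackTrace(); // make bootstrap failures visible somewhere
                    return null;
                });
    }
}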

If I understand https://issues.apache.org/jira/browse/KAFKA-6205 correctly
the topology should not be initialized before the state stores are
restored, so why is it necessary to handle global state stores differently
than local ones in this aspect?
Additionally, for global state stores all stores and partitions are
initialized sequentially while local state stores are always handled in
parallel (per store and for all partitions)

Any help is welcome, maybe it would be worth a KIP to improve this
situation.
A code sample as well as logs for both cases can be found below.

best regards

Patrik

PS: Sample code to reproduce, just switch between GlobalKTable and KTable,
the topic used should have 10+ partitions and contain a few 100K records to
show some meaningful results:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

import java.util.Date;
import java.util.Properties;

public class TestTables {

    public static void main(String[] args) {
        final String TOPIC_NAME = "testtables";

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testtables");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker0:9092");

        StreamsBuilder builder = new StreamsBuilder();

        GlobalKTable<String, String> testtable = builder.globalTable(TOPIC_NAME,
                Materialized.with(Serdes.String(), Serdes.String()));
        //KTable<String, String> testtable = builder.table(TOPIC_NAME,
        //        Materialized.with(Serdes.String(), Serdes.String()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

        streams.cleanUp();
        streams.setStateListener((state, state1) -> {
            if (state == KafkaStreams.State.RUNNING && state1 == KafkaStreams.State.REBALANCING)
                System.out.println("Running " + new Date());
        });
        System.out.println("Starting " + new Date());
        streams.start();
        System.out.println("Started " + new Date());

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                streams.close();
            } catch (Exception e) {
                // ignored
            }
        }));

    }
}

Log for KTable:
Starting Fri Aug 24 20:17:19 CEST 2018

2018-08-24 20:17:19 DEBUG KafkaStreams:759 - stream-client
[testtables-8ca25259-6bb2-4bc1-9293-fc05e1b28ce7] Starting Streams client

2018-08-24 20:17:19 INFO  StreamThread:713 - stream-thread
[testtables-8ca25259-6bb2-4bc1-9293-fc05e1b28ce7-StreamThread-1] Starting

2018-08-24 20:17:19 INFO  StreamThread:200 - stream-thread
[testtables-8ca25259-6bb2-4bc1-9293-fc05e1b28ce7-StreamThread-1] State
transition from CREATED to RUNNING

2018-08-24 20:17:19 INFO  KafkaStreams:782 - stream-client
[testtables-8ca25259-6bb2-4bc1-9293-fc05e1b28ce7] Started Streams client

Started Fri Aug 24 20:17:19 CEST 2018

2018-08-24 20:17:19 INFO  Metadata:265 - Cluster ID: 6YtMshjsT_WQNwRGhTyc4A

2018-08-24 20:17:19 INFO  AbstractCoordinator:605 - [Consumer
clientId=testtables-8ca25259-6bb2-4bc1-9293-fc05e1b28ce7-StreamThread-1-consumer,
groupId=testtables] Discovered group coordinator broker2:9292 (id:
2147483645 rack: null)

2018-08-24 20:17:19 INFO  ConsumerCoordinator:411 - [Consumer
clientId=testtables-8ca25259-6bb2-4bc1-9293-fc05e1b28ce7-StreamThread-1-consumer,
groupId=testtables] Revoking previously ass

Improve error message when trying to produce message without key for compacted topic

2018-08-21 Thread Patrik Kleindl
Hello

Yesterday we had the following exception:

Exception thrown when sending a message with key='null' and payload='...'
to topic sometopic:: org.apache.kafka.common.errors.CorruptRecordException:
This message has failed its CRC checksum, exceeds the valid size, or is
otherwise corrupt.

The cause was identified with the help of
https://stackoverflow.com/questions/49098274/kafka-stream-get-corruptrecordexception

Is it possible / would it make sense to open an issue to improve the error
message for this case?
A simple "Message without a key is not valid for a compacted topic" would
suffice and point a user in the right direction.
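
(For illustration only, a hypothetical client-side guard that fails fast with
such a message instead of waiting for the broker-side CorruptRecordException;
class and method names are made up:)

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CompactedTopicGuard {
    // Reject keyless records before they ever reach a compacted topic.
    static Future<RecordMetadata> sendToCompacted(Producer<String, String> producer,
                                                  ProducerRecord<String, String> record) {
        if (record.key() == null) {
            throw new IllegalArgumentException(
                    "Message without a key is not valid for compacted topic " + record.topic());
        }
        return producer.send(record);
    }
}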

best regards

Patrik


Re: Usage of cleanup.policy=compact,delete

2018-08-13 Thread Patrik Kleindl
Issue opened: https://issues.apache.org/jira/browse/KAFKA-7281
The config command accepts the new format as described, but the error
message still references the old format/restrictions.
br, Patrik
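
(Applied to the commands from the original mail below, the accepted bracket
form would then presumably be:)

./kafka-configs --zookeeper broker0:2181 --alter --entity-type topics --entity-name test --add-config cleanup.policy=[compact,delete]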

On 12 August 2018 at 23:47, Matthias J. Sax  wrote:

> I think, at command line tool level, you need to use
>
> --add-config log.cleanup.policy=[compact,delete]
>
> i.e., you need to add square brackets to mark the config as a list.
>
> This is different to Java code for which you would use
>
> props.put("log.cleanup.policy", "compact,delete");
>
> The config should be available at broker default as well as topic level
> configuration. Seems there is a glitch in the docs. Please file a minor
> PR or open a ticket so we can address this. Thx.
>
>
> -Matthias
>
> On 8/12/18 6:19 AM, Matt Farmer wrote:
> > We've run into some unexpected behavior around this as well, though I
> > forgot to send in a note when we found it so I'm fuzzy on the details at
> > the moment. I'll chime back in if I can dig up exactly what we were
> doing,
> > but I'd also welcome a ruling from someone with knowledge of the code. I
> > seem to recall we tried this for a repartition topic and it didn't do
> quite
> > what we expected.
> >
> > On Fri, Aug 10, 2018 at 3:02 AM Patrik Kleindl 
> wrote:
> >
> >> Hello
> >>
> >> In a discussion yesterday the question came up whether an internal changelog
> >> topic can be enabled for compaction and deletion.
> >>
> >>
> >> https://stackoverflow.com/questions/50622369/kafka-
> streams-is-it-possible-to-have-compact-delete-policy-on-state-stores
> >> and
> >> https://issues.apache.org/jira/browse/KAFKA-4015
> >> say yes.
> >>
> >> https://kafka.apache.org/documentation/
> >> says yes for log.cleanup.policy on broker level
> >> The default cleanup policy for segments beyond the retention window. A
> >> comma separated list of valid policies. Valid policies are: "delete" and
> >> "compact"
> >> but no for cleanup.policy on topic level
> >> A string that is either "delete" or "compact".
> >>
> >> My command line on 1.1 seems to agree with the last part:
> >> ./kafka-configs  --zookeeper broker0:2181 --alter --entity-type topics
> >> --entity-name test --add-config log.cleanup.policy=compact,delete
> >> requirement failed: Invalid entity config: all configs to be added must
> be
> >> in the format "key=val".
> >> ./kafka-configs  --zookeeper broker0:2181 --alter --entity-type topics
> >> --entity-name test --add-config cleanup.policy=compact,delete
> >> requirement failed: Invalid entity config: all configs to be added must
> be
> >> in the format "key=val".
> >> ./kafka-configs  --zookeeper broker0:2181 --alter --entity-type topics
> >> --entity-name test --add-config cleanup.policy="compact,delete"
> >> requirement failed: Invalid entity config: all configs to be added must
> be
> >> in the format "key=val".
> >> ./kafka-configs  --zookeeper broker0:2181 --alter --entity-type topics
> >> --entity-name test --add-config cleanup.policy='compact,delete'
> >> requirement failed: Invalid entity config: all configs to be added must
> be
> >> in the format "key=val".
> >>
> >> Is this missing for the client and/or the topic-level configuration
> >> options?
> >>
> >> Does anyone know if/how this is supposed to work?
> >>
> >> Side-note: Our use-case (KTable with cleanup after some time) should be
> >> covered with WindowKTables as far as I understand, but the documentation
> >> for the cleanup.policy seems inconsistent.
> >>
> >> best regards
> >>
> >> Patrik
> >>
> >
>
>


Usage of cleanup.policy=compact,delete

2018-08-10 Thread Patrik Kleindl
Hello

In a discussion yesterday the question came up whether an internal changelog
topic can be enabled for compaction and deletion.

https://stackoverflow.com/questions/50622369/kafka-streams-is-it-possible-to-have-compact-delete-policy-on-state-stores
and
https://issues.apache.org/jira/browse/KAFKA-4015
say yes.

https://kafka.apache.org/documentation/
says yes for log.cleanup.policy on broker level
The default cleanup policy for segments beyond the retention window. A
comma separated list of valid policies. Valid policies are: "delete" and
"compact"
but no for cleanup.policy on topic level
A string that is either "delete" or "compact".

My command line on 1.1 seems to agree with the last part:
./kafka-configs  --zookeeper broker0:2181 --alter --entity-type topics
--entity-name test --add-config log.cleanup.policy=compact,delete
requirement failed: Invalid entity config: all configs to be added must be
in the format "key=val".
./kafka-configs  --zookeeper broker0:2181 --alter --entity-type topics
--entity-name test --add-config cleanup.policy=compact,delete
requirement failed: Invalid entity config: all configs to be added must be
in the format "key=val".
./kafka-configs  --zookeeper broker0:2181 --alter --entity-type topics
--entity-name test --add-config cleanup.policy="compact,delete"
requirement failed: Invalid entity config: all configs to be added must be
in the format "key=val".
./kafka-configs  --zookeeper broker0:2181 --alter --entity-type topics
--entity-name test --add-config cleanup.policy='compact,delete'
requirement failed: Invalid entity config: all configs to be added must be
in the format "key=val".

Is this missing for the client and/or the topic-level configuration options?

Does anyone know if/how this is supposed to work?

Side-note: Our use-case (KTable with cleanup after some time) should be
covered with WindowKTables as far as I understand, but the documentation
for the cleanup.policy seems inconsistent.

best regards

Patrik