kafka per topic metrics

2020-07-14 Thread Pushkar Deole
Hi All, I need some help with Kafka metrics. I am interested in certain metrics, e.g. the number of records published to a particular topic and the number of records consumed from that topic by a specific consumer group. I need the total for each of these two, and also their per-second averages.
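
The arithmetic behind "a total and an average per second" can be sketched as follows. This is a minimal illustration that assumes the record counts are available as cumulative counters sampled at two points in time. The class and method names are hypothetical, and the sketch does not show how to read any of Kafka's own metrics.

```java
// Hypothetical helper: derives a total and an average rate from two samples
// of a cumulative record counter. It does not read any Kafka metric itself.
final class TopicRateSketch {

    /** Records observed between the two samples. */
    static long total(long countAtStart, long countAtEnd) {
        return countAtEnd - countAtStart;
    }

    /** Average records per second over the sampling window. */
    static double averagePerSecond(long countAtStart, long countAtEnd,
                                   long startMillis, long endMillis) {
        long elapsedMillis = endMillis - startMillis;
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("end must be after start");
        }
        return total(countAtStart, countAtEnd) * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // Made-up sample values: 1,000 records at t = 0 s, 4,000 records at t = 60 s.
        System.out.println(total(1_000, 4_000));                       // 3000
        System.out.println(averagePerSecond(1_000, 4_000, 0, 60_000)); // 50.0
    }
}
```

The same calculation applies to both the published count and the consumed count, as long as each is sampled as a monotonically increasing counter.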

Need inputs - Confluent kafka memory usage pattern

2020-07-03 Thread Pushkar Deole
Hi All, We are using the Confluent version of Kafka, i.e. version 5.5.0. We deploy it as a pod service on Kubernetes. We use 3 broker pods and have set the memory request/limit for the pod to 512Mi/2GiB respectively, and we observed that all pods were almost touching the limit or going over the limit a bit

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-06-23 Thread Pushkar Deole
reams.processor.internals.GlobalStreamThread.initialize(GlobalStreamThread.java:345)\n\tat org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:270)\n"} On Thu, May 28, 2020 at 1:27 PM Pushkar Deole wrote: > Matthias, > > I realized that the exception and actual problem is totally different. The > proble

consumer crashes due to exception in Consumer.poll method

2020-06-19 Thread Pushkar Deole
Hi All, I don't know how to fix this, or whether just catching the exception from the Consumer.poll method will help here: there is some issue with the schema registry, due to which the consumer application received an error while deserializing the event. Now, even when the schema registry was restarted, the consumer

Re: kafka consumer thread crashes and doesn't consume any events without service restart

2020-06-18 Thread Pushkar Deole
g them out to a DLQ topic for manual > reprocessing purposes. > > Thanks, > > -- Ricardo > On 6/18/20 7:45 AM, Pushkar Deole wrote: > > Hi Gerbrand, > > thanks for the update, however if i dig more into it, the issue is because > of schema registry issue and th

Re: kafka consumer thread crashes and doesn't consume any events without service restart

2020-06-18 Thread Pushkar Deole
try'. > Do not store records that you want to retry back into the original > topic! If you do that, you run a great risk of overloading your whole > kafka-cluster. > > On 18-06-2020 09:55, Pushkar Deole > wrote: > > Hi All, > > This is wha

kafka consumer thread crashes and doesn't consume any events without service restart

2020-06-18 Thread Pushkar Deole
Hi All, This is what I am observing: we have a consumer which polls data from a topic, does the processing, and polls again, and this keeps happening continuously. At one point, there was some bad data on the topic which could not be consumed by the consumer, probably because it couldn't deserialize the

kafka log compaction

2020-06-17 Thread Pushkar Deole
Hi All, I want some of my topics to retain data forever without any deletion, since those topics hold static data that is always required by the application. Also, for these topics I want to retain the latest value for each key. I believe the cleanup policy of 'compact' would meet my needs. I have the following
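
To make "retain the latest value for each key" concrete, here is a toy in-memory model of the end result of compaction. It is only a sketch: it is not how Kafka's log cleaner is implemented, and the class, method, and key names are hypothetical. It also models a record with a null value as a delete marker for its key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: shows what "keep the latest value per key" means.
// This is not how Kafka's log cleaner is implemented.
final class CompactionSketch {

    /** Returns the surviving key/value pairs after reading the log in offset order. */
    static Map<String, String> compact(List<Map.Entry<String, String>> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : log) {
            if (record.getValue() == null) {
                latest.remove(record.getKey());                 // null value modeled as a delete marker
            } else {
                latest.put(record.getKey(), record.getValue()); // newer value replaces the older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> log = Arrays.asList(
                new SimpleEntry<String, String>("agent-10", "v1"),
                new SimpleEntry<String, String>("agent-11", "v1"),
                new SimpleEntry<String, String>("agent-10", "v2"));
        System.out.println(compact(log)); // {agent-10=v2, agent-11=v1}
    }
}
```

In this model every key that was written at least once and not deleted afterwards survives with its most recent value, which matches the requirement described in the post.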

Setting kafka headers in ProducerRecord

2020-06-01 Thread Pushkar Deole
Hi All, I need to use Kafka headers in the ProducerRecord; however, I see that the constructor that accepts headers also requires the partition number. I don't want the records to be distributed to available partitions based on kafka's default mechanism and don't want to specify the partition by

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-28 Thread Pushkar Deole
, I am able to get it up and running. Due to above problem, it is very difficult to debug the issue and above bug can be fixed as soon as possible, or a proper exception should be thrown. On Wed, May 27, 2020 at 10:59 PM Pushkar Deole wrote: > Thanks... i will try increasing the memory in case

Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

2020-05-28 Thread Pushkar Deole
on > from this KIP: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Record+the+Memory+Used+by+RocksDB > > > Guozhang > > > On Wed, May 27, 2020 at 8:44 PM Pushkar Deole > wrote: > > > Hello All, > > > > I am u

NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

2020-05-27 Thread Pushkar Deole
Hello All, I am using Stream DSL API just to create a GlobalKTable backed by a topic. The topology is simple, just create a global table from a topic and that's it (pasted below code snippet), when I run this service on K8S cluster (container in a pod), the service gets OutOfMemoryError during

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-27 Thread Pushkar Deole
tch to a global store instead > of GlobalKTable? That way, you can implement a custom `Processor` and > add a hook manually? > > I don't see anything wrong with your setup. Unclear if/why the global > store would require a lot of memory... > > > -Matthias > > On 5/27/

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-27 Thread Pushkar Deole
e stream"); kafkaStreams.close(); })); } On Wed, May 27, 2020 at 5:06 PM Pushkar Deole wrote: > Hi Matthias, > > By the way, I used the in-memory global store and the service is giving > out of memory error during startup. Unfortunately i don't have a stack > trace now but when i got sta

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-27 Thread Pushkar Deole
into this memory I believe since topic is empty. Can you provide inputs on this. On Wed, May 27, 2020 at 2:17 PM Pushkar Deole wrote: > Ok... got it... is there any hook that I can attach to the global k table > or global store? What I mean here is I want to know when the global store > i

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-27 Thread Pushkar Deole
hing that does usually not change over time. > > > -Matthias > > On 5/25/20 9:55 PM, Pushkar Deole wrote: > > Matthias, > > > > I am wondering what you mean by "Global store hold "axially" data that is > > provided from "outside" of the

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-25 Thread Pushkar Deole
store > >> which may not have been synced with the topic? > > Correct. There is no guarantee when the update to the global store will > be applied. As said, global stores are not designed to hold data the > application computes. > > > -Matthias > > > On 4/30/2

Re: data structures used by GlobalKTable, KTable

2020-05-22 Thread Pushkar Deole
balTable("topic", > Materialized.as(Stores.inMemoryKeyValueStore("store-name"))); > > > We are already working on an improved API via KIP-591: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+store+type > >

Re: data structures used by GlobalKTable, KTable

2020-05-16 Thread Pushkar Deole
If my thinking is correct then for some scenarios or use cases, the MRU for GlobalKTable might also work as a local store of data since the MRU will always store the data required by that application instance. On Sun, May 17, 2020 at 9:42 AM Pushkar Deole wrote: > Matthias, > > I w

Re: data structures used by GlobalKTable, KTable

2020-05-16 Thread Pushkar Deole
balTable("topic", > Materialized.as(Stores.inMemoryKeyValueStore("store-name"))); > > > We are already working on an improved API via KIP-591: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+store+type > >

topic input parameter in org.apache.kafka.common.serialization.Serializer interface

2020-05-15 Thread Pushkar Deole
Hi All, I am writing a custom JSON serializer for our customer objects, which needs to implement the Kafka Serializer interface. I am wondering what the input parameter "topic" is in the Serializer.serialize method. I don't think it is the topic on which data is to be published. Please correct

Re: data structures used by GlobalKTable, KTable

2020-05-15 Thread Pushkar Deole
re("store-name"))); > > > We are already working on an improved API via KIP-591: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+store+type > > > > -Matthias > > > On 5/13/20 3:40 AM, Pushkar Deo

Re: data structures used by GlobalKTable, KTable

2020-05-13 Thread Pushkar Deole
y default, RocksDB is used. You can also change it to use an in-memory > store that is basically a HashMap. > > > -Matthias > > On 5/12/20 10:16 AM, Pushkar Deole wrote: > > Thanks Liam! > > > > On Tue, May 12, 2020, 15:12 Liam Clarke-Hutchinson < > > liam.cla...

Re: data structures used by GlobalKTable, KTable

2020-05-12 Thread Pushkar Deole
help" > Clarke-Hutchinson > > On Tue, May 12, 2020 at 9:35 PM Pushkar Deole > wrote: > > > Hello confluent team, > > > > Could you provide some information on what data structures are used > > internally by GlobalKTable and KTables. The application th

Re: records with key as string and value as java ArrayList in topic

2020-05-12 Thread Pushkar Deole
ther end. > > Kind regards, > > Liam Clarke-Hutchinson > > On Tue, 12 May 2020, 5:13 pm Pushkar Deole, wrote: > > > And by the way, confluent has provided KafkaAvroSerializer/Deserialier. > > Can't they be used to do conversion for java types? > > > > On Tu

data structures used by GlobalKTable, KTable

2020-05-12 Thread Pushkar Deole
Hello confluent team, Could you provide some information on what data structures are used internally by GlobalKTable and KTables. The application that I am working on has a requirement to read cached data from GlobalKTable on every incoming event, so the reads from GlobalKTable need to be

Re: records with key as string and value as java ArrayList in topic

2020-05-11 Thread Pushkar Deole
And by the way, Confluent has provided KafkaAvroSerializer/Deserializer. Can't they be used to do conversion for java types? On Tue, May 12, 2020 at 10:09 AM Pushkar Deole wrote: > Ok... so jackson json serialization is the way to go for hashmaps as well? > > On Mon, May 11, 2020 at 7:5

Re: records with key as string and value as java ArrayList in topic

2020-05-11 Thread Pushkar Deole
fore worrying about it further, though, you might want to check the > > InMemoryKeyValueStore implementation, since my answer was from memory. > > > > Thanks, > > John > > > > On Mon, May 11, 2020, at 03:47, Pushkar Deole wrote: > > > John, > > > i

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-11 Thread Pushkar Deole
e, and > potentially redesign the app if it’s actually a problem. > > I hope this helps! > -John > > On Tue, May 5, 2020, at 01:23, Pushkar Deole wrote: > > Thanks John... appreciate your inputs and suggestions. I have been > assigned > > recently to this t

Re: records with key as string and value as java ArrayList in topic

2020-05-11 Thread Pushkar Deole
John, is there KIP in progress for supporting Java HashMap also? On Sun, May 10, 2020, 00:47 John Roesler wrote: > Yes, that’s correct. It’s only for serializing the java type ‘byte[]’. > > On Thu, May 7, 2020, at 10:37, Pushkar Deole wrote: > > Thanks John... I got to finish

Re: Error in kafka streams: The producer attempted to use a producer id which is not currently assigned to its transactional id

2020-05-08 Thread Pushkar Deole
service is using at_least_once guarantee while the service in error is exactly once guarantee. So does this issue relate to transactions which are used only when exactly_once guarantee is set? On Mon, Apr 27, 2020 at 12:37 PM Pushkar Deole wrote: > came across this: seems to be the one >

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-08 Thread Pushkar Deole
the global KTable. > > Since you can anticipate that missed joins can be a problem, you can build > in some metrics and reporting for how many misses you actually observe, and > potentially redesign the app if it’s actually a problem. > > I hope this helps! > -John > >

Re: records with key as string and value as java ArrayList in topic

2020-05-07 Thread Pushkar Deole
al for ideas to write your own. > > Thanks, > John > > On Thu, May 7, 2020, at 08:17, Nicolas Carlot wrote: > > Won't say it's a good idea to use java serialized classes for messages, > but > > you should use a byteArraySerializer if you want to do such things > >

records with key as string and value as java ArrayList in topic

2020-05-07 Thread Pushkar Deole
Hi All, I have a requirement to store a record with the key as a Java String and the value as a Java ArrayList in a Kafka topic. Kafka provides a StringSerializer and StringDeserializer by default; however, for a Java ArrayList, how can I get a serializer? Do I need to write my own? Can someone share if

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-05 Thread Pushkar Deole
to database for every event would impact the throughput a lot. Probably having distributed caching (key/value pairs) would have comparatively lesser impact. Second choice is to go for GlobalKTable however this needs to be done very carefully. Thanks again! On Mon, May 4, 2020 at 11:18 PM Pushkar Deole

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-04 Thread Pushkar Deole
> table), it makes Streams wait until both inputs are buffered before > processing, so it can do a better job of processing in timestamp order. > > I hope this helps! > -John > > On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote: > > If i understand correctly, Kafka

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-03 Thread Pushkar Deole
have been synced with the topic? > > > > Correct. There is no guarantee when the update to the global store will > > be applied. As said, global stores are not designed to hold data the > > application computes. > > > > > > -Matthias > > > >

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-01 Thread Pushkar Deole
> -Matthias > > > On 4/30/20 7:42 AM, Pushkar Deole wrote: > > Thanks Matthias. > > Can you elaborate on the replicated caching layer part? > > When you say global stores, do you mean GlobalKTable created from a topic > > e.g. using StreamsBuilder.globalTable(Stri

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-04-30 Thread Pushkar Deole
ify state store from "outside". > > If you want to build a "replicated caching layer", you could use global > stores and write into the corresponding topics to update all stores. Of > course, those updates would be async. > > > -Matthias > > On 4/29/20 10

Re: are kafka state stores global or local?

2020-04-30 Thread Pushkar Deole
it would not be on _all_ instances (depending on > your configuration) and the main purpose of standby tasks is to reduce > recovery time. Ie, even if you have standby task, that are "passive" > (you could only access them via IQ) > > > -Matthias > > On 4/29/20 9:06 PM

can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-04-29 Thread Pushkar Deole
Hi All, I am wondering if this is possible: I have been asked to use state stores as a general replicated cache among multiple service instances. The state store is created through StreamsBuilder but is not actually modified through the stream processor topology; however, it is to be

Re: are kafka state stores global or local?

2020-04-29 Thread Pushkar Deole
zed > > on the machine running the streams app. But it's considered "global" > > because it accesses all partitions of the input topic for the store. > > > > HTH, > > Bill > > > > On Wed, Apr 29, 2020 at 6:09 AM Pushkar Deole > wrote: > > >

are kafka state stores global or local?

2020-04-29 Thread Pushkar Deole
Hi, I am looking for some information as to whether the Kafka state store KeyValueStore is global or local only. I found that it can be in-memory or made persistent, which stores it in a local RocksDB, and also that logging can be enabled so the state is backed by a topic, which allows the state store to be

Can kafka state stores be used without streams and accessed globally through multiple application instances?

2020-04-28 Thread Pushkar Deole
Hi All, I have a Kafka application which consumes events from multiple topics and stores certain fields from the events in a local in-memory cache. Based on the cached data, it enriches some of the events consumed from another topic, e.g. say for agent id 10, the first name and last name are cached then

Re: Error in kafka streams: The producer attempted to use a producer id which is not currently assigned to its transactional id

2020-04-27 Thread Pushkar Deole
came across this: seems to be the one https://issues.apache.org/jira/browse/KAFKA-8710 On Mon, Apr 27, 2020 at 12:17 PM Pushkar Deole wrote: > Thanks... can you point to those improvements/bugs that are fixed in 2.5? > > On Mon, Apr 27, 2020 at 1:03 AM Matthias J. Sax wrote: >

Re: Error in kafka streams: The producer attempted to use a producer id which is not currently assigned to its transactional id

2020-04-27 Thread Pushkar Deole
tialize > itself automatically. Of course, you can also see this as an improvement > and not a bug :) > > > -Matthias > > On 4/25/20 7:48 AM, Pushkar Deole wrote: > > version used is 2.3 > > however, not sure if this is a bug.. after doing some search, came across > >

Re: Error in kafka streams: The producer attempted to use a producer id which is not currently assigned to its transactional id

2020-04-25 Thread Pushkar Deole
e than that. On Fri, Apr 24, 2020 at 10:46 PM Matthias J. Sax wrote: > Which version are you using? > > A couple of broker and client side exactly-once related bugs got fixed in > the latest release 2.5.0. > > > -Matthias > > On 4/23/20 11:59 PM, Pushkar Deole wrote: > > Hello

Error in kafka streams: The producer attempted to use a producer id which is not currently assigned to its transactional id

2020-04-24 Thread Pushkar Deole
Hello All, While using a Kafka Streams application, we intermittently get the following exception and the stream is closed. We need to restart the application to get it working again and start processing. This exception is observed in some of the labs which have been idle for some time but it is

Re: min.insync.replicas and producer acks

2020-01-25 Thread Pushkar Deole
I mean, the producer acks to be 'none' On Sat, Jan 25, 2020 at 4:49 PM Pushkar Deole wrote: > Thank you for a quick response. > > What would happen if I set the producer acks to be 'one' and > min.insync.replicas to 2. In this case the producer will return when only >

Re: min.insync.replicas and producer acks

2020-01-25 Thread Pushkar Deole
of 2 will be guaranteed by kafka? On Sat, Jan 25, 2020 at 12:50 PM Boyang Chen wrote: > Hey Pushkar, > > producer ack only has 3 options: none, one, or all. You could not nominate > an arbitrary number. > > On Fri, Jan 24, 2020 at 7:53 PM Pushkar Deole > wrote: > >

min.insync.replicas and producer acks

2020-01-24 Thread Pushkar Deole
Hi All, I am a bit confused about min.insync.replicas and producer acks. Do these two configurations achieve the same thing? E.g. if I set min.insync.replicas to 2, can I also achieve it by setting producer acks to 2, so the producer won't get an ack until 2 replicas have received the message?
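
The interaction between the two settings can be illustrated with a small decision model. This is a simplified sketch, not Kafka code, and the class and method names are hypothetical. It reflects two points: the producer acks setting has only three choices (none, one, all), and min.insync.replicas is enforced by the broker only for writes sent with acks=all.

```java
// Toy decision model, not Kafka code: mirrors the interaction of the
// producer "acks" setting with the topic/broker "min.insync.replicas".
final class AcksSketch {

    // The only three choices; an arbitrary number such as "acks=2" is not available.
    enum Acks { NONE, ONE, ALL }

    /** True if the write would be accepted in this simplified model. */
    static boolean writeAccepted(Acks acks, int inSyncReplicas, int minInSyncReplicas) {
        if (acks == Acks.ALL) {
            // Only acks=all makes the broker enforce min.insync.replicas.
            return inSyncReplicas >= minInSyncReplicas;
        }
        // acks=0 and acks=1 involve only the partition leader, which itself counts
        // as an in-sync replica (with acks=0 the producer does not wait for a reply).
        return inSyncReplicas >= 1;
    }

    public static void main(String[] args) {
        // min.insync.replicas = 2, but only the leader is currently in sync.
        System.out.println(writeAccepted(Acks.ALL, 1, 2)); // false
        System.out.println(writeAccepted(Acks.ONE, 1, 2)); // true
    }
}
```

In this model, a guarantee of at least 2 replicas per acknowledged write comes from combining acks=all with min.insync.replicas=2, not from either setting alone.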

Is there a way to auto scale topic partitions in kafka?

2020-01-17 Thread Pushkar Deole
Hello, I am working on developing a microservice-based system which uses Kafka as its messaging infrastructure. The microservice applications are mainly Kafka consumers and Kafka Streams applications and are deployed as Docker containers on Kubernetes. The system should be designed to be auto

Getting started with Kafka topic to store multiple types

2019-07-25 Thread Pushkar Deole
Hi All, I am new to Kafka and still getting myself acquainted with the product. I have a basic question around using Kafka. I want to store in a Kafka topic, a string value against some keys while a HashMap value against some of the keys. For this purpose, I have created two different producers
