To Know about Kafka and its usage
Hi,

This is to know more about Kafka and how I can use it in my project. I am
learning about Big Data Engineering and came across Kafka. I am trying to
develop an application that takes some real-time data, filters it, and shows
some visual output, and I would like to know where Kafka could fit and what
other technologies I should use. I was thinking about using Spark, Python,
and Google Cloud. Any help regarding this would be great.

Thanks,
Parth Patel
Re: windowed store excessive memory consumption
>> I have a feeling that it would be helpful to add this to documentation
>> examples as well as javadocs for all methods that do return iterators.

That makes sense. Can you create a JIRA for this?

Thanks.

-Matthias

On 9/27/17 2:54 PM, Stas Chizhov wrote:
> Thanks, that comment actually made its way to the documentation already.
> Apparently none of that was related. It was a leak - I was not closing an
> iterator that was returned by
> https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/state/ReadOnlyWindowStore.html#fetch(K,%20long,%20long)
> The method's javadoc does not mention that the iterator needs to be closed.
> Neither does
> https://docs.confluent.io/current/streams/developer-guide.html#querying-local-window-stores
> Unfortunately I had not read the iterator's javadoc earlier, which does
> mention it.
> I have a feeling that it would be helpful to add this to documentation
> examples as well as javadocs for all methods that do return iterators.
>
> Best regards,
> Stanislav.
>
> 2017-09-27 21:53 GMT+02:00 Ted Yu :
>
>> Have you seen this comment?
>>
>> https://issues.apache.org/jira/browse/KAFKA-5122?focusedCommentId=15984467&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15984467
>>
>> On Wed, Sep 27, 2017 at 12:44 PM, Stas Chizhov wrote:
>>
>>> Hi,
>>>
>>> I am running a simple Kafka Streams app (0.11.0.1) that counts messages
>>> per hour per partition. The app runs in a Docker container with a memory
>>> limit set, which is always reached by the app within a few minutes, and
>>> then the container is killed. After running it with various numbers of
>>> instances, different memory limits, and an in-memory store instead - it
>>> looks like it is off-heap memory being taken by RocksDB. After playing
>>> with different memory limits, it looks like RocksDB assumes it can grab
>>> all the physical memory of the machine, so if the container limit is
>>> less than that, it gets killed on the way.
>>>
>>> Also, I have not changed any RocksDB config settings, but the defaults
>>> mentioned here:
>>> https://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-rocksdb-config
>>> look nowhere close to the consumption observed.
>>>
>>> A few details about the app:
>>> I use a windowed store defined as follows:
>>>
>>> StateStoreSupplier windowCounts = Stores.create(WINDOW_COUNT_STORE)
>>>     .withIntegerKeys()
>>>     .withLongValues()
>>>     .persistent()
>>>     .enableCaching()
>>>     .windowed(MINUTES.toMillis(1), HOURS.toMillis(5), 10, false)
>>>     .build();
>>>
>>> There is a processor that updates a count for a partition for a
>>> timestamp that is rounded to an hour boundary:
>>>
>>> store.put(
>>>     context.partition(),
>>>     current(floor(value.getIngressTime())) + 1,
>>>     floor(value.getIngressTime())
>>> );
>>>
>>> Any hints on what might cause this, or any config settings?
>>>
>>> Thank you,
>>> Stanislav.
Re: windowed store excessive memory consumption
Thanks, that comment actually made its way to the documentation already.
Apparently none of that was related. It was a leak - I was not closing an
iterator that was returned by
https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/state/ReadOnlyWindowStore.html#fetch(K,%20long,%20long)
The method's javadoc does not mention that the iterator needs to be closed.
Neither does
https://docs.confluent.io/current/streams/developer-guide.html#querying-local-window-stores
Unfortunately I had not read the iterator's javadoc earlier, which does
mention it.
I have a feeling that it would be helpful to add this to documentation
examples as well as javadocs for all methods that do return iterators.

Best regards,
Stanislav.

2017-09-27 21:53 GMT+02:00 Ted Yu :

> Have you seen this comment?
>
> https://issues.apache.org/jira/browse/KAFKA-5122?focusedCommentId=15984467&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15984467
>
> On Wed, Sep 27, 2017 at 12:44 PM, Stas Chizhov wrote:
>
>> Hi,
>>
>> I am running a simple Kafka Streams app (0.11.0.1) that counts messages
>> per hour per partition. The app runs in a Docker container with a memory
>> limit set, which is always reached by the app within a few minutes, and
>> then the container is killed. After running it with various numbers of
>> instances, different memory limits, and an in-memory store instead - it
>> looks like it is off-heap memory being taken by RocksDB. After playing
>> with different memory limits, it looks like RocksDB assumes it can grab
>> all the physical memory of the machine, so if the container limit is
>> less than that, it gets killed on the way.
>>
>> Also, I have not changed any RocksDB config settings, but the defaults
>> mentioned here:
>> https://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-rocksdb-config
>> look nowhere close to the consumption observed.
>>
>> A few details about the app:
>> I use a windowed store defined as follows:
>>
>> StateStoreSupplier windowCounts = Stores.create(WINDOW_COUNT_STORE)
>>     .withIntegerKeys()
>>     .withLongValues()
>>     .persistent()
>>     .enableCaching()
>>     .windowed(MINUTES.toMillis(1), HOURS.toMillis(5), 10, false)
>>     .build();
>>
>> There is a processor that updates a count for a partition for a
>> timestamp that is rounded to an hour boundary:
>>
>> store.put(
>>     context.partition(),
>>     current(floor(value.getIngressTime())) + 1,
>>     floor(value.getIngressTime())
>> );
>>
>> Any hints on what might cause this, or any config settings?
>>
>> Thank you,
>> Stanislav.
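[Editor's note: the fix Stanislav describes boils down to closing the iterator returned by `fetch()`. A minimal sketch, assuming a `ReadOnlyWindowStore<Integer, Long>` obtained via interactive queries; the method, variable names, and time range are illustrative:]

```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class WindowFetchExample {

    // WindowStoreIterator implements Closeable, so try-with-resources
    // guarantees the underlying RocksDB iterator is released even if the
    // loop throws. Without close(), the native (off-heap) resources it
    // holds are leaked.
    static void printCounts(ReadOnlyWindowStore<Integer, Long> store, int partition) {
        long to = System.currentTimeMillis();
        long from = to - TimeUnit.HOURS.toMillis(1);
        try (WindowStoreIterator<Long> iterator = store.fetch(partition, from, to)) {
            while (iterator.hasNext()) {
                KeyValue<Long, Long> entry = iterator.next(); // key = window start timestamp
                System.out.println("window " + entry.key + " -> count " + entry.value);
            }
        } // iterator.close() runs automatically here
    }
}
```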
Re: windowed store excessive memory consumption
Have you seen this comment?

https://issues.apache.org/jira/browse/KAFKA-5122?focusedCommentId=15984467&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15984467

On Wed, Sep 27, 2017 at 12:44 PM, Stas Chizhov wrote:

> Hi,
>
> I am running a simple Kafka Streams app (0.11.0.1) that counts messages
> per hour per partition. The app runs in a Docker container with a memory
> limit set, which is always reached by the app within a few minutes, and
> then the container is killed. After running it with various numbers of
> instances, different memory limits, and an in-memory store instead - it
> looks like it is off-heap memory being taken by RocksDB. After playing
> with different memory limits, it looks like RocksDB assumes it can grab
> all the physical memory of the machine, so if the container limit is less
> than that, it gets killed on the way.
>
> Also, I have not changed any RocksDB config settings, but the defaults
> mentioned here:
> https://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-rocksdb-config
> look nowhere close to the consumption observed.
>
> A few details about the app:
> I use a windowed store defined as follows:
>
> StateStoreSupplier windowCounts = Stores.create(WINDOW_COUNT_STORE)
>     .withIntegerKeys()
>     .withLongValues()
>     .persistent()
>     .enableCaching()
>     .windowed(MINUTES.toMillis(1), HOURS.toMillis(5), 10, false)
>     .build();
>
> There is a processor that updates a count for a partition for a timestamp
> that is rounded to an hour boundary:
>
> store.put(
>     context.partition(),
>     current(floor(value.getIngressTime())) + 1,
>     floor(value.getIngressTime())
> );
>
> Any hints on what might cause this, or any config settings?
>
> Thank you,
> Stanislav.
windowed store excessive memory consumption
Hi,

I am running a simple Kafka Streams app (0.11.0.1) that counts messages per
hour per partition. The app runs in a Docker container with a memory limit
set, which is always reached by the app within a few minutes, and then the
container is killed. After running it with various numbers of instances,
different memory limits, and an in-memory store instead - it looks like it
is off-heap memory being taken by RocksDB. After playing with different
memory limits, it looks like RocksDB assumes it can grab all the physical
memory of the machine, so if the container limit is less than that, it gets
killed on the way.

Also, I have not changed any RocksDB config settings, but the defaults
mentioned here:
https://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-rocksdb-config
look nowhere close to the consumption observed.

A few details about the app:
I use a windowed store defined as follows:

StateStoreSupplier windowCounts = Stores.create(WINDOW_COUNT_STORE)
    .withIntegerKeys()
    .withLongValues()
    .persistent()
    .enableCaching()
    .windowed(MINUTES.toMillis(1), HOURS.toMillis(5), 10, false)
    .build();

There is a processor that updates a count for a partition for a timestamp
that is rounded to an hour boundary:

store.put(
    context.partition(),
    current(floor(value.getIngressTime())) + 1,
    floor(value.getIngressTime())
);

Any hints on what might cause this, or any config settings?

Thank you,
Stanislav.
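[Editor's note: one knob for bounding RocksDB's off-heap usage in Streams is a custom `RocksDBConfigSetter`. A sketch against the 0.11-era API; the class name and all values are illustrative, not recommendations:]

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Applied to every RocksDB instance Streams opens. Note that each segment of
// each windowed-store partition is its own RocksDB instance, so these
// per-instance limits multiply across the topology.
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L); // 16 MB block cache
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(8 * 1024 * 1024L);     // 8 MB memtable
        options.setMaxWriteBufferNumber(2);               // at most 2 memtables
    }
}
```

It would be registered via `props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);`.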
Re: how to use Confluent connector with Apache Kafka
All connectors are compatible with vanilla Apache Kafka, as Confluent Open
Source ships with "plain" Apache Kafka under the hood. So you can just
download the connector, plug it in, and configure it like any other
connector.

https://www.confluent.io/product/connectors/

-Matthias

On 9/26/17 1:15 PM, Marina Popova wrote:
> Hi,
> we have an existing Kafka cluster (0.10) already set up and working in
> production.
> I would like to explore using Confluent's Elasticsearch connector -
> however, I see it comes as part of the Confluent distribution of Kafka
> (with separate Confluent scripts, libs, etc.).
>
> Is there an easy way to just use the Confluent connector with the plain
> (non-Confluent) Apache distribution of Kafka?
>
> thanks!
> Marina
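[Editor's note: a sketch of what "plug it in" can look like with a plain Apache Kafka install; all paths, the topic, and the connection URL are illustrative:]

```shell
# Make the connector jars visible to Connect. On Kafka 0.11+ the
# plugin.path setting in the worker properties is the cleaner option.
export CLASSPATH='/opt/kafka-connect-elasticsearch/*'

# elasticsearch-sink.properties (illustrative contents):
#   name=elasticsearch-sink
#   connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
#   topics=my-topic
#   connection.url=http://localhost:9200
#   type.name=kafka-connect

bin/connect-standalone.sh config/connect-standalone.properties \
    elasticsearch-sink.properties
```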
Re: out of order sequence number in exactly once streams
An OutOfOrderSequenceException should only occur if an idempotent producer
gets out of sync with the broker. If you set `enable.idempotence = true` on
your producer, you might want to set `retries = Integer.MAX_VALUE`.

-Matthias

On 9/26/17 11:30 PM, Sameer Kumar wrote:
> Hi,
>
> I again received this exception while running my streams app. I am using
> Kafka 0.11.0.1. After restarting my app, this error got fixed.
>
> I guess this might be due to a bad network. Any pointers? Any config
> wherein I can configure it for retries?
>
> Exception trace is attached.
>
> Regards,
> -Sameer.
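[Editor's note: spelled out as a producer config fragment, following the suggestion above. A Streams app gets an equivalent internal-producer setup by setting `processing.guarantee=exactly_once` instead of configuring these directly:]

```
# Idempotent producer settings (Kafka 0.11+)
enable.idempotence=true
# Retry (practically) forever so transient network errors do not surface
# as send failures; 2147483647 = Integer.MAX_VALUE
retries=2147483647
# Required with idempotence: 1 on 0.11.x (later releases allow up to 5)
max.in.flight.requests.per.connection=1
acks=all
```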
Re: Debugging invalid_request response from a .10.2 server for list offset api using librdkafka client
I understand that it won't support it; my only concern is about the error
code. Locally, with these settings, I get a message format error (43),
which makes sense. On one particular cluster we see an invalid request (42)
instead of unsupported format (43).

What are the implications of changing the broker protocol version to
0.10.2 for the topics that were created before this change? My assumption
is that they will return 43 for ListOffsets request version 1+ and all
other requests will work. Is that correct?

Also, can the message format be changed for a topic from 0.8.1 to 0.10.2?
If not, what is the recommended way to upgrade old topics?

On Sep 27, 2017 11:15 AM, "Hans Jespersen" wrote:

> The 0.8.1 protocol does not support target timestamps, so it makes sense
> that you would get an invalid request error if the client is sending a
> Version 1 or Version 2 Offsets Request. The only Offsets Request that a
> 0.8.1 broker knows how to handle is a Version 0 Offsets Request.
>
> From https://kafka.apache.org/protocol :
> INVALID_REQUEST 42 False This most likely occurs because of a request
> being malformed by the client library or the message was sent to an
> incompatible broker. See the broker logs for more details.
>
> For more info on the 0.11 Kafka protocol and ListOffsets requests see
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * h...@confluent.io (650)924-2670
>  */
>
> On Wed, Sep 27, 2017 at 10:20 AM, Vignesh wrote:
>
>> Correction to the above mail: we get 42 - INVALID_REQUEST, not 43.
>> A few other data points:
>>
>> The server has the following configs set:
>>
>> inter.broker.protocol.version=0.8.1
>> log.message.format.version=0.8.1
>>
>> My understanding is that we should get an unsupported-message-format
>> error with the above configurations; why do we get invalid_request?
>>
>> Thanks,
>> Vignesh.
>>
>> On Wed, Sep 27, 2017 at 9:51 AM, Vignesh wrote:
>>
>>> Hi,
>>>
>>> We are using the librdkafka library version 0.11.0 and calling the
>>> ListOffsets API with a timestamp against a 0.10.2 Kafka server
>>> installed on a Windows machine.
>>>
>>> This request returns an error code, 43 - INVALID_REQUEST.
>>>
>>> We have other local installations of Kafka version 0.10.2 (also on
>>> Windows) and are able to use the library successfully.
>>>
>>> Are there any settings on this specific server that are causing this
>>> error? Which logs can we enable and look at to get additional details
>>> about what is wrong with the request?
>>>
>>> Thanks,
>>> Vignesh.
Re: Debugging invalid_request response from a .10.2 server for list offset api using librdkafka client
The 0.8.1 protocol does not support target timestamps, so it makes sense
that you would get an invalid request error if the client is sending a
Version 1 or Version 2 Offsets Request. The only Offsets Request that a
0.8.1 broker knows how to handle is a Version 0 Offsets Request.

From https://kafka.apache.org/protocol :
INVALID_REQUEST 42 False This most likely occurs because of a request being
malformed by the client library or the message was sent to an incompatible
broker. See the broker logs for more details.

For more info on the 0.11 Kafka protocol and ListOffsets requests see
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Wed, Sep 27, 2017 at 10:20 AM, Vignesh wrote:

> Correction to the above mail: we get 42 - INVALID_REQUEST, not 43.
> A few other data points:
>
> The server has the following configs set:
>
> inter.broker.protocol.version=0.8.1
> log.message.format.version=0.8.1
>
> My understanding is that we should get an unsupported-message-format
> error with the above configurations; why do we get invalid_request?
>
> Thanks,
> Vignesh.
>
> On Wed, Sep 27, 2017 at 9:51 AM, Vignesh wrote:
>
>> Hi,
>>
>> We are using the librdkafka library version 0.11.0 and calling the
>> ListOffsets API with a timestamp against a 0.10.2 Kafka server
>> installed on a Windows machine.
>>
>> This request returns an error code, 43 - INVALID_REQUEST.
>>
>> We have other local installations of Kafka version 0.10.2 (also on
>> Windows) and are able to use the library successfully.
>>
>> Are there any settings on this specific server that are causing this
>> error? Which logs can we enable and look at to get additional details
>> about what is wrong with the request?
>>
>> Thanks,
>> Vignesh.
Re: Debugging invalid_request response from a .10.2 server for list offset api using librdkafka client
Correction to the above mail: we get 42 - INVALID_REQUEST, not 43.
A few other data points:

The server has the following configs set:

inter.broker.protocol.version=0.8.1
log.message.format.version=0.8.1

My understanding is that we should get an unsupported-message-format error
with the above configurations; why do we get invalid_request?

Thanks,
Vignesh.

On Wed, Sep 27, 2017 at 9:51 AM, Vignesh wrote:

> Hi,
>
> We are using the librdkafka library version 0.11.0 and calling the
> ListOffsets API with a timestamp against a 0.10.2 Kafka server installed
> on a Windows machine.
>
> This request returns an error code, 43 - INVALID_REQUEST.
>
> We have other local installations of Kafka version 0.10.2 (also on
> Windows) and are able to use the library successfully.
>
> Are there any settings on this specific server that are causing this
> error? Which logs can we enable and look at to get additional details
> about what is wrong with the request?
>
> Thanks,
> Vignesh.
Debugging invalid_request response from a .10.2 server for list offset api using librdkafka client
Hi,

We are using the librdkafka library version 0.11.0 and calling the
ListOffsets API with a timestamp against a 0.10.2 Kafka server installed on
a Windows machine.

This request returns an error code, 43 - INVALID_REQUEST.

We have other local installations of Kafka version 0.10.2 (also on Windows)
and are able to use the library successfully.

Are there any settings on this specific server that are causing this error?
Which logs can we enable and look at to get additional details about what
is wrong with the request?

Thanks,
Vignesh.
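[Editor's note: one way to see which request versions a particular broker actually accepts is the API-versions tool that ships with Kafka 0.10.2 and later; the bootstrap address is illustrative:]

```shell
# Prints the min/max supported version of every API (Produce, Fetch,
# Offsets/ListOffsets, ...) for each broker in the cluster, which makes
# version mismatches like the one in this thread easy to spot.
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
```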
Re: Running Kafka on docker containers
I'm not sure how helpful it will be for you, but there was a talk on "Best
practices for running Kafka on Docker" at the Kafka Summit in August. You
can check the video and slides here:
https://www.confluent.io/kafka-summit-sf17/resource/

Jakub

On Tue, Sep 26, 2017 at 6:56 AM, Anoop Putta (aputta) wrote:

> Hi,
>
> Is it a good idea to run Kafka as Docker containers in production
> deployments? Do you guys foresee any blockers with this approach?
> Please advise.
>
> -Anoop P
Re: Kafka stream transformations
Hi Roshan,

Kafka Streams apps run as client applications; they do not run on the
broker. You develop an application and give it an `application.id` - you
deploy however many instances of that application you like, and they all
share the same topology.

I suggest you take a look at the docs here:
https://kafka.apache.org/documentation/streams/
and here:
https://docs.confluent.io/current/streams/index.html

On Wed, 27 Sep 2017 at 01:55 Roshan wrote:

> Hi,
>
> I'm looking for information on where the stream transformations are
> applied - the server (broker) or the client?
>
> Would it be possible for clients to share the topology?
>
> --
> Warm regards
> Roshan
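[Editor's note: a minimal sketch of this with the 0.11-era API; topic names and the filter are illustrative. Every instance started with the same `application.id` joins the same group and splits the input partitions among themselves:]

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class FilterApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Shared by all instances of this app; also names the consumer group.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-filter-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic")
               // The transformation executes in this client JVM, not on the broker.
               .filter((key, value) -> value != null)
               .to("output-topic");

        new KafkaStreams(builder, props).start();
    }
}
```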