Re: Broker Notification API for Kafka?
Might this message be more appropriate for the developer mailing list? Is it okay to double post in this case? Thanks again, Dominic. On Jun 12, 2016 00:59, "Dominic Chambers" wrote: I'd like to add a notification API to Kafka so that I can receive the following notifications for each broker in the cluster: 1. An event notification when a broker is promoted from follower to leader, or when a follower catches up with the leader or falls behind the leader. 2. An event notification each time a new event is written to the leader partition. 3. An event notification each time an event is received by a partition follower. My motivation for having this API is two-fold: 1. I want to be able to do 'stream processing' without having to run a separate cluster for my app logic. 2. I want to keep the processing load next to the data, avoiding all unnecessary network communication. This makes sense for me because I want to use Kafka as an event-sourcing platform, rather than as a general streaming backbone between legacy applications. Could somebody with knowledge of the code base please provide me some pointers as to how I should approach adding an API like this, or provide further information if this is considered to be a bad idea. Thanks in advance, Dominic Chambers.
Re: [DISCUSS] Java 8 as a minimum requirement
Hi Joe, Thanks for your feedback. Compatibility is a complex subject and the exact details need to be defined for each project. Technically, one could claim that making changes so that a 0.10.1.0 client doesn't work with a 0.10.0.0 broker would be an incompatible change and should not be allowed, but we don't have such a rule for Kafka today. Bumping the minimum required Java version could be seen as an incompatible change that needs a major release, or not, since users don't have to change or recompile their code (as I mentioned elsewhere, Akka 2.4.0 is fully compatible apart from the bump in the JDK version). For what it's worth, the Java 6 to Java 7 transition in Kafka (KAFKA-2316) was proposed in July of last year by Harsha in the middle of the 0.8.3 cycle and it was approved. The rename of 0.8.3 to 0.9.0 was only raised in September by Gwen and the reasons given were: "What do you think of making the next release (the one with security, new consumer, quotas, etc) a 0.9.0 instead of 0.8.3? It has lots of new features, and new consumer was pretty much scoped for 0.9.0, so it matches our original roadmap. I feel that so many awesome features deserve a better release number." No Java bump was mentioned there or elsewhere that I could see. So, it seems like this is a new rule we are trying to introduce. Which is not necessarily an issue, but it's important to make it clear. Ismael On Fri, Jun 17, 2016 at 3:38 PM, Joe Stein wrote: > Compatibility shouldn't be broken in a minor release. Minor versions are > for new features in a backwards-compatible manner. The Kafka bylaws do not > explicitly state this but I believe it is implied based on general practice > and so many other Apache projects explicitly calling this out, documenting > and communicating their semantic version strategy. 
> > If JDK8 is so much desired then jump to 0.11 and only do bug fixes on the > 0.10 release (which should be rigorous and not forceful to make folks > upgrade unnecessarily to get such improvements). > > My 0.2824152382 cents. > > Regards, > > ~ Joe Stein > > On Fri, Jun 17, 2016 at 8:53 AM, Marina wrote: > > > +1 - wish it was already done with Kafka 0.9 version :) > > > > > > From: Tommy Becker > > To: users@kafka.apache.org > > Sent: Friday, June 17, 2016 7:55 AM > > Subject: Re: [DISCUSS] Java 8 as a minimum requirement > > > > +1 We're on Java 8 already. > > > > On 06/16/2016 04:45 PM, Ismael Juma wrote: > > > > Hi all, > > > > I would like to start a discussion on making Java 8 a minimum requirement > > for Kafka's next feature release (let's say Kafka 0.10.1.0 for now). This > > is the first discussion on the topic so the idea is to understand how > > people feel about it. If people feel it's too soon, then we can pick up > the > > conversation again after Kafka 0.10.1.0. If the feedback is mostly > > positive, I will start a vote thread. > > > > Let's start with some dates. Java 7 hasn't received public updates since > > April 2015[1], Java 8 was released in March 2014[2] and Java 9 is > scheduled > > to be released in March 2017[3]. > > > > The first argument for dropping support for Java 7 is that the last > public > > release by Oracle contains a large number of known security > > vulnerabilities. The effectiveness of Kafka's security features is > reduced > > if the underlying runtime is not itself secure. 
> > > > The second argument for moving to Java 8 is that it adds a number of > > compelling features: > > > > * Lambda expressions and method references (particularly useful for the > > Kafka Streams DSL) > > * Default methods (very useful for maintaining compatibility when adding > > methods to interfaces) > > * java.util.stream (helpful for making collection transformations more > > concise) > > * Lots of improvements to java.util.concurrent (CompletableFuture, > > DoubleAdder, DoubleAccumulator, StampedLock, LongAdder, LongAccumulator) > > * Other nice things: SplittableRandom, Optional (and many others I have > not > > mentioned) > > > > The third argument is that it will simplify our testing matrix, we won't > > have to test with Java 7 any longer (this is particularly useful for > system > > tests that take hours to run). It will also make it easier to support > Scala > > 2.12, which requires Java 8. > > > > The fourth argument is that many other open-source projects have taken > the > > leap already. Examples are Cassandra[4], Lucene[5], Akka[6], Hadoop 3[7], > > Jetty[8], Eclipse[9], IntelliJ[10] and many others[11]. Even Android will > > support Java 8 in the next version (although it will take a while before > > most phones will use that version sadly). This reduces (but does not > > eliminate) the chance that we would be the first project that would > cause a > > user to consider a Java upgrade. > > > > The main argument for not making the change is that a reasonable number > of > > users may still be using Java 7 by the time Kafka 0.10.1.0 is released. > > More specifically, we care about the
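The Java 8 features listed in the second argument above can be illustrated with nothing but the JDK itself. A minimal sketch (class and method names are mine, and this is not Kafka code; it just shows the lambda, method-reference, java.util.stream and java.util.concurrent additions being discussed):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

public class Java8FeaturesDemo {
    // In Java 7 this transformation would need a verbose anonymous inner
    // class and a hand-written loop; Java 8 collapses it into a method
    // reference plus a java.util.stream pipeline.
    static List<Integer> wordLengths(List<String> words) {
        return words.stream()
                    .map(String::length)           // method reference
                    .collect(Collectors.toList()); // java.util.stream
    }

    public static void main(String[] args) {
        System.out.println(wordLengths(Arrays.asList("kafka", "streams"))); // prints [5, 7]

        // LongAdder, one of the java.util.concurrent additions mentioned above,
        // is a contended-counter replacement for AtomicLong.
        LongAdder counter = new LongAdder();
        counter.add(3);
        counter.increment();
        System.out.println(counter.sum()); // prints 4
    }
}
```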
Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api
The consumer instances close (i.e. leave the group) only if they are idle for a long time. We have expiration threads which monitor this and remove any consumer instances that keep sitting idle. Also, consumers are closed when the application is shut down. The poll() does receive around 481 records the second time, but we process only 10 messages at a time, so the processing time is not very large. From: Ewen Cheslack-Postava Sent: 20 June 2016 10:52:29 To: users@kafka.apache.org Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api Rohit, The 30s number sounds very suspicious because it is exactly the value of the session timeout. But if you are driving the consumer correctly, you shouldn't normally hit this timeout. Dana was asking about consumers leaving gracefully because that is one case where you can inadvertently trigger the 30s timeout, requiring *all* group members to wait that long before they decide one of the previous members has ungracefully left the group and they move on without it. It sounds like something you are doing is causing the group to wait for the session timeout. Is it possible any of your processes are exiting without calling consumer.close()? Or that any of your processes are not calling consumer.poll() within the session timeout of 30s? This can sometimes happen if they receive too much data and take too long to process it (0.10 introduced max.poll.records to help users control this, and we're making further refinements to the consumer to provide better application control over the number of messages fetched vs total processing time). -Ewen On Sun, Jun 19, 2016 at 10:01 PM, Rohit Sardesai wrote: > > Can anybody help out on this? > > From: Rohit Sardesai > Sent: 19 June 2016 11:47:01 > To: users@kafka.apache.org > Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer > api > > > In my tests, I am using around 24 consumer groups. 
I never call > consumer.close() or consumer.unsubscribe() until the application is > shutting down. > > So the consumers never leave, but new consumer instances do get created as > the parallel requests pile up. Also, I am reusing consumer instances > > if they are idle (i.e. not serving any consume request). So with 9 > partitions, I do 9 consume requests in parallel every second > under the same consumer group. > > So to summarize, I have the following test setup: 3 Kafka brokers, 2 > zookeeper nodes, 1 topic, 9 partitions, 24 consumer groups and 9 consume > requests at a time. > > > > From: Dana Powers > Sent: 19 June 2016 10:45 > To: users@kafka.apache.org > Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer > api > > Is your test reusing a group name? And if so, are your consumer instances > gracefully leaving? This may cause subsequent 'rebalance' operations to > block until those old consumers check in or the session timeout happens > (30secs) > > -Dana > On Jun 18, 2016 8:56 PM, "Rohit Sardesai" > wrote: > > > I am using the group management feature of Kafka 0.9 to handle partition > > assignment to consumer instances. I use the subscribe() API to subscribe > to > > the topic I am interested in reading data from. I have an environment > > where I have 3 Kafka brokers with a couple of Zookeeper nodes. I > created > > a topic with 9 partitions. The performance tests attempt to send 9 > > parallel poll() requests to the Kafka brokers every second. The results > > show that each poll() operation takes around 30 seconds for the first > time > > it polls and returns 0 records. Also, when I print the partition > > assignment to this consumer instance, I see no partitions assigned to > it. > > The next poll() does return quickly (~10-20 ms) with data and some > > partitions assigned to it. 
> > > > With each consumer taking 30 seconds , the performance tests report very > > low throughput since I run the tests for around 1000 seconds, out of which I > > produce messages on the topic for the complete duration and I start the > > parallel consume requests after 400 seconds. So out of 400 seconds , > with 9 > > consumers taking 30 seconds each , around 270 seconds are spent in the > > first poll without any data. Is this because of the re-balance operation > > that the consumers are blocked on the poll() ? What is the best way to > use > > poll() if I have to serve many parallel requests per second ? Should I > > prefer manual assignment of partitions in this case instead of relying on > > re-balance ? > > > > > > Regards, > > > > Rohit Sardesai > > > > > -- Thanks, Ewen
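Ewen's diagnosis above comes down to two consumer settings. A hedged sketch of the relevant properties (the bootstrap address and group name are placeholders, and the values are illustrative rather than recommendations):

```java
import java.util.Properties;

public class ConsumerPollTuning {
    // Settings relevant to the 30s stalls discussed in this thread.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder address
        props.put("group.id", "perf-test-group");       // placeholder group
        // A rebalance can block for up to this long when a member leaves
        // without calling close(); the 0.9 default is 30000 ms, which
        // matches the observed first-poll latency.
        props.put("session.timeout.ms", "30000");
        // Kafka 0.10+: cap the records returned per poll() so processing
        // a batch cannot outlast the session timeout.
        props.put("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("max.poll.records")); // prints 100
    }
}
```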
Re: Fail fast producer/consumer when no connection to Kafka brokers cluster
You can adjust request.timeout.ms, which is shared between both the new producer and the new consumer. I don't think it's quite what you want, but it's probably the closest that exists across both clients. There's not much more than that -- when you say "when the connection to the entire broker cluster is lost", that's not really something that we'd expect to happen; you might lose connectivity to some brokers, but even determining that the connection "to the entire broker cluster" is lost is not something you can easily do (and it often requires long timeouts anyway). -Ewen On Fri, Jun 17, 2016 at 3:27 AM, Spico Florin wrote: > Hello! > I would like to know what the configurations/properties are for the > producer/consumer in order to fail fast when the connection to the entire > broker cluster is lost. > For example, can we set up a parameter such that when the connection retries > reach a threshold, the client disconnects and throws an exception? > > I look forward to your answers. > Regards, > Florin > -- Thanks, Ewen
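Ewen's suggestion can be sketched as a client configuration. A minimal illustration (the bootstrap address is a placeholder and the timeout value is an assumption for demonstration, not a recommendation):

```java
import java.util.Properties;

public class FailFastConfig {
    // Sketch: tighten request.timeout.ms, the one knob shared by the new
    // producer and new consumer, so requests fail sooner when no broker
    // answers.
    static Properties failFastProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder address
        props.put("request.timeout.ms", "5000");        // illustrative value
        // Producer-side: don't retry forever against an unreachable cluster.
        props.put("retries", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(failFastProps().getProperty("request.timeout.ms")); // prints 5000
    }
}
```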
Re: [DISCUSS] Java 8 as a minimum requirement
Hi Ismael, Agreed that the timing is more important. If we give enough of a heads-up to the users who are on Java 7 that's great, but shipping this in the 0.10.x line won't be good, as it is still perceived as a maintenance release even if the release contains a lot of features. Making this part of 0.11, moving the 0.10.1 features to 0.11, and giving a rough timeline for when that would be released would be ideal. Thanks, Harsha On Fri, Jun 17, 2016, at 11:13 AM, Ismael Juma wrote: > Hi Harsha, > > Comments below. > > On Fri, Jun 17, 2016 at 7:48 PM, Harsha wrote: > > > Hi Ismael, > > "Are you saying that you are aware of many Kafka users still > > using Java 7 > > > who would be ready to upgrade to the next Kafka feature release (whatever > > > that version number is) before they can upgrade to Java 8?" > > I know there are quite a few users who are still on Java 7 > > > This is good to know. > > > > and regarding the > > upgrade we can't say yes or no. It's up to the user's discretion when they > > choose to upgrade, and of course whether there are any critical fixes that > > might go into the release. We shouldn't be restricting their upgrade > > path just because we removed Java 7 support. > > > > My point is that both paths have their pros and cons and we need to weigh > them up. If some users are slow to upgrade the Java version (Java 7 has > been EOL'd for over a year), there's a good chance that they are slow to > upgrade Kafka too. And if that is the case (and it may not be), then > holding up improvements for the ones who actually do upgrade may be the > wrong call. To be clear, I am still in listening mode and I haven't made > up > my mind on the subject. > > Once we released 0.9.0 there weren't any more 0.8.x releases, i.e. we don't > > have an LTS-type release where we continually ship critical fixes over > > 0.8.x minor releases. So if a user needs a critical fix, the only > > option today is to upgrade to the next version where that fix is shipped. 
> > > > We haven't done a great job at this in the past, but there is no decision > that once a new major release is out, we don't do patch releases for the > previous major release. In fact, we have been collecting critical fixes > in > the 0.9.0 branch for a potential 0.9.0.2. > > I understand there is no decision made yet, but the premise was to > > ship this in 0.10.x, possibly 0.10.1, which I don't agree with. In > > general, I am against shipping this in a 0.10.x version. Removing Java 7 support > > when the release is minor is in general not a good idea for users. > > > > Sorry if I didn't communicate this properly. I simply meant the next > feature release. I used 0.10.1.0 as an example, but it could also be > 0.11.0.0 if that turns out to be the next release. A discussion on that > will probably take place once the scope is clear. Personally, I think the > timing is more important than the version number, but it seems like some > people disagree. > > Ismael
Re: Kafka Connect HdfsSink and the Schema Registry
Great, glad you sorted it out. If the namespace is being omitted incorrectly from the request the connector is making, please file a bug report -- I can't think of a reason we'd omit that, but it's certainly possible it is a bug on our side. -Ewen On Wed, Jun 15, 2016 at 7:08 AM, Tauzell, Dave wrote: > Thanks Ewen, > > The second request was made by me directly. I'm trying to add this > functionality into my .Net application. The library I'm using doesn't have > an implementation of an AvroSerializer that interacts with the schema > registry. I've now added code to make a POST to > /subjects/<topic>-value with the schema. Something I noticed is that I > was using a schema like this: > > { > "subject": "AuditHdfsTest5-value", > "version": 1, > "id": 5, > "schema": > "{\"type\":\"record\",\"name\":\"GenericAuditRecord\",\"namespace\":\"audit\",\"fields\":[{\"name\":\"xml\",\"type\":[\"string\",\"null\"]}]}" > } > > When the connector got a message and did a lookup, it didn't have the > "namespace" field and the lookup failed. I then posted a new version of > the schema without the "namespace" field and it worked. > > -Dave > > Dave Tauzell | Senior Software Engineer | Surescripts > O: 651.855.3042 | www.surescripts.com | dave.tauz...@surescripts.com > Connect with us: Twitter I LinkedIn I Facebook I YouTube > > > -Original Message- > From: Ewen Cheslack-Postava [mailto:e...@confluent.io] > Sent: Tuesday, June 14, 2016 6:59 PM > To: users@kafka.apache.org > Subject: Re: Kafka Connect HdfsSink and the Schema Registry > > On Tue, Jun 14, 2016 at 8:08 AM, Tauzell, Dave < > dave.tauz...@surescripts.com > > wrote: > > > I have been able to get my C# client to put avro records to a Kafka > > topic and have the HdfsSink read and save them in files. I am > > confused about the interaction with the registry. The kafka message > > contains a schema id and I see the connector look that up in the > > registry. Then it also looks up a subject which is <topic>-value. 
> > > > What is the relationship between the passed schema id and the subject > > which is derived from the topic name? > > > > The HDFS connector doesn't work directly with the schema registry; the > AvroConverter does. I'm not sure what the second request you're seeing is > -- normally it would only look up the schema ID in order to get the schema. > Where are you seeing the second request, and can you include some logs? I > can't think of any other requests the AvroConverter would be making just > for deserialization. > > The subject names are generated by the serializer as <topic>-key and > <topic>-value, and this is just the standardized approach Confluent's > serializers use. The ID will have been registered under that subject. > > -Ewen > > > > > > -Dave > > > > This e-mail and any files transmitted with it are confidential, may > > contain sensitive information, and are intended solely for the use of > > the individual or entity to whom they are addressed. If you have > > received this e-mail in error, please notify the sender by reply > > e-mail immediately and destroy all copies of the e-mail and any > attachments. > > > > > > -- > Thanks, > Ewen > This e-mail and any files transmitted with it are confidential, may > contain sensitive information, and are intended solely for the use of the > individual or entity to whom they are addressed. If you have received this > e-mail in error, please notify the sender by reply e-mail immediately and > destroy all copies of the e-mail and any attachments. > -- Thanks, Ewen
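The conventions Ewen describes can be sketched in a few lines. This is an illustrative reconstruction, not Confluent's serializer code; it assumes the standard framing of a magic byte followed by the 4-byte big-endian schema id, and the class and method names are mine:

```java
import java.nio.ByteBuffer;

public class SchemaRegistryConventions {
    // Subject naming used by Confluent's serializers, as described above:
    // the registry subject is derived from the topic name.
    static String keySubject(String topic)   { return topic + "-key"; }
    static String valueSubject(String topic) { return topic + "-value"; }

    // Sketch of the message framing the AvroConverter reads back:
    // one magic byte (0), then the 4-byte schema id, then the Avro payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(5 + avroPayload.length);
        buf.put((byte) 0);     // magic byte
        buf.putInt(schemaId);  // id the connector looks up in the registry
        buf.put(avroPayload);
        return buf.array();
    }

    public static void main(String[] args) {
        System.out.println(valueSubject("AuditHdfsTest5")); // prints AuditHdfsTest5-value
        System.out.println(frame(5, new byte[0]).length);   // prints 5
    }
}
```

So the schema id embedded in each message and the `<topic>-value` subject are two independent handles on the same registered schema: the id is what readers resolve, the subject is where writers registered it.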
Re: General Question About Kafka
The most common use case for Kafka is within a data center, but you can absolutely produce data across the WAN. You may need to adjust some settings (e.g. timeouts, max in flight requests per connection if you want high throughput) to account for operating over the WAN, but you can definitely do it. -Ewen On Wed, Jun 15, 2016 at 12:02 AM, ali wrote: > Hello Guys. > > > > We are going to install Apache Kafka in our local data center, and different > producers distributed across different locations will be > connected > to this server. > > Our producers will use an Internet connection and will send 10 MB data > packages every 30 seconds. > > I was wondering, is Apache Kafka actually suited for my scenario? Since we > will use an Internet connection, should I be worried about network-related problems such as > performance and latency? > > > > Thank you > > Ali > > -- Thanks, Ewen
Re: Consumer Question
Hi Chris, We should also ensure that auto.create.topics.enable is set to true. Thank you, Anirudh Hi Chris, If the topic does not exist, it will create a new topic with the name you give. Thanks, Nicole On Sat, Jun 18, 2016 at 1:55 AM, Chris Barlock wrote: > If you have a consumer listening on a topic and that topic is deleted, is > the consumer made aware -- perhaps by some exception -- or does it > continue listening, blissfully unaware that it will never hear anything > more? > > Thanks, > > Chris > > >
Re: Wordcount with reduce
Hi Matthias, I solved the problem by specifying the serdes and reading the source as a KStream instead of a KTable. So, instead of KTable<Long, String> source = builder.table("topic1"); I added: KStream<Long, String> source = builder.stream(longSerde, stringSerde, "topic1"); Thanks -Adrienne On Sun, Jun 19, 2016 at 4:11 PM, Matthias J. Sax wrote: > Can you show the full stack trace? > > How do you ingest the data into the topic? I also think you should read > the topic as a KStream (instead of a KTable). > > What de-/serializer do you specify in props? (see > > http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-serdes > ) > > > -Matthias > > On 06/19/2016 03:06 PM, Adrienne Kole wrote: > > Hi, > > > > I want to implement the wordcount example with the reduce function on a KTable. > > However, I get the error: > > > > Exception in thread "StreamThread-1" > > org.apache.kafka.common.errors.SerializationException: Size of data > > received by LongDeserializer is not 8 > > > > > > Here is my code: > > > > > > KTable<Long, String> source = builder.table("topic1"); // here we > > have the WordID and the Word itself > > > > KTable<String, Long> counts = source.reduce(new Reducer<Long>() { > > > > @Override > > public Long apply(Long value1, Long value2) { > > return value1 + value2; > > } > > }, > > > > new Reducer<Long>() { > > > > @Override > > public Long apply(Long value1, Long value2) { > > return value1 - value2; > > } > > } > > > > , new KeyValueMapper<Long, String, KeyValue<String, Long>>() { > > > > @Override > > public KeyValue<String, Long> apply(Long key, String value) { > > return new KeyValue<>(value, new Long(1)); > > } > > }, stringSerde, longSerde, "count"); > > > > counts.to(Serdes.String(), Serdes.Long(), "topic2"); > > > > KafkaStreams streams = new KafkaStreams(builder, props); > > streams.start(); > > > > > > Moreover, I think the error messages should be more informative to better > > deal with such situations. > > > > > > > > - Adrienne > > > >
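The adder/subtractor pair in the KTable reduce above exists because a table is a changelog: when the value for a key changes, the old value must be retracted before the new one is applied. A plain-Java sketch of that semantics (my own illustration, not Kafka Streams code; names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class TableReduceSketch {
    // id -> latest word for that id (the "table"), word -> count (the aggregate)
    private final Map<Long, String> latestWord = new HashMap<>();
    private final Map<String, Long> counts = new HashMap<>();

    // Mimics a KTable update: retract the old value, then apply the new one.
    void update(long wordId, String word) {
        String old = latestWord.put(wordId, word);
        if (old != null) {
            counts.merge(old, -1L, Long::sum); // subtractor: value1 - value2
        }
        counts.merge(word, 1L, Long::sum);     // adder: value1 + value2
    }

    long count(String word) {
        return counts.getOrDefault(word, 0L);
    }

    public static void main(String[] args) {
        TableReduceSketch t = new TableReduceSketch();
        t.update(1, "hello");
        t.update(2, "hello");
        t.update(1, "world"); // id 1 retracts "hello", adds "world"
        System.out.println(t.count("hello") + " " + t.count("world")); // prints 1 1
    }
}
```

A KStream, by contrast, is a record stream with no retraction, which is why reading the topic as a KStream (as Adrienne did) needs only an adder.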
Re: Can I access Kafka Streams Key/Value store outside of Processor?
Hi Yi, Your observation about accessing the state stores that are already there vs. keeping state outside of Kafka Streams is a good one. We are currently working on making the state stores accessible as you mention and should be able to share some design docs shortly. Thanks, Eno > On 19 Jun 2016, at 19:49, Yi Chen wrote: > > Hello, > > I am thinking of using the Kafka Streams feature to "unify" our real-time > and scheduled workflows. An example is that in our workflow with stages A--> > B --> C, the A --> B segment can be achieved in real-time, but the B-->C > segment is usually done with a scheduled job, running maybe once per hour > or once per 5 minutes, etc. > > I am hoping to model this using Kafka Streams. Each stage would be a topic: > Kafka Streams will process real-time events in topic-A and send the result > to topic-B. The challenge is that when I process the events in topic-B, I want > to be able to process each event with a crontab-like schedule, so that if > the processing is successful (by checking an external API) the event is sent > to topic-C; otherwise, we will re-process the event again according to the > schedule. > > Can I use the RocksDB key/value state store to store the topic-B events > that failed to process, and have a scheduler (like the quartz scheduler) > iterate all events in the store and re-process them again? I know I can always > keep the state outside of Kafka, but I like that the state store is > fault-tolerant and can be rebuilt automatically if the instance fails. The > examples I found so far seem to imply that the state store is only > accessible from within a processor. > > Thanks, > Yi
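The pattern Yi describes — park failed events in a store and retry them on a schedule — can be sketched in plain Java. In Kafka Streams the map below would be a fault-tolerant key/value state store and retryAll() would run from a scheduled callback inside the processor (or an external scheduler such as quartz); all names here are hypothetical:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Predicate;

public class ScheduledRetryStore {
    // Stand-in for the fault-tolerant state store holding failed topic-B events.
    private final Map<String, String> failedEvents = new HashMap<>();

    // Called when processing an event fails the external-API check.
    void park(String key, String value) {
        failedEvents.put(key, value);
    }

    // The scheduled tick: retry every parked event, forwarding successes
    // (conceptually, to topic-C) and keeping failures for the next tick.
    // Returns how many events succeeded on this pass.
    int retryAll(Predicate<String> externalApiOk, Map<String, String> forwarded) {
        int ok = 0;
        Iterator<Map.Entry<String, String>> it = failedEvents.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> e = it.next();
            if (externalApiOk.test(e.getKey())) {
                forwarded.put(e.getKey(), e.getValue());
                it.remove(); // success: stop retrying this event
                ok++;
            }                // failure: keep it parked
        }
        return ok;
    }

    public static void main(String[] args) {
        ScheduledRetryStore store = new ScheduledRetryStore();
        store.park("order-1", "payload");
        Map<String, String> topicC = new HashMap<>();
        System.out.println(store.retryAll(k -> true, topicC)); // prints 1
    }
}
```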
kafka + logstash
Hello all, I use the kafka input and kafka output plugins in Logstash and I am seeing high CPU usage. What can I do to reduce it?

logstash version 2.3.2
logstash-input-kafka 2.0.8
logstash-output-kafka 2.0.5

Thanks a lot
Re: Error closing Socket for ...
Looks like the producers lose the connection to the brokers. Do the brokers have enough resources to handle all the producers? Does the network support that throughput? On Sun, 19 Jun 2016, 17:27 Avi Asulin wrote:
Error closing Socket for ...
Hi,

We are using Kafka 0.8.2 with Scala 2.10. We currently have 3 brokers and we are working with ~170 producers. We frequently get the error:

ERROR Closing socket for /170.144.181.50 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:197)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384)
        at kafka.utils.Utils$.read(Utils.scala:380)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Processor.read(SocketServer.scala:444)
        at kafka.network.Processor.run(SocketServer.scala:340)
        at java.lang.Thread.run(Thread.java:745)

We get the error from many producers' IPs. Can someone explain what can cause this error and what can be done to get rid of it?

Thanks,
Avi
Re: Wordcount with reduce
Can you show the full stack trace? How do you ingest the data into the topic? I also think you should read the topic as a KStream (instead of a KTable). What de-/serializers do you specify in props? (see http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-serdes)

-Matthias

On 06/19/2016 03:06 PM, Adrienne Kole wrote:
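The KStream-based approach suggested above could look roughly like the following for a word count. This is a sketch against the 0.10.0 Streams API, and it assumes (as the original code implies) that topic1 holds Long word IDs as keys and String words as values; the topic names and serde variables are carried over from the question.

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class WordCountSketch {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        // Read topic1 as a KStream with explicit serdes, so the
        // LongDeserializer is never applied to String data.
        KStream<Long, String> source =
            builder.stream(longSerde, stringSerde, "topic1");

        // Re-key each record by the word itself, then count occurrences;
        // countByKey maintains the counts in a local state store ("count").
        KTable<String, Long> counts = source
            .map((id, word) -> new KeyValue<>(word, 1L))
            .countByKey(stringSerde, "count");

        counts.to(stringSerde, longSerde, "topic2");
        // ... build KafkaStreams with props and start() as in the question
    }
}
```

The point is that counting is a stream aggregation, so the add/subtract Reducer pair (which is meant for maintaining an aggregate over a changelog KTable) is not needed here.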
Wordcount with reduce
Hi,

I want to implement the wordcount example with the reduce function on a KTable. However, I get the error:

Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

Here is my code:

KTable<Long, String> source = builder.table("topic1"); // here we have WordID and Word itself

KTable<String, Long> counts = source.reduce(
    new Reducer<Long>() {
        @Override
        public Long apply(Long value1, Long value2) {
            return value1 + value2;
        }
    },
    new Reducer<Long>() {
        @Override
        public Long apply(Long value1, Long value2) {
            return value1 - value2;
        }
    },
    new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
        @Override
        public KeyValue<String, Long> apply(Long key, String value) {
            return new KeyValue<>(value, 1L);
        }
    },
    stringSerde, longSerde, "count");

counts.to(Serdes.String(), Serdes.Long(), "topic2");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

Moreover, I think the error messages should be more informative, to better deal with such situations.

- Adrienne
Re: test of producer's delay and consumer's delay
@Jun Rao, about this question, can you give me some suggestions? > On 18 Jun 2016, at 11:26, Kafka wrote: > > hello, I have done a series of tests on Kafka 0.9.0, and one of the results > confused me. > > test environment: > kafka cluster: 3 brokers, 8-core CPU / 8 GB mem / 1 Gb network card > client: 4-core CPU / 4 GB mem > topic: 6 partitions, 2 replicas > > total messages: 1 > single message size: 1024 bytes > fetch.min.bytes: 1 > fetch.wait.max.ms: 100ms > > all send tests are done using the Scala sync interface. > > when I set ack to 0, the producer's delay is 0.3ms and the consumer's delay is > 7.7ms > when I set ack to 1, the producer's delay is 1.6ms and the consumer's delay is > 3.7ms > when I set ack to -1, the producer's delay is 3.5ms and the consumer's delay is > 4.2ms > > but why the consumer's delay decreased when I set ack from 0 to 1 confuses me. >
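When comparing ack settings, the measurement method matters as much as the setting itself: with acks=0 the client-side "latency" only covers handing the request to the network, not the broker write, which can skew end-to-end comparisons. A minimal harness for measuring per-send latency consistently across ack settings might look like the sketch below; it uses the Java producer rather than the Scala sync interface from the test above, and the broker address "localhost:9092" and topic name "latency-test" are assumptions.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical micro-benchmark: average send latency for acks=0, 1, -1,
// always blocking on the returned future so all three settings are
// measured the same way.
public class AckLatencyTest {
    public static void main(String[] args) throws Exception {
        for (String acks : new String[] {"0", "1", "-1"}) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.ACKS_CONFIG, acks);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArraySerializer");

            try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                byte[] payload = new byte[1024]; // matches the 1024-byte messages above
                long start = System.nanoTime();
                int sends = 10_000;
                for (int i = 0; i < sends; i++) {
                    // get() waits until the producer considers the send complete;
                    // for acks=0 that is as soon as the request is written out
                    producer.send(new ProducerRecord<>("latency-test", payload)).get();
                }
                long avgMicros = (System.nanoTime() - start) / sends / 1_000;
                System.out.printf("acks=%s: avg %d us per send%n", acks, avgMicros);
            }
        }
    }
}
```

Note that the acks=0 "completion" above still does not include the broker write, so a consumer-side timestamp comparison is needed for true end-to-end delay.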