Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api

2016-06-19 Thread Ewen Cheslack-Postava
Rohit,

The 30s number sounds very suspicious because it is exactly the value of
the session timeout. But if you are driving the consumer correctly, you
shouldn't normally hit this timeout. Dana was asking about consumers
leaving gracefully because that is one case where you can inadvertently
trigger the 30s timeout, requiring *all* group members to wait that long
before they decide one of the previous members has ungracefully left the
group and move on without it.

It sounds like something you are doing is causing the group to wait for the
session timeout. Is it possible any of your processes are exiting without
calling consumer.close()? Or that any of your processes are not calling
consumer.poll() within the session timeout of 30s? This can sometimes
happen if they receive too much data and take too long to process it (0.10
introduced max.poll.records to help users control this, and we're making
further refinements to the consumer to provide better application control
over the number of messages fetched vs. total processing time).
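
To make that concrete, here is a minimal sketch of the polling pattern
described above. It assumes a 0.10 client (max.poll.records does not exist in
0.9), and the broker, group, and topic names are placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollWithinSessionTimeout {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("group.id", "my-group");                 // placeholder
        props.put("session.timeout.ms", "30000");          // the 30s default being hit
        props.put("max.poll.records", "500");              // 0.10+: bound the work done per poll()
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder
            while (true) {
                // Keep the time between poll() calls well under session.timeout.ms.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // process the record quickly, or hand it off to another thread
                }
            }
        } finally {
            // Closing the consumer leaves the group gracefully, so the remaining
            // members don't have to wait out the session timeout on the next rebalance.
            consumer.close();
        }
    }
}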

-Ewen

On Sun, Jun 19, 2016 at 10:01 PM, Rohit Sardesai  wrote:

>
> Can anybody help out on this?
> 
> From: Rohit Sardesai
> Sent: 19 June 2016 11:47:01
> To: users@kafka.apache.org
> Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer
> api
>
>
> In my tests, I am using around 24 consumer groups. I never call
> consumer.close() or consumer.unsubscribe() until the application is
> shutting down.
>
> So the consumers never leave, but new consumer instances do get created as
> the parallel requests pile up. Also, I am reusing consumer instances if they
> are idle (i.e. not serving any consume request). So with 9 partitions, I do
> 9 consume requests in parallel every second under the same consumer group.
>
> So to summarize, I have the following test setup: 3 Kafka brokers, 2
> ZooKeeper nodes, 1 topic, 9 partitions, 24 consumer groups, and 9 consume
> requests at a time.
>
>
> 
> From: Dana Powers 
> Sent: 19 June 2016 10:45
> To: users@kafka.apache.org
> Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer
> api
>
> Is your test reusing a group name? And if so, are your consumer instances
> gracefully leaving? This may cause subsequent 'rebalance' operations to
> block until those old consumers check in or the session timeout expires
> (30 secs).
>
> -Dana
> On Jun 18, 2016 8:56 PM, "Rohit Sardesai" 
> wrote:
>
> > I am using the group management feature of Kafka 0.9 to handle partition
> > assignment to consumer instances. I use the subscribe() API to subscribe to
> > the topic I am interested in reading data from. I have an environment
> > where I have 3 Kafka brokers with a couple of ZooKeeper nodes. I created
> > a topic with 9 partitions. The performance tests attempt to send 9
> > parallel poll() requests to the Kafka brokers every second. The results
> > show that each poll() operation takes around 30 seconds the first time
> > it polls and returns 0 records. Also, when I print the partition
> > assignment to this consumer instance, I see no partitions assigned to it.
> > The next poll() does return quickly (~10-20 ms) with data and some
> > partitions assigned to it.
> >
> > With each consumer taking 30 seconds, the performance tests report very
> > low throughput since I run the tests for around 1000 seconds, out of which I
> > produce messages on the topic for the complete duration and I start the
> > parallel consume requests after 400 seconds. So out of 400 seconds, with 9
> > consumers taking 30 seconds each, around 270 seconds are spent in the
> > first poll without any data. Is it because of the rebalance operation
> > that the consumers are blocked on poll()? What is the best way to use
> > poll() if I have to serve many parallel requests per second? Should I
> > prefer manual assignment of partitions in this case instead of relying on
> > rebalance?
> >
> >
> > Regards,
> >
> > Rohit Sardesai
> >
> >
>



-- 
Thanks,
Ewen


Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api

2016-06-19 Thread Rohit Sardesai

Can anybody help out on this?

From: Rohit Sardesai
Sent: 19 June 2016 11:47:01
To: users@kafka.apache.org
Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api


In my tests, I am using around 24 consumer groups. I never call
consumer.close() or consumer.unsubscribe() until the application is shutting
down.

So the consumers never leave, but new consumer instances do get created as the
parallel requests pile up. Also, I am reusing consumer instances if they are
idle (i.e. not serving any consume request). So with 9 partitions, I do 9
consume requests in parallel every second under the same consumer group.

So to summarize, I have the following test setup: 3 Kafka brokers, 2 ZooKeeper
nodes, 1 topic, 9 partitions, 24 consumer groups, and 9 consume requests at a
time.



From: Dana Powers 
Sent: 19 June 2016 10:45
To: users@kafka.apache.org
Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api

Is your test reusing a group name? And if so, are your consumer instances
gracefully leaving? This may cause subsequent 'rebalance' operations to
block until those old consumers check in or the session timeout expires
(30 secs).

-Dana
On Jun 18, 2016 8:56 PM, "Rohit Sardesai" 
wrote:

> I am using the group management feature of Kafka 0.9 to handle partition
> assignment to consumer instances. I use the subscribe() API to subscribe to
> the topic I am interested in reading data from. I have an environment
> where I have 3 Kafka brokers with a couple of ZooKeeper nodes. I created
> a topic with 9 partitions. The performance tests attempt to send 9
> parallel poll() requests to the Kafka brokers every second. The results
> show that each poll() operation takes around 30 seconds the first time
> it polls and returns 0 records. Also, when I print the partition
> assignment to this consumer instance, I see no partitions assigned to it.
> The next poll() does return quickly (~10-20 ms) with data and some
> partitions assigned to it.
>
> With each consumer taking 30 seconds, the performance tests report very
> low throughput since I run the tests for around 1000 seconds, out of which I
> produce messages on the topic for the complete duration and I start the
> parallel consume requests after 400 seconds. So out of 400 seconds, with 9
> consumers taking 30 seconds each, around 270 seconds are spent in the
> first poll without any data. Is it because of the rebalance operation
> that the consumers are blocked on poll()? What is the best way to use
> poll() if I have to serve many parallel requests per second? Should I
> prefer manual assignment of partitions in this case instead of relying on
> rebalance?
>
>
> Regards,
>
> Rohit Sardesai
>
>


Re: Fail fast producer/consumer when no connection to Kafka brokers cluster

2016-06-19 Thread Ewen Cheslack-Postava
You can adjust request.timeout.ms, which is shared between both the new
producer and the new consumer. I don't think it's quite what you want, but it
is probably the closest that exists across both clients. There's not much more
than that -- when you say "when the connection to the entire broker cluster
is lost", that's not really something that we'd expect to happen; you might
lose connectivity to some brokers, but even determining that the connection
"to the entire broker cluster" is lost is not something you can do easily
(and it often requires long timeouts anyway).
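
As a rough illustration of that (producer shown, but request.timeout.ms applies
to the new consumer as well; the broker address and timeout value are
placeholder assumptions, not recommendations):

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FailFastProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("request.timeout.ms", "5000");        // fail requests faster than the default
        props.put("retries", "0");                      // don't retry internally; surface the error
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            try {
                // Blocking on the future surfaces a timeout if the request could not
                // be completed within request.timeout.ms.
                producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
            } catch (InterruptedException | ExecutionException e) {
                // Treat this as "broker(s) unreachable" and fail fast in the application.
                throw new RuntimeException("Send failed, giving up", e);
            }
        }
    }
}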

-Ewen

On Fri, Jun 17, 2016 at 3:27 AM, Spico Florin  wrote:

> Hello!
>   I would like to know what the configurations/properties are for the
> producer/consumer in order to fail fast when the connection to the entire
> broker cluster is lost.
> For example, whether we can set a parameter so that when the number of
> connection attempts reaches a threshold, the client disconnects and throws
> an exception.
>
> I look forward to your answers.
> Regards,
> Florin
>



-- 
Thanks,
Ewen


Re: [DISCUSS] Java 8 as a minimum requirement

2016-06-19 Thread Harsha
Hi Ismael,
  Agreed that timing is more important. Giving enough heads-up
  to the users who are on Java 7 is great, but shipping this in
  the 0.10.x line won't be good, as it is still perceived as a
  maintenance release even if the release contains a lot of
  features. It would be ideal to make this part of 0.11, move
  the 0.10.1 features to 0.11, and give a rough timeline for
  when that would be released.

Thanks,
Harsha

On Fri, Jun 17, 2016, at 11:13 AM, Ismael Juma wrote:
> Hi Harsha,
> 
> Comments below.
> 
> On Fri, Jun 17, 2016 at 7:48 PM, Harsha  wrote:
> 
> > Hi Ismael,
> > "Are you saying that you are aware of many Kafka users still
> > using Java 7
> > > who would be ready to upgrade to the next Kafka feature release (whatever
> > > that version number is) before they can upgrade to Java 8?"
> > I know there are quite a few users who are still on Java 7
> 
> 
> This is good to know.
> 
> 
> > and regarding the
> > upgrade, we can't say yes or no. It's up to the user's discretion when they
> > choose to upgrade, and of course whether there are any critical fixes that
> > might go into the release. We shouldn't be restricting their upgrade
> > path just because we removed Java 7 support.
> >
> 
> My point is that both paths have their pros and cons and we need to weigh
> them up. If some users are slow to upgrade the Java version (Java 7 has
> been EOL'd for over a year), there's a good chance that they are slow to
> upgrade Kafka too. And if that is the case (and it may not be), then
> holding up improvements for the ones who actually do upgrade may be the
> wrong call. To be clear, I am still in listening mode and I haven't made up
> my mind on the subject.
> 
> > Once we released 0.9.0, there aren't any 0.8.x releases, i.e. we don't
> > have an LTS-type release where we continually ship critical fixes over
> > 0.8.x minor releases. So if a user notices a critical fix, the only
> > option today is to upgrade to the next version where that fix is shipped.
> >
> 
> We haven't done a great job at this in the past, but there is no decision
> that once a new major release is out, we don't do patch releases for the
> previous major release. In fact, we have been collecting critical fixes in
> the 0.9.0 branch for a potential 0.9.0.2.
> 
> > I understand there is no decision made yet, but the premise was to
> > ship this in 0.10.x, possibly 0.10.1, which I don't agree with. In
> > general I am against shipping this in a 0.10.x version. Removing Java 7
> > support when the release is minor is in general not a good idea for users.
> >
> 
> Sorry if I didn't communicate this properly. I simply meant the next
> feature release. I used 0.10.1.0 as an example, but it could also be
> 0.11.0.0 if that turns out to be the next release. A discussion on that
> will probably take place once the scope is clear. Personally, I think the
> timing is more important than the version number, but it seems like some
> people disagree.
> 
> Ismael


Re: Kafka Connect HdfsSink and the Schema Registry

2016-06-19 Thread Ewen Cheslack-Postava
Great, glad you sorted it out. If the namespace is being omitted
incorrectly from the request the connector is making, please file a bug
report -- I can't think of a reason we'd omit that, but it's certainly
possible it is a bug on our side.
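
For reference, registering a schema under a <topic>-value subject is a POST to
the registry's /subjects/<subject>/versions endpoint. A rough Java sketch of
that call, using the schema from Dave's message below and a placeholder
registry URL (not necessarily what his .NET library does):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class RegisterSchema {
    public static void main(String[] args) throws Exception {
        // Subject follows the <topic>-value convention used by Confluent's serializers.
        URL url = new URL("http://localhost:8081/subjects/AuditHdfsTest5-value/versions");
        String schema = "{\"type\":\"record\",\"name\":\"GenericAuditRecord\","
                + "\"namespace\":\"audit\",\"fields\":"
                + "[{\"name\":\"xml\",\"type\":[\"string\",\"null\"]}]}";
        // The registry expects the Avro schema as a JSON string inside a "schema" field.
        String body = "{\"schema\": " + toJsonString(schema) + "}";

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/vnd.schemaregistry.v1+json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        try (Scanner scanner = new Scanner(conn.getInputStream(), "UTF-8")) {
            System.out.println(scanner.useDelimiter("\\A").next()); // e.g. {"id":5}
        }
    }

    // Minimal JSON string escaping for the embedded schema.
    private static String toJsonString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}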

-Ewen

On Wed, Jun 15, 2016 at 7:08 AM, Tauzell, Dave  wrote:

> Thanks Ewan,
>
> The second request was made by me directly. I'm trying to add this
> functionality into my .NET application. The library I'm using doesn't have
> an implementation of the AvroSerializer that interacts with the schema
> registry. I've now added code to make a POST to
> /subjects/<topic>-value with the schema. Something I noticed is that I
> was using a schema like this:
>
> {
>   "subject": "AuditHdfsTest5-value",
>   "version": 1,
>   "id": 5,
>   "schema":
> "{\"type\":\"record\",\"name\":\"GenericAuditRecord\",\"namespace\":\"audit\",\"fields\":[{\"name\":\"xml\",\"type\":[\"string\",\"null\"]}]}"
> }
>
> When the connector got a message and did a lookup it didn't have the
> "namespace" field and the lookup failed.  I then posted a new version of
> the schema without the "namespace" field and it worked.
>
> -Dave
>
> Dave Tauzell | Senior Software Engineer | Surescripts
> O: 651.855.3042 | www.surescripts.com |   dave.tauz...@surescripts.com
> Connect with us: Twitter I LinkedIn I Facebook I YouTube
>
>
> -Original Message-
> From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
> Sent: Tuesday, June 14, 2016 6:59 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka Connect HdfsSink and the Schema Registry
>
> On Tue, Jun 14, 2016 at 8:08 AM, Tauzell, Dave <
> dave.tauz...@surescripts.com
> > wrote:
>
> > I have been able to get my C# client to put avro records to a Kafka
> > topic and have the HdfsSink read and save them in files.  I am
> > confused about interaction with the registry.  The kafka message
> > contains a schema id an I see the connector look that up in the
> > registry.  Then it also looks up a subject which is -value.
> >
> > What is the relationship between the passed schema id and the subject
> > which is derived from the topic name?
> >
>
> The HDFS connector doesn't work directly with the schema registry, the
> AvroConverter does. I'm not sure what the second request you're seeing is
> -- normally it would only look up the schema ID in order to get the schema.
> Where are you seeing the second request, and can you include some logs? I
> can't think of any other requests the AvroConverter would be making just
> for deserialization.
>
> The subject names are generated in the serializer as <topic>-key and
> <topic>-value, and this is just the standardized approach Confluent's
> serializers use. The ID will have been registered under that subject.
>
> -Ewen
>
>
> >
> > -Dave
> >
> >
>
>
>
> --
> Thanks,
> Ewen
>



-- 
Thanks,
Ewen


Re: General Question About Kafka

2016-06-19 Thread Ewen Cheslack-Postava
The most common use case for Kafka is within a data center, but you can
absolutely produce data across the WAN. You may need to adjust some
settings (e.g. timeouts, max in flight requests per connection if you want
high throughput) to account for operating over the WAN, but you can
definitely do it.
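
As a rough starting point, a producer configuration sketch for a WAN link; the
values here are illustrative assumptions to tune, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WanProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.example.com:9092");  // placeholder
        props.put("acks", "all");                                  // tolerate broker failover
        props.put("request.timeout.ms", "60000");                  // allow for WAN latency
        props.put("retries", "5");                                 // ride out transient drops
        props.put("max.in.flight.requests.per.connection", "10");  // fill a high-latency pipe
                                                                   // (may reorder on retries)
        props.put("compression.type", "gzip");                     // fewer bytes over the WAN
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("telemetry", "site-1", "payload")); // placeholders
            producer.flush();
        }
    }
}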

-Ewen

On Wed, Jun 15, 2016 at 12:02 AM, ali  wrote:

> Hello Guys.
>
>
>
> We are going to install Apache Kafka in our local data center, and different
> producers which are distributed across different locations will be
> connected to this server.
>
> Our producers will use an Internet connection and will send 10 MB data
> packages every 30 seconds.
>
> I was wondering, is Apache Kafka actually suited for my scenario? Since we
> will use an Internet connection, should I be worried about network-related
> problems such as performance and latency?
>
>
>
> Thank you
>
> Ali
>
>


-- 
Thanks,
Ewen


Re: Consumer Question

2016-06-19 Thread Anirudh P
Hi Chris,

We should also ensure that auto.create.topics.enable is set to true.

Thank you,
Anirudh
Hi Chris,

If the topic does not exist, it will create a new topic with the name you
give.

Thanks,
Nicole

On Sat, Jun 18, 2016 at 1:55 AM, Chris Barlock  wrote:

> If you have a consumer listening on a topic and that topic is deleted is
> the consumer made aware -- perhaps by some exception -- or does it
> continue listening, blissfully unaware that it will never hear anything
> more?
>
> Thanks,
>
> Chris
>
>
>


Re: Consumer Question

2016-06-19 Thread Shaolu Xu
Hi Chris,

If the topic does not exist, it will create a new topic with the name you
give.

Thanks,
Nicole

On Sat, Jun 18, 2016 at 1:55 AM, Chris Barlock  wrote:

> If you have a consumer listening on a topic and that topic is deleted is
> the consumer made aware -- perhaps by some exception -- or does it
> continue listening, blissfully unaware that it will never hear anything
> more?
>
> Thanks,
>
> Chris
>
>
>


Re: Wordcount with reduce

2016-06-19 Thread Adrienne Kole
Hi Matthias,

I solved the problem by specifying the serdes and reading the source as a
KStream instead of a KTable. So, instead of

KTable<Long, String> source = builder.table("topic1");

I added:

KStream<Long, String> source =
    builder.stream(longSerde, stringSerde, "topic1");
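
For reference, a minimal sketch of the setup this snippet assumes (the serde
instances and the props passed to KafkaStreams); the application id, broker,
and ZooKeeper addresses are placeholders:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class WordCountSetup {
    public static void main(String[] args) {
        final Serde<Long> longSerde = Serdes.Long();
        final Serde<String> stringSerde = Serdes.String();

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); // required in 0.10.0
        // Defaults used whenever a serde is not given explicitly on an operation.
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();
        // Passing explicit serdes here avoids the "Size of data received by
        // LongDeserializer is not 8" error when the bytes on the topic don't
        // match the default serdes.
        KStream<Long, String> source = builder.stream(longSerde, stringSerde, "topic1");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}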

Thanks

-Adrienne

On Sun, Jun 19, 2016 at 4:11 PM, Matthias J. Sax 
wrote:

> Can you show the full stack trace?
>
> How do you ingest the data into the topic? I also think you should read
> the topic as KStream (instead of KTable).
>
> What de-/serializer do you specify in props? (see
>
> http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-serdes
> )
>
>
> -Matthias
>
> On 06/19/2016 03:06 PM, Adrienne Kole wrote:
> > Hi,
> >
> > I want to implement wordcount example with reduce function in KTable.
> > However, I get the error:
> >
> > Exception in thread "StreamThread-1"
> > org.apache.kafka.common.errors.SerializationException: Size of data
> > received by LongDeserializer is not 8
> >
> >
> > Here is my code:
> >
> >
> > // here we have WordID and Word itself
> > KTable<Long, String> source = builder.table("topic1");
> >
> > KTable<String, Long> counts = source.reduce(new Reducer<Long>() {
> >
> >     @Override
> >     public Long apply(Long value1, Long value2) {
> >         return value1 + value2;
> >     }
> > },
> >
> > new Reducer<Long>() {
> >
> >     @Override
> >     public Long apply(Long value1, Long value2) {
> >         return value1 - value2;
> >     }
> > },
> >
> > new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
> >
> >     @Override
> >     public KeyValue<String, Long> apply(Long key, String value) {
> >         return new KeyValue<>(value, new Long(1));
> >     }
> > }, stringSerde, longSerde, "count");
> >
> > counts.to(Serdes.String(), Serdes.Long(), "topic2");
> >
> > KafkaStreams streams = new KafkaStreams(builder, props);
> > streams.start();
> >
> >
> > Moreover, I think the error messages should be more informative to better
> > deal with such situations.
> >
> >
> >
> > - Adrienne
> >
>
>


Re: Can I access Kafka Streams Key/Value store outside of Processor?

2016-06-19 Thread Eno Thereska
Hi Yi,

Your observation about accessing the state stores that are already there vs. 
keeping state outside of Kafka Streams is a good one. We are currently working 
on having the state stores accessible like you mention and should be able to 
share some design docs shortly.
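
In the meantime, one pattern for this use case is to park failed events in a
key/value store attached to a Processor and re-drive them from punctuate(). A
rough sketch against the 0.10.0 Processor API; the store name, the forwarding
target, and the external API check are placeholder assumptions:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class RetryProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, String> pending;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // "pending-events" must be registered in the topology and attached to this processor.
        this.pending = (KeyValueStore<String, String>) context.getStateStore("pending-events");
        // Request punctuate() roughly every 5 minutes (stream-time based in 0.10.0).
        context.schedule(5 * 60 * 1000L);
    }

    @Override
    public void process(String key, String value) {
        if (callExternalApi(value)) {
            context.forward(key, value);   // downstream child leads to topic-C
        } else {
            pending.put(key, value);       // park the event for a later retry
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // Retry everything still pending; the store (and its changelog topic)
        // lets this survive an instance failure.
        List<String> done = new ArrayList<>();
        try (KeyValueIterator<String, String> it = pending.all()) {
            while (it.hasNext()) {
                KeyValue<String, String> entry = it.next();
                if (callExternalApi(entry.value)) {
                    context.forward(entry.key, entry.value);
                    done.add(entry.key);
                }
            }
        }
        // Delete after the iterator is closed to avoid mutating the store mid-iteration.
        for (String key : done) {
            pending.delete(key);
        }
    }

    @Override
    public void close() {}

    private boolean callExternalApi(String value) {
        return false; // placeholder for the external API check
    }
}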

Thanks
Eno

> On 19 Jun 2016, at 19:49, Yi Chen  wrote:
> 
> Hello,
> 
> I am thinking of using the Kafka Streams feature to "unify" our real-time
> and scheduled workflow. An example is that in our workflow with stages A-->
> B --> C, the A --> B segment can be achieved in real-time, but B-->C
> segment is usually done with a scheduled job, running maybe once per hour
> or once per 5 minutes, etc.
> 
> I am hoping to model this using Kafka Streams. Each stage would be a topic:
> the Kafka Streams will process real-time events in topic-A and send result
> to topic-B. The challenge is when I process the events in topic-B, I want
> to be able to process each event with a crontab-like schedule, so that if
> the process is successful (by checking an external API) the event is sent
> to topic-C, otherwise, we will re-process the event again according to the
> schedule.
> 
> Can I use the RocksDB key/value state store to store the topic-B events
> that failed to process, and have a scheduler (like quartz scheduler) to
> iterate all events in the store and re-process again? I know I can always
> keep the state outside of Kafka but I like that the state store is
> fault-tolerant and can be rebuilt automatically if the instance fails. The
> examples I found so far seems to imply that the state store is only
> accessible from within a processor.
> 
> Thanks,
> Yi



Can I access Kafka Streams Key/Value store outside of Processor?

2016-06-19 Thread Yi Chen
Hello,

I am thinking of using the Kafka Streams feature to "unify" our real-time
and scheduled workflow. An example is that in our workflow with stages A-->
B --> C, the A --> B segment can be achieved in real-time, but B-->C
segment is usually done with a scheduled job, running maybe once per hour
or once per 5 minutes, etc.

I am hoping to model this using Kafka Streams. Each stage would be a topic:
the Kafka Streams will process real-time events in topic-A and send result
to topic-B. The challenge is when I process the events in topic-B, I want
to be able to process each event with a crontab-like schedule, so that if
the process is successful (by checking an external API) the event is sent
to topic-C, otherwise, we will re-process the event again according to the
schedule.

Can I use the RocksDB key/value state store to store the topic-B events
that failed to process, and have a scheduler (like quartz scheduler) to
iterate all events in the store and re-process again? I know I can always
keep the state outside of Kafka but I like that the state store is
fault-tolerant and can be rebuilt automatically if the instance fails. The
examples I found so far seems to imply that the state store is only
accessible from within a processor.

Thanks,
Yi


kafka + logstash

2016-06-19 Thread Fahimeh Ashrafy
Hello all

I use the kafka input and kafka output plugins in Logstash. I have high CPU
usage; what can I do to improve it?

Thanks a lot


kafka + logstash

2016-06-19 Thread Fahimeh Ashrafy
Hello all

I use the kafka input and kafka output plugins in Logstash. I have high CPU
usage; what can I do to improve it?
logstash version 2.3.2
logstash-input-kafka 2.0.8
logstash-output-kafka 2.0.5

Thanks a lot


Re: Error closing Socket for ...

2016-06-19 Thread OGrandeDiEnne
Looks like the producers lose the connection to the brokers.

Do the brokers have enough resources to handle all the producers? Does the
network support that throughput?

On Sun, 19 Jun 2016, 17:27 Avi Asulin,  wrote:

> Hi
> We are using Kafka 0.8.2 (Scala 2.10 build).
> We currently have 3 brokers and we are working with ~ 170 producers
> We frequently get the Error
>
> ERROR Closing socket for /170.144.181.50 because of error
> (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384)
> at kafka.utils.Utils$.read(Utils.scala:380)
> at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:444)
> at kafka.network.Processor.run(SocketServer.scala:340)
> at java.lang.Thread.run(Thread.java:745)
>
> we get the error on many producers ips
> Can someone explain what can cause this error and what can be done to get
> rid of it?
>
> Thanks
> Avi
>


Error closing Socket for ...

2016-06-19 Thread Avi Asulin
Hi
We are using Kafka 0.8.2 (Scala 2.10 build).
We currently have 3 brokers and we are working with ~ 170 producers
We frequently get the Error

ERROR Closing socket for /170.144.181.50 because of error
(kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384)
at kafka.utils.Utils$.read(Utils.scala:380)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)

we get the error on many producers ips
Can someone explain what can cause this error and what can be done to get
rid of it?

Thanks
Avi


Re: Wordcount with reduce

2016-06-19 Thread Matthias J. Sax
Can you show the full stack trace?

How do you ingest the data into the topic? I also think you should read
the topic as KStream (instead of KTable).

What de-/serializer do you specify in props? (see
http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-serdes)


-Matthias

On 06/19/2016 03:06 PM, Adrienne Kole wrote:
> Hi,
> 
> I want to implement wordcount example with reduce function in KTable.
> However, I get the error:
> 
> Exception in thread "StreamThread-1"
> org.apache.kafka.common.errors.SerializationException: Size of data
> received by LongDeserializer is not 8
> 
> 
> Here is my code:
> 
> 
> // here we have WordID and Word itself
> KTable<Long, String> source = builder.table("topic1");
>
> KTable<String, Long> counts = source.reduce(new Reducer<Long>() {
>
>     @Override
>     public Long apply(Long value1, Long value2) {
>         return value1 + value2;
>     }
> },
>
> new Reducer<Long>() {
>
>     @Override
>     public Long apply(Long value1, Long value2) {
>         return value1 - value2;
>     }
> },
>
> new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
>
>     @Override
>     public KeyValue<String, Long> apply(Long key, String value) {
>         return new KeyValue<>(value, new Long(1));
>     }
> }, stringSerde, longSerde, "count");
> 
> counts.to(Serdes.String(), Serdes.Long(), "topic2");
> 
> KafkaStreams streams = new KafkaStreams(builder, props);
> streams.start();
> 
> 
> Moreover, I think the error messages should be more informative to better
> deal with such situations.
> 
> 
> 
> - Adrienne
> 





Wordcount with reduce

2016-06-19 Thread Adrienne Kole
Hi,

I want to implement wordcount example with reduce function in KTable.
However, I get the error:

Exception in thread "StreamThread-1"
org.apache.kafka.common.errors.SerializationException: Size of data
received by LongDeserializer is not 8


Here is my code:


// here we have WordID and Word itself
KTable<Long, String> source = builder.table("topic1");

KTable<String, Long> counts = source.reduce(new Reducer<Long>() {

    @Override
    public Long apply(Long value1, Long value2) {
        return value1 + value2;
    }
},

new Reducer<Long>() {

    @Override
    public Long apply(Long value1, Long value2) {
        return value1 - value2;
    }
},

new KeyValueMapper<Long, String, KeyValue<String, Long>>() {

    @Override
    public KeyValue<String, Long> apply(Long key, String value) {
        return new KeyValue<>(value, new Long(1));
    }
}, stringSerde, longSerde, "count");

counts.to(Serdes.String(), Serdes.Long(), "topic2");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();


Moreover, I think the error messages should be more informative to better
deal with such situations.



- Adrienne


Re: test of producer's delay and consumer's delay

2016-06-19 Thread Kafka
@Jun Rao,

About this question, can you give me some suggestions?

> On 18 June 2016, at 11:26 AM, Kafka  wrote:
> 
> Hello, I have done a series of tests on Kafka 0.9.0, and one of the results
> confused me.
>
> Test environment:
> Kafka cluster: 3 brokers, 8-core CPU / 8 GB mem / 1 Gb network card
> client: 4-core CPU / 4 GB mem
> topic: 6 partitions, 2 replicas
>
> total messages: 1
> single message size: 1024 bytes
> fetch.min.bytes: 1
> fetch.wait.max.ms: 100ms
>
> All send tests use the Scala sync interface.
>
> When I set ack to 0, the producer's delay is 0.3ms and the consumer's delay
> is 7.7ms.
> When I set ack to 1, the producer's delay is 1.6ms and the consumer's delay
> is 3.7ms.
> When I set ack to -1, the producer's delay is 3.5ms and the consumer's delay
> is 4.2ms.
>
> But why did the consumer's delay decrease when I set ack from 0 to 1? It
> confuses me.
> 




Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api

2016-06-19 Thread Rohit Sardesai
In my tests, I am using around 24 consumer groups. I never call
consumer.close() or consumer.unsubscribe() until the application is shutting
down.

So the consumers never leave, but new consumer instances do get created as the
parallel requests pile up. Also, I am reusing consumer instances if they are
idle (i.e. not serving any consume request). So with 9 partitions, I do 9
consume requests in parallel every second under the same consumer group.

So to summarize, I have the following test setup: 3 Kafka brokers, 2 ZooKeeper
nodes, 1 topic, 9 partitions, 24 consumer groups, and 9 consume requests at a
time.



From: Dana Powers 
Sent: 19 June 2016 10:45
To: users@kafka.apache.org
Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api

Is your test reusing a group name? And if so, are your consumer instances
gracefully leaving? This may cause subsequent 'rebalance' operations to
block until those old consumers check in or the session timeout expires
(30 secs).

-Dana
On Jun 18, 2016 8:56 PM, "Rohit Sardesai" 
wrote:

> I am using the group management feature of Kafka 0.9 to handle partition
> assignment to consumer instances. I use the subscribe() API to subscribe to
> the topic I am interested in reading data from. I have an environment
> where I have 3 Kafka brokers with a couple of ZooKeeper nodes. I created
> a topic with 9 partitions. The performance tests attempt to send 9
> parallel poll() requests to the Kafka brokers every second. The results
> show that each poll() operation takes around 30 seconds the first time
> it polls and returns 0 records. Also, when I print the partition
> assignment to this consumer instance, I see no partitions assigned to it.
> The next poll() does return quickly (~10-20 ms) with data and some
> partitions assigned to it.
>
> With each consumer taking 30 seconds, the performance tests report very
> low throughput since I run the tests for around 1000 seconds, out of which I
> produce messages on the topic for the complete duration and I start the
> parallel consume requests after 400 seconds. So out of 400 seconds, with 9
> consumers taking 30 seconds each, around 270 seconds are spent in the
> first poll without any data. Is it because of the rebalance operation
> that the consumers are blocked on poll()? What is the best way to use
> poll() if I have to serve many parallel requests per second? Should I
> prefer manual assignment of partitions in this case instead of relying on
> rebalance?
>
>
> Regards,
>
> Rohit Sardesai
>
>