Re: How to run the three producers test

2015-07-14 Thread Yuheng Du
Jiefu,

Now even though the disk space is enough (less than 18% used), when I run
the test, it still gives me an error; in the logs it says:

[2015-07-14 23:08:48,735] FATAL Fatal error during KafkaServerStartable
startup. Prepare to shutdown (kafka.server.KafkaServerStartable)

org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to
zookeeper server within timeout: 6000

at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:880)

at org.I0Itec.zkclient.ZkClient.(ZkClient.java:98)

at org.I0Itec.zkclient.ZkClient.(ZkClient.java:84)

at kafka.server.KafkaServer.initZk(KafkaServer.scala:157)

at kafka.server.KafkaServer.startup(KafkaServer.scala:82)

at
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:29)

at kafka.Kafka$.main(Kafka.scala:46)

at kafka.Kafka.main(Kafka.scala)

[2015-07-14 23:08:48,737] INFO [Kafka Server 1], shutting down
(kafka.server.KafkaServer)

I have checked that ZooKeeper is running fine. Can anyone help me figure
out why I get this error? Thanks.
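
A common first remedy for this ZkTimeoutException is to raise the broker's
ZooKeeper timeouts in server.properties. A minimal sketch, with placeholder
host names (6000 ms is the default that matches the timeout in the stack
trace):

    # config/server.properties
    zookeeper.connect=zk-host1:2181,zk-host2:2181   # placeholder hosts
    # raise from the 6000 ms default if ZooKeeper is slow to respond
    zookeeper.connection.timeout.ms=30000
    zookeeper.session.timeout.ms=6000

If the connection still times out, it is usually a reachability problem
(firewall, wrong zookeeper.connect address) rather than a timing one.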

On Tue, Jul 14, 2015 at 10:24 PM, Yuheng Du 
wrote:

> But is there a way to let kafka overwrite the old data if the disk is
> filled? Or is it not necessary to use this figure? Thanks.
>
> On Tue, Jul 14, 2015 at 10:14 PM, Yuheng Du 
> wrote:
>
>> Jiefu,
>>
>> I agree with you. I checked the hardware specs of my machines, each one
>> of them has:
>>
>> RAM
>>
>>
>>
>> 256GB ECC Memory (16x 16 GB DDR4 1600MT/s dual rank RDIMMs)
>>
>> Disk
>>
>>
>>
>> Two 1 TB 7.2K RPM 3G SATA HDDs
>>
>> For the throughput versus stored data test, it uses 5*10^10 messages,
>> which have a total volume of 5TB. I set the replication factor to 3,
>> which means the total size including replicas would be 15TB, which
>> apparently overwhelmed the two brokers I use.
>>
>> Thanks.
>>
>> best,
>> Yuheng
>>
>> On Tue, Jul 14, 2015 at 6:06 PM, JIEFU GONG  wrote:
>>
>>> Someone may correct me if I am incorrect, but how much disk space do you
>>> have on these nodes? Your exception 'No space left on device' from one of
>>> your brokers seems to suggest that you're full (after all you're writing
>>> 500 million records). If this is the case I believe the expected behavior
>>> for Kafka is to reject any more attempts to write data?
>>>
>>> On Tue, Jul 14, 2015 at 2:27 PM, Yuheng Du 
>>> wrote:
>>>
>>> > Also, the log in another broker (not the bootstrap) says:
>>> >
>>> > [2015-07-14 15:18:41,220] FATAL [Replica Manager on Broker 1]: Error
>>> > writing to highwatermark file:  (kafka.server.ReplicaManager)
>>> >
>>> >
>>> > [2015-07-14 15:18:40,160] ERROR Closing socket for /130.127.133.47
>>> because
>>> > of error (kafka.network.Process
>>> >
>>> > or)
>>> >
>>> > java.io.IOException: Connection reset by peer
>>> >
>>> > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>> >
>>> > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>> >
>>> > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>> >
>>> > at sun.nio.ch.IOUtil.read(IOUtil.java:197)
>>> >
>>> > at
>>> sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>>> >
>>> > at kafka.utils.Utils$.read(Utils.scala:380)
>>> >
>>> > at
>>> >
>>> >
>>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>> >
>>> > at kafka.network.Processor.read(SocketServer.scala:444)
>>> >
>>> > at kafka.network.Processor.run(SocketServer.scala:340)
>>> >
>>> > at java.lang.Thread.run(Thread.java:745)
>>> >
>>> > 
>>> >
>>> > java.io.IOException: No space left on device
>>> >
>>> > at java.io.FileOutputStream.writeBytes(Native Method)
>>> >
>>> > at java.io.FileOutputStream.write(FileOutputStream.java:345)
>>> >
>>> > at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>>> >
>>> > at sun.nio.cs.StreamEncoder.implFlushBuffe
>>> >
>>> > (END)
>>> >
>>> > On Tue, Jul 14, 2015 at 5:24 PM, Yuheng Du 
>>> > wrote:
>>> >
>>> > > Hi Jiefu, Gwen,
>>> > >
>>> > > I am running the Throughput versus stored data test:
>>> > > bin/kafka-run-class.sh
>>> org.apache.kafka.clients.tools.ProducerPerformance
>>> > > test 500 100 -1 acks=1 bootstrap.servers=
>>> > > esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
>>> > batch.size=8196
>>> > >
>>> > > After around 50,000,000 messages were sent, I got a bunch of
>>> connection
>>> > > refused error as I mentioned before. I checked the logs on the
>>> broker and
>>> > > here is what I see:
>>> > >
>>> > > [2015-07-14 15:11:23,578] WARN Partition [test,4] on broker 5: No
>>> > > checkpointed highwatermark is found for partition [test,4]
>>> > > (kafka.cluster.Partition)
>>> > >
>>> > > [2015-07-14 15:12:33,298] INFO Rolled new log segment for 'test-4'
>>> in 4
>>> > > ms. (kafka.log.Log)
>>> > >
>>> > > [2015-07-14 15:12:33,299] INFO Rolled new log segment for 'test-0'
>>> in 1
>>> > > ms. (kafka.log.Log)
>>> > >
>>> > > [20

Re: hidden producer memory usage ? does partition-filebacked-batched-async kafka producer make sense?

2015-07-14 Thread tao xiao
The OOME issue may be caused
by org.apache.kafka.clients.producer.internals.ErrorLoggingCallback holding
an unnecessary byte[] value. Can you apply the patch in the JIRA below and
try again?

https://issues.apache.org/jira/browse/KAFKA-2281
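
Independent of the patch, if the leak comes from a callback that captures
the message payload, one mitigation is to pass send() a callback of your
own that keeps no reference to the value. A minimal sketch against the
0.8.2 Java producer; the broker address and topic name are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class SendWithLightCallback {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");  // placeholder
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<byte[], byte[]>(props);
            byte[] value = new byte[500];
            // this callback logs only the exception and holds no reference
            // to 'value', so the payload is collectable once the send settles
            producer.send(new ProducerRecord<byte[], byte[]>("test", value),
                new Callback() {
                    public void onCompletion(RecordMetadata md, Exception e) {
                        if (e != null)
                            System.err.println("send failed: " + e);
                    }
                });
            producer.close();
        }
    }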


On Wed, 15 Jul 2015 at 06:42 francesco vigotti 
wrote:

> I haven't explained why only 10 partitions; anyway, this is because more
> partitions don't speed up the producer, because of this memory-monitoring
> problem, because I have no problems on the consumer side at the moment
> (10 should be enough, even if I haven't fully tested it yet), and because
> the solution would be these partition-separated queues, so 10 seemed a
> fair number to me...
> I'm also considering setting up a kind of producer-proxy instance,
> something that a producer which is having communication problems with a
> leader can push a batch of messages to, and that sends the batch on its
> behalf; this could help mitigate producer latency during such networking
> problems.. there isn't something like this yet, right? An option in the
> producer to use a non-leader broker as a proxy for the partition-leader
> broker would be optimal :) maybe with producer-to-single-broker latency
> stats :)
>
> Thanks again and sorry for the verbosity :)
>
> On Tue, Jul 14, 2015 at 10:14 PM, francesco vigotti <
> vigotti.france...@gmail.com> wrote:
>
> > Hi,
> > I'm playing with the new Kafka producer to see if it could fit my use
> > case (Kafka version 0.8.2.1).
> > I'll probably end up having a Kafka cluster of 5 nodes across multiple
> > datacenters, with one topic, a replication factor of 2, and at least 10
> > partitions required for consumer performance (I'll explain why only 10
> > later.. )
> > My producers are 10 or more, also distributed across multiple
> > datacenters around the globe, each producing about 2k messages/sec;
> > message size is about 500 bytes per message.
> >
> > During my tests I have seen that when there is a network issue between
> > one of the producers and one of the partition leaders, the producer
> > starts to accumulate data well beyond the configured buffer.memory
> > (configured at about 40mb), which crashes the JVM because the garbage
> > collector cannot make space for the application to work (JVM memory is
> > about 2 gb, always at about 1gb usage except when this problem occurs).
> > What I see is that I have about 500mb used by char[].
> >
> > During these events, yes, the buffer usage increases from about 5 mb to
> > about 30mb, and batch sends reach their maximum size to help speed up
> > the transmission, but that seems not to be enough for the memory usage /
> > performance / resilience requirements:
> >
> >
> >
> > kafka.producer.acks=1
> > kafka.producer.buffer.memory=4000
> > kafka.producer.compression.type=gzip
> > kafka.producer.retries=1
> > kafka.producer.batch.size=1
> > kafka.producer.max.request.size=1000
> > kafka.producer.send.buffer.bytes=100
> > kafka.producer.timeout.ms=1
> > kafka.producer.reconnect.backoff.ms=500
> > kafka.producer.retry.backoff.ms=500
> > kafka.producer.block.on.buffer.full=false
> > kafka.producer.linger.ms=1000
> > kafka.producer.maxinflight=2
> > kafka.producer.serializer.class=kafka.serializer.DefaultEncoder
> >
> > The main issue anyway is that when 1 producer gets connection path
> > issues to 1 broker (packet loss/connection slowdown) (which doesn't
> > mean the broker is down; other producers can reach it, it's just a
> > networking problem between the two, which could last just an hour
> > during peak time on some routing paths), it crashes the whole
> > application due to JVM RAM usage.
> >
> > I have found no solution playing with the available params.
> >
> > Now, reading around, this is my understanding of how the producer works
> > internally:
> > the producer gets messages and returns an async future (but during the
> > first connection it blocks before returning the future, due to metadata
> > fetching, but that's another strange story :) ); then the message gets
> > queued in 1 queue per partition; then, in a random order but
> > sequentially, brokers are contacted and all queues for the partitions
> > assigned to that broker are sent, waiting for the end of transmission
> > before proceeding to the next broker. But if the transmission to 1
> > broker hangs / slows down, it delays everything (other queues grow),
> > and I don't understand where the buffer memory is used in this whole
> > process, because it seems to me that something else is using memory
> > when this happens.
> >
> > *to get finer-grained control over memory usage of the kafka producer:*
> > a simple file-backed solution that monitors the callbacks to detect
> > when the producer hangs, and stops sending more data to the kafka
> > producer when I know that there are more than N messages in flight,
> > could be a partial solution, i.e. it could solve the memory usage, but
> > it's strange, because for that the buffer should be enough... is

Re: kafka benchmark tests

2015-07-14 Thread Ewen Cheslack-Postava
@Jiefu, yes! The patch is functional, I think it's just waiting on a bit of
final review after the last round of changes. You can definitely use it for
your own benchmarking, and we'd love to see patches for any additional
tests we missed in the first pass!

-Ewen

On Tue, Jul 14, 2015 at 10:53 AM, JIEFU GONG  wrote:

> Yuheng,
> I would recommend looking here:
> http://kafka.apache.org/documentation.html#brokerconfigs and scrolling
> down
> to get a better understanding of the default settings and what they mean --
> it'll tell you what the different options for acks do.
>
> Ewen,
> Thank you immensely for your thoughts, they shed a lot of light on the
> issue. Though it is understandable that your specific results need to be
> verified, it seems that the KIP-25 patch is functional and I can use it for
> my own benchmarking purposes? Is that correct? Thanks again!
>
> On Tue, Jul 14, 2015 at 8:22 AM, Yuheng Du 
> wrote:
>
> > Also, I guess setting the target throughput to -1 means let it be as high
> > as possible?
> >
> > On Tue, Jul 14, 2015 at 10:36 AM, Yuheng Du 
> > wrote:
> >
> > > Thanks. If I set the acks=1 in the producer config options in
> > > bin/kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance
> > > test7 5000 100 -1 acks=1 bootstrap.servers=
> > > esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
> > batch.size=8196?
> > >
> > > Does that mean for each message generated at the producer, the producer
> > > will wait until the broker sends the ack back, then send another
> message?
> > >
> > > Thanks.
> > >
> > > Yuheng
> > >
> > > On Tue, Jul 14, 2015 at 10:06 AM, Manikumar Reddy <
> ku...@nmsworks.co.in>
> > > wrote:
> > >
> > >> Yes, a list of Kafka server host/port pairs to use for establishing
> > >> the initial connection to the Kafka cluster.
> > >>
> > >> https://kafka.apache.org/documentation.html#newproducerconfigs
> > >>
> > >> On Tue, Jul 14, 2015 at 7:29 PM, Yuheng Du 
> > >> wrote:
> > >>
> > >> > Does anyone know what bootstrap.servers=
> > >> > esv4-hcl198.grid.linkedin.com:9092 means in the following test
> > command:
> > >> >
> > >> > bin/kafka-run-class.sh
> > >> org.apache.kafka.clients.tools.ProducerPerformance
> > >> > test7 5000 100 -1 acks=1 bootstrap.servers=
> > >> > esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
> > >> batch.size=8196?
> > >> >
> > >> > what is bootstrap.servers? Is it the kafka server that I am running
> a
> > >> test
> > >> > at?
> > >> >
> > >> > Thanks.
> > >> >
> > >> > Yuheng
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > On Tue, Jul 14, 2015 at 12:18 AM, Ewen Cheslack-Postava <
> > >> e...@confluent.io
> > >> > >
> > >> > wrote:
> > >> >
> > >> > > I implemented (nearly) the same basic set of tests in the system
> > test
> > >> > > framework we started at Confluent and that is going to move into
> > >> Kafka --
> > >> > > see the wip patch for KIP-25 here:
> > >> > https://github.com/apache/kafka/pull/70
> > >> > > In particular, that test is implemented in benchmark_test.py:
> > >> > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/kafka/pull/70/files#diff-ca984778cf9943407645eb6784f19dc8
> > >> > >
> > >> > > Hopefully once that's merged people can reuse that benchmark (and
> > add
> > >> to
> > >> > > it!) so they can easily run the same benchmarks across different
> > >> > hardware.
> > >> > > Here are some results from an older version of that test on
> > m3.2xlarge
> > >> > > instances on EC2 using local ephemeral storage (I think... it's
> been
> > >> > awhile
> > >> > > since I ran these numbers and I didn't document methodology that
> > >> > > carefully):
> > >> > >
> > >> > > INFO:_.KafkaBenchmark:=
> > >> > > INFO:_.KafkaBenchmark:BENCHMARK RESULTS
> > >> > > INFO:_.KafkaBenchmark:=
> > >> > > INFO:_.KafkaBenchmark:Single producer, no replication:
> 684097.470208
> > >> > > rec/sec (65.24 MB/s)
> > >> > > INFO:_.KafkaBenchmark:Single producer, async 3x replication:
> > >> > > 667494.359673 rec/sec (63.66 MB/s)
> > >> > > INFO:_.KafkaBenchmark:Single producer, sync 3x replication:
> > >> > > 116485.764275 rec/sec (11.11 MB/s)
> > >> > > INFO:_.KafkaBenchmark:Three producers, async 3x replication:
> > >> > > 1696519.022182 rec/sec (161.79 MB/s)
> > >> > > INFO:_.KafkaBenchmark:Message size:
> > >> > > INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
> > >> > > INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
> > >> > > INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
> > >> > > INFO:_.KafkaBenchmark: 1: 8306.180862 rec/sec (79.21 MB/s)
> > >> > > INFO:_.KafkaBenchmark: 10: 978.403499 rec/sec (93.31 MB/s)
> > >> > > INFO:_.KafkaBenchmark:Throughput over long run, data > memory:
> > >> > > INFO:_.KafkaBenchmark: Time block 0: 684725.151324 rec/sec
> > (65.30
> > >> > MB/s)
> > >> > > INFO:_.KafkaBenchmark:Single consumer: 701031.14 rec/sec
> > >> 
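
To tie the two questions above together: bootstrap.servers is only the
entry point for cluster discovery, and acks controls how many
acknowledgments the leader waits for before answering a produce request
(the producer keeps pipelining batches either way; acks=1 does not mean
one message at a time). A minimal producer-properties sketch, host names
being placeholders:

    # any subset of brokers works; the client discovers the rest from them
    bootstrap.servers=broker1:9092,broker2:9092
    # acks=0: don't wait; acks=1: leader only; acks=-1: all in-sync replicas
    acks=1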

Java API for fetching Consumer group from Kafka Server(Not Zookeeper)

2015-07-14 Thread Swati Suman
Hi Team,

Currently, I am able to fetch the topic, partition, leader, and log size
through the TopicMetadataRequest API available in Kafka.

Is there any Java API that gives me the consumer groups?

Best Regards,
Swati Suman
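
As far as I know there was no broker-side Java API for listing consumer
groups in 0.8.2; high-level consumers register their group under the
/consumers znode, so the usual workaround reads ZooKeeper (which only
partially answers the question, since it is not served by the Kafka
broker). A minimal sketch using the same ZkClient library Kafka ships
with; the connect string is a placeholder:

    import java.util.List;
    import org.I0Itec.zkclient.ZkClient;

    public class ListConsumerGroups {
        public static void main(String[] args) {
            // placeholder ZooKeeper connect string; timeouts in ms
            ZkClient zk = new ZkClient("zk-host:2181", 10000, 10000);
            // each child of /consumers is a registered consumer group
            List<String> groups = zk.getChildren("/consumers");
            for (String group : groups)
                System.out.println(group);
            zk.close();
        }
    }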


Re: How to run the three producers test

2015-07-14 Thread Yuheng Du
But is there a way to let kafka overwrite the old data if the disk is
filled? Or is it not necessary to use this figure? Thanks.
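
There is no mode that overwrites old data only once the disk is full, but
size- and time-based retention achieve the same effect by deleting the
oldest segments before the volume fills. A minimal server.properties
sketch; the sizes are illustrative, not recommendations:

    # delete (rather than compact) expired segments
    log.cleanup.policy=delete
    # cap each partition's log at ~100 GB; oldest segments go first
    log.retention.bytes=100000000000
    # and/or expire segments older than 24 hours
    log.retention.hours=24

Note that log.retention.bytes is per partition, so the per-broker bound is
roughly that value times the number of partitions (and replicas) hosted.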

On Tue, Jul 14, 2015 at 10:14 PM, Yuheng Du 
wrote:

> Jiefu,
>
> I agree with you. I checked the hardware specs of my machines, each one of
> them has:
>
> RAM
>
>
>
> 256GB ECC Memory (16x 16 GB DDR4 1600MT/s dual rank RDIMMs)
>
> Disk
>
>
>
> Two 1 TB 7.2K RPM 3G SATA HDDs
>
> For the throughput versus stored data test, it uses 5*10^10 messages,
> which have a total volume of 5TB. I set the replication factor to 3,
> which means the total size including replicas would be 15TB, which
> apparently overwhelmed the two brokers I use.
>
> Thanks.
>
> best,
> Yuheng
>
> On Tue, Jul 14, 2015 at 6:06 PM, JIEFU GONG  wrote:
>
>> Someone may correct me if I am incorrect, but how much disk space do you
>> have on these nodes? Your exception 'No space left on device' from one of
>> your brokers seems to suggest that you're full (after all you're writing
>> 500 million records). If this is the case I believe the expected behavior
>> for Kafka is to reject any more attempts to write data?
>>
>> On Tue, Jul 14, 2015 at 2:27 PM, Yuheng Du 
>> wrote:
>>
>> > Also, the log in another broker (not the bootstrap) says:
>> >
>> > [2015-07-14 15:18:41,220] FATAL [Replica Manager on Broker 1]: Error
>> > writing to highwatermark file:  (kafka.server.ReplicaManager)
>> >
>> >
>> > [2015-07-14 15:18:40,160] ERROR Closing socket for /130.127.133.47
>> because
>> > of error (kafka.network.Process
>> >
>> > or)
>> >
>> > java.io.IOException: Connection reset by peer
>> >
>> > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>> >
>> > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>> >
>> > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>> >
>> > at sun.nio.ch.IOUtil.read(IOUtil.java:197)
>> >
>> > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>> >
>> > at kafka.utils.Utils$.read(Utils.scala:380)
>> >
>> > at
>> >
>> >
>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>> >
>> > at kafka.network.Processor.read(SocketServer.scala:444)
>> >
>> > at kafka.network.Processor.run(SocketServer.scala:340)
>> >
>> > at java.lang.Thread.run(Thread.java:745)
>> >
>> > 
>> >
>> > java.io.IOException: No space left on device
>> >
>> > at java.io.FileOutputStream.writeBytes(Native Method)
>> >
>> > at java.io.FileOutputStream.write(FileOutputStream.java:345)
>> >
>> > at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>> >
>> > at sun.nio.cs.StreamEncoder.implFlushBuffe
>> >
>> > (END)
>> >
>> > On Tue, Jul 14, 2015 at 5:24 PM, Yuheng Du 
>> > wrote:
>> >
>> > > Hi Jiefu, Gwen,
>> > >
>> > > I am running the Throughput versus stored data test:
>> > > bin/kafka-run-class.sh
>> org.apache.kafka.clients.tools.ProducerPerformance
>> > > test 500 100 -1 acks=1 bootstrap.servers=
>> > > esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
>> > batch.size=8196
>> > >
>> > > After around 50,000,000 messages were sent, I got a bunch of
>> connection
>> > > refused error as I mentioned before. I checked the logs on the broker
>> and
>> > > here is what I see:
>> > >
>> > > [2015-07-14 15:11:23,578] WARN Partition [test,4] on broker 5: No
>> > > checkpointed highwatermark is found for partition [test,4]
>> > > (kafka.cluster.Partition)
>> > >
>> > > [2015-07-14 15:12:33,298] INFO Rolled new log segment for 'test-4' in
>> 4
>> > > ms. (kafka.log.Log)
>> > >
>> > > [2015-07-14 15:12:33,299] INFO Rolled new log segment for 'test-0' in
>> 1
>> > > ms. (kafka.log.Log)
>> > >
>> > > [2015-07-14 15:13:39,529] INFO Rolled new log segment for 'test-4' in
>> 1
>> > > ms. (kafka.log.Log)
>> > >
>> > > [2015-07-14 15:13:39,531] INFO Rolled new log segment for 'test-0' in
>> 1
>> > > ms. (kafka.log.Log)
>> > >
>> > > [2015-07-14 15:14:48,502] INFO Rolled new log segment for 'test-4' in
>> 3
>> > > ms. (kafka.log.Log)
>> > >
>> > > [2015-07-14 15:14:48,502] INFO Rolled new log segment for 'test-0' in
>> 1
>> > > ms. (kafka.log.Log)
>> > >
>> > > [2015-07-14 15:15:51,478] INFO Rolled new log segment for 'test-4' in
>> 1
>> > > ms. (kafka.log.Log)
>> > >
>> > > [2015-07-14 15:15:51,479] INFO Rolled new log segment for 'test-0' in
>> 1
>> > > ms. (kafka.log.Log)
>> > >
>> > > [2015-07-14 15:16:52,589] INFO Rolled new log segment for 'test-4' in
>> 1
>> > > ms. (kafka.log.Log)
>> > >
>> > > [2015-07-14 15:16:52,590] INFO Rolled new log segment for 'test-0' in
>> 1
>> > > ms. (kafka.log.Log)
>> > >
>> > > [2015-07-14 15:17:57,406] INFO Rolled new log segment for 'test-4' in
>> 1
>> > > ms. (kafka.log.Log)
>> > >
>> > > [2015-07-14 15:17:57,407] INFO Rolled new log segment for 'test-0' in
>> 0
>> > > ms. (kafka.log.Log)
>> > >
>> > > [2015-07-14 15:18:39,792] FATAL [KafkaApi-5] Halting due to
>> unrecoverable
>> > > I/O

Re: How to run the three producers test

2015-07-14 Thread Yuheng Du
Jiefu,

I agree with you. I checked the hardware specs of my machines, each one of
them has:

RAM



256GB ECC Memory (16x 16 GB DDR4 1600MT/s dual rank RDIMMs)

Disk



Two 1 TB 7.2K RPM 3G SATA HDDs

For the throughput versus stored data test, it uses 5*10^10 messages, which
have a total volume of 5TB. I set the replication factor to 3, which
means the total size including replicas would be 15TB, which apparently
overwhelmed the two brokers I use.

Thanks.

best,
Yuheng

On Tue, Jul 14, 2015 at 6:06 PM, JIEFU GONG  wrote:

> Someone may correct me if I am incorrect, but how much disk space do you
> have on these nodes? Your exception 'No space left on device' from one of
> your brokers seems to suggest that you're full (after all you're writing
> 500 million records). If this is the case I believe the expected behavior
> for Kafka is to reject any more attempts to write data?
>
> On Tue, Jul 14, 2015 at 2:27 PM, Yuheng Du 
> wrote:
>
> > Also, the log in another broker (not the bootstrap) says:
> >
> > [2015-07-14 15:18:41,220] FATAL [Replica Manager on Broker 1]: Error
> > writing to highwatermark file:  (kafka.server.ReplicaManager)
> >
> >
> > [2015-07-14 15:18:40,160] ERROR Closing socket for /130.127.133.47
> because
> > of error (kafka.network.Process
> >
> > or)
> >
> > java.io.IOException: Connection reset by peer
> >
> > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> >
> > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> >
> > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> >
> > at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> >
> > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> >
> > at kafka.utils.Utils$.read(Utils.scala:380)
> >
> > at
> >
> >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >
> > at kafka.network.Processor.read(SocketServer.scala:444)
> >
> > at kafka.network.Processor.run(SocketServer.scala:340)
> >
> > at java.lang.Thread.run(Thread.java:745)
> >
> > 
> >
> > java.io.IOException: No space left on device
> >
> > at java.io.FileOutputStream.writeBytes(Native Method)
> >
> > at java.io.FileOutputStream.write(FileOutputStream.java:345)
> >
> > at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> >
> > at sun.nio.cs.StreamEncoder.implFlushBuffe
> >
> > (END)
> >
> > On Tue, Jul 14, 2015 at 5:24 PM, Yuheng Du 
> > wrote:
> >
> > > Hi Jiefu, Gwen,
> > >
> > > I am running the Throughput versus stored data test:
> > > bin/kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance
> > > test 500 100 -1 acks=1 bootstrap.servers=
> > > esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
> > batch.size=8196
> > >
> > > After around 50,000,000 messages were sent, I got a bunch of connection
> > > refused error as I mentioned before. I checked the logs on the broker
> and
> > > here is what I see:
> > >
> > > [2015-07-14 15:11:23,578] WARN Partition [test,4] on broker 5: No
> > > checkpointed highwatermark is found for partition [test,4]
> > > (kafka.cluster.Partition)
> > >
> > > [2015-07-14 15:12:33,298] INFO Rolled new log segment for 'test-4' in 4
> > > ms. (kafka.log.Log)
> > >
> > > [2015-07-14 15:12:33,299] INFO Rolled new log segment for 'test-0' in 1
> > > ms. (kafka.log.Log)
> > >
> > > [2015-07-14 15:13:39,529] INFO Rolled new log segment for 'test-4' in 1
> > > ms. (kafka.log.Log)
> > >
> > > [2015-07-14 15:13:39,531] INFO Rolled new log segment for 'test-0' in 1
> > > ms. (kafka.log.Log)
> > >
> > > [2015-07-14 15:14:48,502] INFO Rolled new log segment for 'test-4' in 3
> > > ms. (kafka.log.Log)
> > >
> > > [2015-07-14 15:14:48,502] INFO Rolled new log segment for 'test-0' in 1
> > > ms. (kafka.log.Log)
> > >
> > > [2015-07-14 15:15:51,478] INFO Rolled new log segment for 'test-4' in 1
> > > ms. (kafka.log.Log)
> > >
> > > [2015-07-14 15:15:51,479] INFO Rolled new log segment for 'test-0' in 1
> > > ms. (kafka.log.Log)
> > >
> > > [2015-07-14 15:16:52,589] INFO Rolled new log segment for 'test-4' in 1
> > > ms. (kafka.log.Log)
> > >
> > > [2015-07-14 15:16:52,590] INFO Rolled new log segment for 'test-0' in 1
> > > ms. (kafka.log.Log)
> > >
> > > [2015-07-14 15:17:57,406] INFO Rolled new log segment for 'test-4' in 1
> > > ms. (kafka.log.Log)
> > >
> > > [2015-07-14 15:17:57,407] INFO Rolled new log segment for 'test-0' in 0
> > > ms. (kafka.log.Log)
> > >
> > > [2015-07-14 15:18:39,792] FATAL [KafkaApi-5] Halting due to
> unrecoverable
> > > I/O error while handling produce request:  (kafka.server.KafkaApis)
> > >
> > > kafka.common.KafkaStorageException: I/O exception in append to log
> > 'test-0'
> > >
> > > at kafka.log.Log.append(Log.scala:266)
> > >
> > > at
> > >
> >
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
> > >
> > > at
> > >
> >
> kafka.cluster.

Re: Offset not committed

2015-07-14 Thread Jiangjie Qin
Hi Vadim, 

Can you turn on trace level logging on your consumer and search for
"offset commit response" in the log?
Also maybe take a look at the log to see if there is any exception thrown.

Thanks,

Jiangjie (Becket) Qin

On 7/14/15, 11:06 AM, "Vadim Bobrov"  wrote:

>just caught this error again. I issue commitOffsets - no error, but no
>offsets committed either. Watching __consumer_offsets shows no new
>messages either. Then in a few minutes I issue commitOffsets again - all
>committed. Unless I am doing something terribly wrong, this is very
>unreliable.
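
One way to watch the offsets topic directly is the console consumer with
the offsets message formatter. A sketch for 0.8.2 (the formatter class
moved to kafka.coordinator in later releases, and the internal topic is
hidden unless exclude.internal.topics is overridden; the host name is a
placeholder):

    bin/kafka-console-consumer.sh --zookeeper zk-host:2181 \
      --topic __consumer_offsets \
      --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
      --consumer.config config/consumer.properties

    # config/consumer.properties needs:
    exclude.internal.topics=false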
>
>On Tue, Jul 14, 2015 at 1:49 PM, Joel Koshy  wrote:
>
>> Actually, how are you committing offsets? Are you using the old
>> (ZookeeperConsumerConnector) or the new KafkaConsumer?
>>
>> It is true that the current APIs don't return any result, but it would
>> help to check if anything is getting into the offsets topic - unless
>> you are seeing errors in the logs, the offset commit should succeed
>> (if you are indeed explicitly committing offsets).
>>
>> Thanks,
>>
>> Joel
>>
>> On Tue, Jul 14, 2015 at 12:19:01PM -0400, Vadim Bobrov wrote:
>> > Thanks, Joel, I will, but regardless of my findings the basic problem
>> > will still be there: there is no guarantee that the offsets will be
>> > committed after commitOffsets. Because commitOffsets does not return
>> > its exit status, nor, as I understand it, does it block until the
>> > offsets are committed. In other words, there is no way to know that
>> > it has, in fact, committed the offsets,
>> >
>> > or am I missing something? And then another question - why does it
>> > seem to depend on the number of consumed messages?
>> >
>> > On Tue, Jul 14, 2015 at 11:36 AM, Joel Koshy 
>> wrote:
>> >
>> > > Can you take a look at the kafka commit rate mbean on your consumer?
>> > > Also, can you consume the offsets topic while you are committing
>> > > offsets and see if/what offsets are getting committed?
>> > > (http://www.slideshare.net/jjkoshy/offset-management-in-kafka/32)
>> > >
>> > > Thanks,
>> > >
>> > > Joel
>> > >
>> > > On Tue, Jul 14, 2015 at 11:12:03AM -0400, Vadim Bobrov wrote:
>> > > > I am trying to replace ActiveMQ with Kafka in our environment;
>> > > > however, I have encountered a strange problem that basically
>> > > > prevents us from using Kafka in production. The problem is that
>> > > > sometimes the offsets are not committed.
>> > > >
>> > > > I am using Kafka 0.8.2.1, offset storage = kafka, high level
>> consumer,
>> > > > auto-commit = off. Every N messages I issue commitOffsets(). Now
>> here is
>> > > > the problem - if N is below a certain number (180 000 for me) it
>> works
>> > > and
>> > > > the offset is moving. If N is 180 000 or more the offset is not
>> updated
>> > > > after commitOffsets
>> > > >
>> > > > I am looking at offsets using kafka-run-class.sh
>> > > > kafka.tools.ConsumerOffsetChecker
>> > > > Any help?
>> > >
>> > >
>>
>>



Re: hidden producer memory usage ? does partition-filebacked-batched-async kafka producer make sense?

2015-07-14 Thread francesco vigotti
I haven't explained why only 10 partitions; anyway, this is because more
partitions don't speed up the producer, because of this memory-monitoring
problem, because I have no problems on the consumer side at the moment
(10 should be enough, even if I haven't fully tested it yet), and because
the solution would be these partition-separated queues, so 10 seemed a
fair number to me...
I'm also considering setting up a kind of producer-proxy instance,
something that a producer which is having communication problems with a
leader can push a batch of messages to, and that sends the batch on its
behalf; this could help mitigate producer latency during such networking
problems.. there isn't something like this yet, right? An option in the
producer to use a non-leader broker as a proxy for the partition-leader
broker would be optimal :) maybe with producer-to-single-broker latency
stats :)

Thanks again and sorry for the verbosity :)

On Tue, Jul 14, 2015 at 10:14 PM, francesco vigotti <
vigotti.france...@gmail.com> wrote:

> Hi,
> I'm playing with the new Kafka producer to see if it could fit my use
> case (Kafka version 0.8.2.1).
> I'll probably end up having a Kafka cluster of 5 nodes across multiple
> datacenters, with one topic, a replication factor of 2, and at least 10
> partitions required for consumer performance (I'll explain why only 10
> later.. )
> My producers are 10 or more, also distributed across multiple datacenters
> around the globe, each producing about 2k messages/sec;
> message size is about 500 bytes per message.
>
> During my tests I have seen that when there is a network issue between
> one of the producers and one of the partition leaders, the producer
> starts to accumulate data well beyond the configured buffer.memory
> (configured at about 40mb), which crashes the JVM because the garbage
> collector cannot make space for the application to work (JVM memory is
> about 2 gb, always at about 1gb usage except when this problem occurs).
> What I see is that I have about 500mb used by char[].
>
> During these events, yes, the buffer usage increases from about 5 mb to
> about 30mb, and batch sends reach their maximum size to help speed up
> the transmission, but that seems not to be enough for the memory usage /
> performance / resilience requirements:
>
>
>
> kafka.producer.acks=1
> kafka.producer.buffer.memory=4000
> kafka.producer.compression.type=gzip
> kafka.producer.retries=1
> kafka.producer.batch.size=1
> kafka.producer.max.request.size=1000
> kafka.producer.send.buffer.bytes=100
> kafka.producer.timeout.ms=1
> kafka.producer.reconnect.backoff.ms=500
> kafka.producer.retry.backoff.ms=500
> kafka.producer.block.on.buffer.full=false
> kafka.producer.linger.ms=1000
> kafka.producer.maxinflight=2
> kafka.producer.serializer.class=kafka.serializer.DefaultEncoder
>
> The main issue anyway is that when 1 producer gets connection path
> issues to 1 broker (packet loss/connection slowdown) (which doesn't mean
> the broker is down; other producers can reach it, it's just a networking
> problem between the two, which could last just an hour during peak time
> on some routing paths), it crashes the whole application due to JVM RAM
> usage.
>
> I have found no solution playing with the available params.
>
> Now, reading around, this is my understanding of how the producer works
> internally:
> the producer gets messages and returns an async future (but during the
> first connection it blocks before returning the future, due to metadata
> fetching, but that's another strange story :) ); then the message gets
> queued in 1 queue per partition; then, in a random order but
> sequentially, brokers are contacted and all queues for the partitions
> assigned to that broker are sent, waiting for the end of transmission
> before proceeding to the next broker. But if the transmission to 1
> broker hangs / slows down, it delays everything (other queues grow), and
> I don't understand where the buffer memory is used in this whole
> process, because it seems to me that something else is using memory when
> this happens.
>
> *to get finer-grained control over memory usage of the kafka producer:*
> a simple file-backed solution that monitors the callbacks to detect when
> the producer hangs, and stops sending more data to the kafka producer
> when I know that there are more than N messages in flight, could be a
> partial solution, i.e. it could solve the memory usage, but it's
> strange, because for that the buffer should be enough... is the buffer
> internally multiplied by partitions or brokers? Or is there something
> else I'm not considering that could use so much RAM (10x the buffer
> size)?
>
> But this solution would also slow down the whole producer in
> transmissions to all partition leaders,
>
> where N is the number of partitions in the queue.
> I'm going to build something like an N-threaded solution where each
> thread handles one queue for one partition (handling 1 leader per thread
> would be optimal, but

hidden producer memory usage ? does partition-filebacked-batched-async kafka producer make sense?

2015-07-14 Thread francesco vigotti
Hi,
I'm playing with the new Kafka producer to see if it could fit my use case
(Kafka version 0.8.2.1).
I'll probably end up having a Kafka cluster of 5 nodes across multiple
datacenters, with one topic, a replication factor of 2, and at least 10
partitions required for consumer performance (I'll explain why only 10
later.. )
My producers are 10 or more, also distributed across multiple datacenters
around the globe, each producing about 2k messages/sec;
message size is about 500 bytes per message.

During my tests I have seen that when there is a network issue between one
of the producers and one of the partition leaders, the producer starts to
accumulate data well beyond the configured buffer.memory (configured at
about 40mb), which crashes the JVM because the garbage collector cannot
make space for the application to work (JVM memory is about 2 gb, always
at about 1gb usage except when this problem occurs).
What I see is that I have about 500mb used by char[].

During these events, yes, the buffer usage increases from about 5 mb to
about 30mb, and batch sends reach their maximum size to help speed up the
transmission, but that seems not to be enough for the memory usage /
performance / resilience requirements:



kafka.producer.acks=1
kafka.producer.buffer.memory=4000
kafka.producer.compression.type=gzip
kafka.producer.retries=1
kafka.producer.batch.size=1
kafka.producer.max.request.size=1000
kafka.producer.send.buffer.bytes=100
kafka.producer.timeout.ms=1
kafka.producer.reconnect.backoff.ms=500
kafka.producer.retry.backoff.ms=500
kafka.producer.block.on.buffer.full=false
kafka.producer.linger.ms=1000
kafka.producer.maxinflight=2
kafka.producer.serializer.class=kafka.serializer.DefaultEncoder

The main issue anyway is that when 1 producer gets connection path issues
to 1 broker (packet loss/connection slowdown) (which doesn't mean the
broker is down; other producers can reach it, it's just a networking
problem between the two, which could last just an hour during peak time
on some routing paths), it crashes the whole application due to JVM RAM
usage.

I have found no solution playing with the available params.

Now, reading around, this is my understanding of how the producer works
internally:
the producer gets messages and returns an async future (but during the
first connection it blocks before returning the future, due to metadata
fetching, but that's another strange story :) ); then the message gets
queued in 1 queue per partition; then, in a random order but sequentially,
brokers are contacted and all queues for the partitions assigned to that
broker are sent, waiting for the end of transmission before proceeding to
the next broker. But if the transmission to 1 broker hangs / slows down,
it delays everything (other queues grow), and I don't understand where
the buffer memory is used in this whole process, because it seems to me
that something else is using memory when this happens.

*to get finer-grained control over memory usage of the kafka producer:*
a simple file-backed solution that monitors the callbacks to detect when
the producer hangs, and stops sending more data to the kafka producer
when I know that there are more than N messages in flight, could be a
partial solution, i.e. it could solve the memory usage, but it's strange,
because for that the buffer should be enough... is the buffer internally
multiplied by partitions or brokers? Or is there something else I'm not
considering that could use so much RAM (10x the buffer size)?

But this solution would also slow down the whole producer in
transmissions to all partition leaders,

where N is the number of partitions in the queue.
I'm going to build something like an N-threaded solution where each
thread handles one queue for one partition (handling 1 leader per thread
would be optimal, but to avoid having to handle leader
assignment/re-election, 1 queue per partition is easier). Each thread has
one producer, which obviously gets less of a batching benefit, because
batches for multiple partitions don't get aggregated per assigned leader,
but at least it will batch more than the sync producer; and if the
callback-tracked count of in-transmission messages reaches some
threshold, I start using local disk storage to persist messages, so as
not to give the producer too many messages, and use the disk FIFO as the
source for the producer until its size reaches 0 again... in the end each
thread will have its file-backed queue when slowed down, and will process
everything from RAM queues when running OK, but each producer will never
be overloaded with messages.
Having finer-grained control over producer memory usage could avoid the
callback-in-flight-message counter, but the disk queue is required anyway
to avoid discarding messages on buffer full, and multiple producers are
required to avoid slowing down the whole producer when the problem is
only between that producer and one partition leader...

Does all this make sense?

Thank you :),
Francesco Vigotti
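
The in-flight cap described above can be sketched with a counting
semaphore: acquire a permit before each send and release it in the
callback, so the caller blocks (or spills to a disk queue) once N messages
are outstanding. A minimal sketch; broker, topic, and N are placeholders:

    import java.util.Properties;
    import java.util.concurrent.Semaphore;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class BoundedProducer {
        static final int MAX_IN_FLIGHT = 10000;           // illustrative N
        static final Semaphore permits = new Semaphore(MAX_IN_FLIGHT);

        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<byte[], byte[]>(props);
            for (int i = 0; i < 1000000; i++) {
                // blocks here instead of letting heap usage grow unbounded;
                // a file-backed queue could be drained into this loop instead
                permits.acquire();
                producer.send(
                    new ProducerRecord<byte[], byte[]>("mytopic", new byte[500]),
                    new Callback() {
                        public void onCompletion(RecordMetadata md, Exception e) {
                            permits.release(); // send settled (ack or error)
                        }
                    });
            }
            producer.close();
        }
    }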


Re: How to run the three producers test

2015-07-14 Thread JIEFU GONG
Someone may correct me if I am incorrect, but how much disk space do you
have on these nodes? Your exception 'No space left on device' from one of
your brokers seems to suggest that you're full (after all you're writing
500 million records). If this is the case I believe the expected behavior
for Kafka is to reject any more attempts to write data?
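
For the stored-data test the arithmetic is worth checking up front:
5*10^10 records of 100 bytes is 5 TB, and a replication factor of 3
triples that to 15 TB across the cluster, so two brokers with two 1 TB
disks each (4 TB total) will fill long before the test finishes. A quick
check before starting, with a placeholder log.dirs path:

    df -h /path/to/kafka-logs   # compare free space against the estimate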

On Tue, Jul 14, 2015 at 2:27 PM, Yuheng Du  wrote:

> Also, the log in another broker (not the bootstrap) says:
>
> [2015-07-14 15:18:41,220] FATAL [Replica Manager on Broker 1]: Error
> writing to highwatermark file:  (kafka.server.ReplicaManager)
>
>
> [2015-07-14 15:18:40,160] ERROR Closing socket for /130.127.133.47 because
> of error (kafka.network.Process
>
> or)
>
> java.io.IOException: Connection reset by peer
>
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>
> at sun.nio.ch.IOUtil.read(IOUtil.java:197)
>
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>
> at kafka.utils.Utils$.read(Utils.scala:380)
>
> at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>
> at kafka.network.Processor.read(SocketServer.scala:444)
>
> at kafka.network.Processor.run(SocketServer.scala:340)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 
>
> java.io.IOException: No space left on device
>
> at java.io.FileOutputStream.writeBytes(Native Method)
>
> at java.io.FileOutputStream.write(FileOutputStream.java:345)
>
> at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>
> at sun.nio.cs.StreamEncoder.implFlushBuffe
>
> (END)
>
> On Tue, Jul 14, 2015 at 5:24 PM, Yuheng Du 
> wrote:
>
> > Hi Jiefu, Gwen,
> >
> > I am running the Throughput versus stored data test:
> > bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> > test 500 100 -1 acks=1 bootstrap.servers=
> > esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
> batch.size=8196
> >
> > After around 50,000,000 messages were sent, I got a bunch of connection
> > refused error as I mentioned before. I checked the logs on the broker and
> > here is what I see:
> >
> > [2015-07-14 15:11:23,578] WARN Partition [test,4] on broker 5: No
> > checkpointed highwatermark is found for partition [test,4]
> > (kafka.cluster.Partition)
> >
> > [2015-07-14 15:12:33,298] INFO Rolled new log segment for 'test-4' in 4
> > ms. (kafka.log.Log)
> >
> > [2015-07-14 15:12:33,299] INFO Rolled new log segment for 'test-0' in 1
> > ms. (kafka.log.Log)
> >
> > [2015-07-14 15:13:39,529] INFO Rolled new log segment for 'test-4' in 1
> > ms. (kafka.log.Log)
> >
> > [2015-07-14 15:13:39,531] INFO Rolled new log segment for 'test-0' in 1
> > ms. (kafka.log.Log)
> >
> > [2015-07-14 15:14:48,502] INFO Rolled new log segment for 'test-4' in 3
> > ms. (kafka.log.Log)
> >
> > [2015-07-14 15:14:48,502] INFO Rolled new log segment for 'test-0' in 1
> > ms. (kafka.log.Log)
> >
> > [2015-07-14 15:15:51,478] INFO Rolled new log segment for 'test-4' in 1
> > ms. (kafka.log.Log)
> >
> > [2015-07-14 15:15:51,479] INFO Rolled new log segment for 'test-0' in 1
> > ms. (kafka.log.Log)
> >
> > [2015-07-14 15:16:52,589] INFO Rolled new log segment for 'test-4' in 1
> > ms. (kafka.log.Log)
> >
> > [2015-07-14 15:16:52,590] INFO Rolled new log segment for 'test-0' in 1
> > ms. (kafka.log.Log)
> >
> > [2015-07-14 15:17:57,406] INFO Rolled new log segment for 'test-4' in 1
> > ms. (kafka.log.Log)
> >
> > [2015-07-14 15:17:57,407] INFO Rolled new log segment for 'test-0' in 0
> > ms. (kafka.log.Log)
> >
> > [2015-07-14 15:18:39,792] FATAL [KafkaApi-5] Halting due to unrecoverable
> > I/O error while handling produce request:  (kafka.server.KafkaApis)
> >
> > kafka.common.KafkaStorageException: I/O exception in append to log
> 'test-0'
> >
> > at kafka.log.Log.append(Log.scala:266)
> >
> > at
> >
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
> >
> > at
> >
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
> >
> > at kafka.utils.Utils$.inLock(Utils.scala:535)
> >
> > at kafka.utils.Utils$.inReadLock(Utils.scala:541)
> >
> > at
> > kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
> >
> > at
> >
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
> >
> > at
> >
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
> >
> > at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >
> > at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >
> > at
> >
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scal

Re: How to run the three producers test

2015-07-14 Thread Yuheng Du
Also, the log in another broker (not the bootstrap) says:

[2015-07-14 15:18:41,220] FATAL [Replica Manager on Broker 1]: Error
writing to highwatermark file:  (kafka.server.ReplicaManager)


[2015-07-14 15:18:40,160] ERROR Closing socket for /130.127.133.47 because
of error (kafka.network.Process

or)

java.io.IOException: Connection reset by peer

at sun.nio.ch.FileDispatcherImpl.read0(Native Method)

at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)

at sun.nio.ch.IOUtil.read(IOUtil.java:197)

at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)

at kafka.utils.Utils$.read(Utils.scala:380)

at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)

at kafka.network.Processor.read(SocketServer.scala:444)

at kafka.network.Processor.run(SocketServer.scala:340)

at java.lang.Thread.run(Thread.java:745)



java.io.IOException: No space left on device

at java.io.FileOutputStream.writeBytes(Native Method)

at java.io.FileOutputStream.write(FileOutputStream.java:345)

at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)

at sun.nio.cs.StreamEncoder.implFlushBuffe

(END)

On Tue, Jul 14, 2015 at 5:24 PM, Yuheng Du  wrote:

> Hi Jiefu, Gwen,
>
> I am running the Throughput versus stored data test:
> bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> test 500 100 -1 acks=1 bootstrap.servers=
> esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196
>
> After around 50,000,000 messages were sent, I got a bunch of connection
> refused error as I mentioned before. I checked the logs on the broker and
> here is what I see:
>
> [2015-07-14 15:11:23,578] WARN Partition [test,4] on broker 5: No
> checkpointed highwatermark is found for partition [test,4]
> (kafka.cluster.Partition)
>
> [2015-07-14 15:12:33,298] INFO Rolled new log segment for 'test-4' in 4
> ms. (kafka.log.Log)
>
> [2015-07-14 15:12:33,299] INFO Rolled new log segment for 'test-0' in 1
> ms. (kafka.log.Log)
>
> [2015-07-14 15:13:39,529] INFO Rolled new log segment for 'test-4' in 1
> ms. (kafka.log.Log)
>
> [2015-07-14 15:13:39,531] INFO Rolled new log segment for 'test-0' in 1
> ms. (kafka.log.Log)
>
> [2015-07-14 15:14:48,502] INFO Rolled new log segment for 'test-4' in 3
> ms. (kafka.log.Log)
>
> [2015-07-14 15:14:48,502] INFO Rolled new log segment for 'test-0' in 1
> ms. (kafka.log.Log)
>
> [2015-07-14 15:15:51,478] INFO Rolled new log segment for 'test-4' in 1
> ms. (kafka.log.Log)
>
> [2015-07-14 15:15:51,479] INFO Rolled new log segment for 'test-0' in 1
> ms. (kafka.log.Log)
>
> [2015-07-14 15:16:52,589] INFO Rolled new log segment for 'test-4' in 1
> ms. (kafka.log.Log)
>
> [2015-07-14 15:16:52,590] INFO Rolled new log segment for 'test-0' in 1
> ms. (kafka.log.Log)
>
> [2015-07-14 15:17:57,406] INFO Rolled new log segment for 'test-4' in 1
> ms. (kafka.log.Log)
>
> [2015-07-14 15:17:57,407] INFO Rolled new log segment for 'test-0' in 0
> ms. (kafka.log.Log)
>
> [2015-07-14 15:18:39,792] FATAL [KafkaApi-5] Halting due to unrecoverable
> I/O error while handling produce request:  (kafka.server.KafkaApis)
>
> kafka.common.KafkaStorageException: I/O exception in append to log 'test-0'
>
> at kafka.log.Log.append(Log.scala:266)
>
> at
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
>
> at
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
>
> at kafka.utils.Utils$.inLock(Utils.scala:535)
>
> at kafka.utils.Utils$.inReadLock(Utils.scala:541)
>
> at
> kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
>
> at
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
>
> at
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>
> at scala.coll
>
>
>
> Can you help me with this problem? Thanks.
>
> On Tue, Jul 14, 2015 at 5:12 PM, Yuheng Du 
> wrote:
>
>> I checked the logs on the brokers, it seems that the zookeeper or the
>> kafka server process is not running on this broker...Thank you guys. I will
>> see if it happens again.
>>
>> On Tue, Jul 14, 2015 at 4:53 PM, JIEFU GONG  wrote:
>>
>>> Hmm..yeah some error logs would be nice like Gwen pointed out. Do any of
>>> your brokers fall out of the ISR when sending messages? It seems like
>>> your
>>> setup should be fine, so I'm no

Re: How to run the three producers test

2015-07-14 Thread Yuheng Du
Hi Jiefu, Gwen,

I am running the Throughput versus stored data test:
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
test 500 100 -1 acks=1 bootstrap.servers=
esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196

After around 50,000,000 messages were sent, I got a bunch of connection
refused error as I mentioned before. I checked the logs on the broker and
here is what I see:

[2015-07-14 15:11:23,578] WARN Partition [test,4] on broker 5: No
checkpointed highwatermark is found for partition [test,4]
(kafka.cluster.Partition)

[2015-07-14 15:12:33,298] INFO Rolled new log segment for 'test-4' in 4 ms.
(kafka.log.Log)

[2015-07-14 15:12:33,299] INFO Rolled new log segment for 'test-0' in 1 ms.
(kafka.log.Log)

[2015-07-14 15:13:39,529] INFO Rolled new log segment for 'test-4' in 1 ms.
(kafka.log.Log)

[2015-07-14 15:13:39,531] INFO Rolled new log segment for 'test-0' in 1 ms.
(kafka.log.Log)

[2015-07-14 15:14:48,502] INFO Rolled new log segment for 'test-4' in 3 ms.
(kafka.log.Log)

[2015-07-14 15:14:48,502] INFO Rolled new log segment for 'test-0' in 1 ms.
(kafka.log.Log)

[2015-07-14 15:15:51,478] INFO Rolled new log segment for 'test-4' in 1 ms.
(kafka.log.Log)

[2015-07-14 15:15:51,479] INFO Rolled new log segment for 'test-0' in 1 ms.
(kafka.log.Log)

[2015-07-14 15:16:52,589] INFO Rolled new log segment for 'test-4' in 1 ms.
(kafka.log.Log)

[2015-07-14 15:16:52,590] INFO Rolled new log segment for 'test-0' in 1 ms.
(kafka.log.Log)

[2015-07-14 15:17:57,406] INFO Rolled new log segment for 'test-4' in 1 ms.
(kafka.log.Log)

[2015-07-14 15:17:57,407] INFO Rolled new log segment for 'test-0' in 0 ms.
(kafka.log.Log)

[2015-07-14 15:18:39,792] FATAL [KafkaApi-5] Halting due to unrecoverable
I/O error while handling produce request:  (kafka.server.KafkaApis)

kafka.common.KafkaStorageException: I/O exception in append to log 'test-0'

at kafka.log.Log.append(Log.scala:266)

at
kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)

at
kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)

at kafka.utils.Utils$.inLock(Utils.scala:535)

at kafka.utils.Utils$.inReadLock(Utils.scala:541)

at
kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)

at
kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)

at
kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)

at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)

at scala.coll



Can you help me with this problem? Thanks.

On Tue, Jul 14, 2015 at 5:12 PM, Yuheng Du  wrote:

> I checked the logs on the brokers, it seems that the zookeeper or the
> kafka server process is not running on this broker...Thank you guys. I will
> see if it happens again.
>
> On Tue, Jul 14, 2015 at 4:53 PM, JIEFU GONG  wrote:
>
>> Hmm..yeah some error logs would be nice like Gwen pointed out. Do any of
>> your brokers fall out of the ISR when sending messages? It seems like your
>> setup should be fine, so I'm not entirely sure.
>>
>> On Tue, Jul 14, 2015 at 1:31 PM, Yuheng Du 
>> wrote:
>>
>> > Jiefu,
>> >
>> > I am performing these tests on a 6-node cluster in CloudLab (an
>> > infrastructure built for scientific research). I use 2 nodes as
>> > producers,
>> > 2 as brokers only, and 2 as consumers. I have tested for each individual
>> > machines and they work well. I did not use AWS. Thank you!
>> >
>> > On Tue, Jul 14, 2015 at 4:20 PM, JIEFU GONG  wrote:
>> >
>> > > Yuheng, are you performing these tests locally or using a service
>> such as
>> > > AWS? I'd try using each separate machine individually first,
>> connecting
>> > to
>> > > the ZK/Kafka servers and ensuring that each is able to first log and
>> > > consume messages independently.
>> > >
>> > > On Tue, Jul 14, 2015 at 1:17 PM, Gwen Shapira 
>> > > wrote:
>> > >
>> > > > Are there any errors on the broker logs?
>> > > >
>> > > > On Tue, Jul 14, 2015 at 11:57 AM, Yuheng Du <
>> yuheng.du.h...@gmail.com>
>> > > > wrote:
>> > > > > Jiefu,
>> > > > >
>> > > > > Thank you. The three producers can run at the same time. I mean
>> > should
>> > > > they
>> > > > > be started at exactly the same time? (I have three consoles from
>> each
>> > > of
>> > > > > the three machines and I just start the console command manually
>> one
>> > by
>> > > > > one) Or a small variation of the starting time won't matter?
>> > > > >
>> > > > > Gwen and Jiefu,
>> > > > >
>> > > > > I have started the three producers at three machines, after a
>> while,
>> > > all
>> > > > of
>> > > > > them gives a

Re: Latency test

2015-07-14 Thread Tao Feng
I think the ProducerPerformance microbenchmark only measures between the
client and the brokers (producer to brokers) and provides latency
information.

On Tue, Jul 14, 2015 at 11:05 AM, Yuheng Du 
wrote:

> Currently, the latency test from kafka test the end to end latency between
> producers and consumers.
>
> Is there a way to test the producer-to-broker and broker-to-consumer
> delays separately?
>
> Thanks.
>
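
The producer-to-broker leg can be measured from the client without any
extra tooling: record a timestamp at send() and compute the delta in the
callback, which fires once the broker has acknowledged the request (so the
figure includes producer queueing plus the network round trip). A minimal
sketch; broker and topic are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class ProducerAckLatency {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder
            props.put("acks", "1");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer =
                new KafkaProducer<String, String>(props);
            final long start = System.nanoTime();
            producer.send(new ProducerRecord<String, String>("test", "payload"),
                new Callback() {
                    public void onCompletion(RecordMetadata md, Exception e) {
                        // elapsed time until the leader acknowledged the write
                        System.out.printf("producer->broker ack: %.2f ms%n",
                            (System.nanoTime() - start) / 1e6);
                    }
                });
            producer.close();
        }
    }

Subtracting this from the end-to-end number gives a rough
broker-to-consumer figure.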


Re: How to run the three producers test

2015-07-14 Thread Yuheng Du
I checked the logs on the brokers; it seems that the ZooKeeper or the Kafka
server process was not running on this broker... Thank you guys. I will see
if it happens again.

On Tue, Jul 14, 2015 at 4:53 PM, JIEFU GONG  wrote:

> Hmm..yeah some error logs would be nice like Gwen pointed out. Do any of
> your brokers fall out of the ISR when sending messages? It seems like your
> setup should be fine, so I'm not entirely sure.
>
> On Tue, Jul 14, 2015 at 1:31 PM, Yuheng Du 
> wrote:
>
> > Jiefu,
> >
> > I am performing these tests on a 6-node cluster in CloudLab (an
> > infrastructure built for scientific research). I use 2 nodes as
> > producers,
> > 2 as brokers only, and 2 as consumers. I have tested for each individual
> > machines and they work well. I did not use AWS. Thank you!
> >
> > On Tue, Jul 14, 2015 at 4:20 PM, JIEFU GONG  wrote:
> >
> > > Yuheng, are you performing these tests locally or using a service such
> as
> > > AWS? I'd try using each separate machine individually first, connecting
> > to
> > > the ZK/Kafka servers and ensuring that each is able to first log and
> > > consume messages independently.
> > >
> > > On Tue, Jul 14, 2015 at 1:17 PM, Gwen Shapira 
> > > wrote:
> > >
> > > > Are there any errors on the broker logs?
> > > >
> > > > On Tue, Jul 14, 2015 at 11:57 AM, Yuheng Du <
> yuheng.du.h...@gmail.com>
> > > > wrote:
> > > > > Jiefu,
> > > > >
> > > > > Thank you. The three producers can run at the same time. I mean
> > should
> > > > they
> > > > > be started at exactly the same time? (I have three consoles from
> each
> > > of
> > > > > the three machines and I just start the console command manually
> one
> > by
> > > > > one) Or a small variation of the starting time won't matter?
> > > > >
> > > > > Gwen and Jiefu,
> > > > >
> > > > > I have started the three producers at three machines, after a
> while,
> > > all
> > > > of
> > > > > them gives a java.net.ConnectException:
> > > > >
> > > > > [2015-07-14 12:56:46,352] WARN Error in I/O with producer0-link-0/
> > > > > 192.168.1.1 (org.apache.kafka.common.network.Selector)
> > > > >
> > > > > java.net.ConnectException: Connection refused..
> > > > >
> > > > > [2015-07-14 12:56:48,056] WARN Error in I/O with producer1-link-0/
> > > > > 192.168.1.2 (org.apache.kafka.common.network.Selector)
> > > > >
> > > > > java.net.ConnectException: Connection refused.
> > > > >
> > > > > What could be the cause?
> > > > >
> > > > > Thank you guys!
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Jul 14, 2015 at 2:47 PM, JIEFU GONG 
> > > wrote:
> > > > >
> > > > >> Yuheng,
> > > > >>
> > > > >> Yes, if you read the blog post it specifies that he's using three
> > > > separate
> > > > >> machines. There's no reason the producers cannot be started at the
> > > same
> > > > >> time, I believe.
> > > > >>
> > > > >> On Tue, Jul 14, 2015 at 11:42 AM, Yuheng Du <
> > yuheng.du.h...@gmail.com
> > > >
> > > > >> wrote:
> > > > >>
> > > > >> > Hi,
> > > > >> >
> > > > >> > I am running the performance test for kafka.
> > > > >> > https://gist.github.com/jkreps
> > > > >> > /c7ddb4041ef62a900e6c
> > > > >> >
> > > > >> > For the "Three Producers, 3x async replication" scenario, the
> > > command
> > > > is
> > > > >> > the same as one producer:
> > > > >> >
> > > > >> > bin/kafka-run-class.sh
> > > > org.apache.kafka.clients.tools.ProducerPerformance
> > > > >> > test 5000 100 -1 acks=1
> > > > >> > bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
> > > > >> > buffer.memory=67108864 batch.size=8196
> > > > >> >
> > > > >> > So how do I run the test for three producers? Do I just run them
> > on
> > > > three
> > > > >> > separate servers at the same time? Will there be some error in
> > this
> > > > way
> > > > >> > since the three producers can't be started at the same time?
> > > > >> >
> > > > >> > Thanks.
> > > > >> >
> > > > >> > best,
> > > > >> > Yuheng
> > > > >> >
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >>
> > > > >> Jiefu Gong
> > > > >> University of California, Berkeley | Class of 2017
> > > > >> B.A Computer Science | College of Letters and Sciences
> > > > >>
> > > > >> jg...@berkeley.edu  | (925) 400-3427
> > > > >>
> > > >
> > >
> > >
> > >
> > > --
> > >
> > > Jiefu Gong
> > > University of California, Berkeley | Class of 2017
> > > B.A Computer Science | College of Letters and Sciences
> > >
> > > jg...@berkeley.edu  | (925) 400-3427
> > >
> >
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu  | (925) 400-3427
>


Re: How to run the three producers test

2015-07-14 Thread JIEFU GONG
Hmm... yeah, some error logs would be nice, like Gwen pointed out. Do any of
your brokers fall out of the ISR when sending messages? It seems like your
setup should be fine, so I'm not entirely sure.

On Tue, Jul 14, 2015 at 1:31 PM, Yuheng Du  wrote:

> Jiefu,
>
> I am performing these tests on a 6-node cluster in CloudLab (an
> infrastructure built for scientific research). I use 2 nodes as producers,
> 2 as brokers only, and 2 as consumers. I have tested each individual
> machine and they work well. I did not use AWS. Thank you!
>
> On Tue, Jul 14, 2015 at 4:20 PM, JIEFU GONG  wrote:
>
> > Yuheng, are you performing these tests locally or using a service such as
> > AWS? I'd try using each separate machine individually first, connecting
> to
> > the ZK/Kafka servers and ensuring that each is able to first log and
> > consume messages independently.
> >
> > On Tue, Jul 14, 2015 at 1:17 PM, Gwen Shapira 
> > wrote:
> >
> > > Are there any errors on the broker logs?
> > >
> > > On Tue, Jul 14, 2015 at 11:57 AM, Yuheng Du 
> > > wrote:
> > > > Jiefu,
> > > >
> > > > Thank you. The three producers can run at the same time; I mean,
> > > > should they be started at exactly the same time? (I have three
> > > > consoles, one from each of the three machines, and I just start the
> > > > console command manually one by one.) Or will a small variation in
> > > > the starting times not matter?
> > > >
> > > > Gwen and Jiefu,
> > > >
> > > > I have started the three producers on three machines; after a while,
> > > > all of them give a java.net.ConnectException:
> > > >
> > > > [2015-07-14 12:56:46,352] WARN Error in I/O with producer0-link-0/
> > > > 192.168.1.1 (org.apache.kafka.common.network.Selector)
> > > >
> > > > java.net.ConnectException: Connection refused..
> > > >
> > > > [2015-07-14 12:56:48,056] WARN Error in I/O with producer1-link-0/
> > > > 192.168.1.2 (org.apache.kafka.common.network.Selector)
> > > >
> > > > java.net.ConnectException: Connection refused.
> > > >
> > > > What could be the cause?
> > > >
> > > > Thank you guys!
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Jul 14, 2015 at 2:47 PM, JIEFU GONG 
> > wrote:
> > > >
> > > >> Yuheng,
> > > >>
> > > >> Yes, if you read the blog post it specifies that he's using three
> > > separate
> > > >> machines. There's no reason the producers cannot be started at the
> > same
> > > >> time, I believe.
> > > >>
> > > >> On Tue, Jul 14, 2015 at 11:42 AM, Yuheng Du <
> yuheng.du.h...@gmail.com
> > >
> > > >> wrote:
> > > >>
> > > >> > Hi,
> > > >> >
> > > >> > I am running the performance test for kafka.
> > > >> > https://gist.github.com/jkreps
> > > >> > /c7ddb4041ef62a900e6c
> > > >> >
> > > >> > For the "Three Producers, 3x async replication" scenario, the
> > command
> > > is
> > > >> > the same as one producer:
> > > >> >
> > > >> > bin/kafka-run-class.sh
> > > org.apache.kafka.clients.tools.ProducerPerformance
> > > >> > test 5000 100 -1 acks=1
> > > >> > bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
> > > >> > buffer.memory=67108864 batch.size=8196
> > > >> >
> > > >> > So how do I run the test for three producers? Do I just run them
> on
> > > three
> > > >> > separate servers at the same time? Will there be some error in
> this
> > > way
> > > >> > since the three producers can't be started at the same time?
> > > >> >
> > > >> > Thanks.
> > > >> >
> > > >> > best,
> > > >> > Yuheng
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >>
> > > >> Jiefu Gong
> > > >> University of California, Berkeley | Class of 2017
> > > >> B.A Computer Science | College of Letters and Sciences
> > > >>
> > > >> jg...@berkeley.edu  | (925) 400-3427
> > > >>
> > >
> >
> >
> >
> > --
> >
> > Jiefu Gong
> > University of California, Berkeley | Class of 2017
> > B.A Computer Science | College of Letters and Sciences
> >
> > jg...@berkeley.edu  | (925) 400-3427
> >
>



-- 

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu  | (925) 400-3427


Re: stunning error - Request of length 1550939497 is not valid, it is larger than the maximum size of 104857600 bytes

2015-07-14 Thread Todd Palino
It could be a client error, but we're seeing it show up in Mirror Maker.

-Todd


On Tue, Jul 14, 2015 at 1:27 PM, JIEFU GONG  wrote:

> Got it, looks like I didn't understand the request process and am failing
> to use AB properly. Thanks for the help everyone! I suspect you might be
> running into a similar error, David.
>
> On Tue, Jul 14, 2015 at 11:56 AM, Jay Kreps  wrote:
>
> > This is almost certainly a client bug. Kafka's request format is size
> > delimited messages in the form
> > <4 byte size N><N byte request>
> > If the client sends a request with an invalid size or sends a partial
> > request the server will see effectively random bytes from the next
> request
> > as the size of the next message and generally reject the request (or fail
> > to parse it).
> >
> > -Jay
> >
> > On Sat, Jul 11, 2015 at 9:08 PM, David Montgomery <
> > davidmontgom...@gmail.com
> > > wrote:
> >
> > > I can't send this simple payload using Python.
> > >
> > > topic: topic-test-development
> > > payload: {"utcdt": "2015-07-12T03:59:36", "ghznezzhmx": "apple"}
> > >
> > >
> > > No handlers could be found for logger "kafka.conn"
> > > Traceback (most recent call last):
> > >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line
> > 81,
> > > in 
> > > test_send_data_to_realtimenode()
> > >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line
> > 38,
> > > in test_send_data_to_realtimenode
> > > response = producer.send_messages(test_topic,test_payload)
> > >   File
> "/usr/local/lib/python2.7/dist-packages/kafka/producer/simple.py",
> > > line 54, in send_messages
> > > topic, partition, *msg
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > > line 349, in send_messages
> > > return self._send_messages(topic, partition, *msg)
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > > line 390, in _send_messages
> > > fail_on_error=self.sync_fail_on_error
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> > 480,
> > > in send_produce_request
> > > (not fail_on_error or not self._raise_on_response_error(resp))]
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> > 247,
> > > in _raise_on_response_error
> > > raise resp
> > > kafka.common.FailedPayloadsError
> > >
> > > Here is what is in my logs
> > > [2015-07-12 03:29:58,103] INFO Closing socket connection to
> > > /xxx.xxx.xxx.xxx due to invalid request: Request of length 1550939497
> is
> > > not valid, it is larger than the maximum size of 104857600 bytes.
> > > (kafka.network.Processor)
> > >
> > >
> > >
> > > Server is 4 gigs of ram.
> > >
> > > I used export KAFKA_HEAP_OPTS=-Xmx256M -Xms128M in
> kafka-server-start.sh
> > >
> > > So... why?
> > >
> >
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu  | (925) 400-3427
>


Re: How to run the three producers test

2015-07-14 Thread Yuheng Du
Jiefu,

I am performing these tests on a 6-node cluster in CloudLab (an
infrastructure built for scientific research). I use 2 nodes as producers,
2 as brokers only, and 2 as consumers. I have tested each individual
machine and they work well. I did not use AWS. Thank you!

On Tue, Jul 14, 2015 at 4:20 PM, JIEFU GONG  wrote:

> Yuheng, are you performing these tests locally or using a service such as
> AWS? I'd try using each separate machine individually first, connecting to
> the ZK/Kafka servers and ensuring that each is able to first log and
> consume messages independently.
>
> On Tue, Jul 14, 2015 at 1:17 PM, Gwen Shapira 
> wrote:
>
> > Are there any errors on the broker logs?
> >
> > On Tue, Jul 14, 2015 at 11:57 AM, Yuheng Du 
> > wrote:
> > > Jiefu,
> > >
> > > Thank you. The three producers can run at the same time; I mean,
> > > should they be started at exactly the same time? (I have three
> > > consoles, one from each of the three machines, and I just start the
> > > console command manually one by one.) Or will a small variation in
> > > the starting times not matter?
> > >
> > > Gwen and Jiefu,
> > >
> > > I have started the three producers on three machines; after a while,
> > > all of them give a java.net.ConnectException:
> > >
> > > [2015-07-14 12:56:46,352] WARN Error in I/O with producer0-link-0/
> > > 192.168.1.1 (org.apache.kafka.common.network.Selector)
> > >
> > > java.net.ConnectException: Connection refused..
> > >
> > > [2015-07-14 12:56:48,056] WARN Error in I/O with producer1-link-0/
> > > 192.168.1.2 (org.apache.kafka.common.network.Selector)
> > >
> > > java.net.ConnectException: Connection refused.
> > >
> > > What could be the cause?
> > >
> > > Thank you guys!
> > >
> > >
> > >
> > >
> > > On Tue, Jul 14, 2015 at 2:47 PM, JIEFU GONG 
> wrote:
> > >
> > >> Yuheng,
> > >>
> > >> Yes, if you read the blog post it specifies that he's using three
> > separate
> > >> machines. There's no reason the producers cannot be started at the
> same
> > >> time, I believe.
> > >>
> > >> On Tue, Jul 14, 2015 at 11:42 AM, Yuheng Du  >
> > >> wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > I am running the performance test for kafka.
> > >> > https://gist.github.com/jkreps
> > >> > /c7ddb4041ef62a900e6c
> > >> >
> > >> > For the "Three Producers, 3x async replication" scenario, the
> command
> > is
> > >> > the same as one producer:
> > >> >
> > >> > bin/kafka-run-class.sh
> > org.apache.kafka.clients.tools.ProducerPerformance
> > >> > test 5000 100 -1 acks=1
> > >> > bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
> > >> > buffer.memory=67108864 batch.size=8196
> > >> >
> > >> > So how do I run the test for three producers? Do I just run them on
> > three
> > >> > separate servers at the same time? Will there be some error in this
> > way
> > >> > since the three producers can't be started at the same time?
> > >> >
> > >> > Thanks.
> > >> >
> > >> > best,
> > >> > Yuheng
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >>
> > >> Jiefu Gong
> > >> University of California, Berkeley | Class of 2017
> > >> B.A Computer Science | College of Letters and Sciences
> > >>
> > >> jg...@berkeley.edu  | (925) 400-3427
> > >>
> >
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu  | (925) 400-3427
>


Re: stunning error - Request of length 1550939497 is not valid, it is larger than the maximum size of 104857600 bytes

2015-07-14 Thread JIEFU GONG
Got it, looks like I didn't understand the request process and am failing
to use AB properly. Thanks for the help everyone! I suspect you might be
running into a similar error, David.

On Tue, Jul 14, 2015 at 11:56 AM, Jay Kreps  wrote:

> This is almost certainly a client bug. Kafka's request format is size
> delimited messages in the form
>    <4 byte size N><N byte request>
> If the client sends a request with an invalid size or sends a partial
> request the server will see effectively random bytes from the next request
> as the size of the next message and generally reject the request (or fail
> to parse it).
>
> -Jay
>
> On Sat, Jul 11, 2015 at 9:08 PM, David Montgomery <
> davidmontgom...@gmail.com
> > wrote:
>
> > I can't send this simple payload using Python.
> >
> > topic: topic-test-development
> > payload: {"utcdt": "2015-07-12T03:59:36", "ghznezzhmx": "apple"}
> >
> >
> > No handlers could be found for logger "kafka.conn"
> > Traceback (most recent call last):
> >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line
> 81,
> > in 
> > test_send_data_to_realtimenode()
> >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line
> 38,
> > in test_send_data_to_realtimenode
> > response = producer.send_messages(test_topic,test_payload)
> >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/simple.py",
> > line 54, in send_messages
> > topic, partition, *msg
> >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > line 349, in send_messages
> > return self._send_messages(topic, partition, *msg)
> >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > line 390, in _send_messages
> > fail_on_error=self.sync_fail_on_error
> >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> 480,
> > in send_produce_request
> > (not fail_on_error or not self._raise_on_response_error(resp))]
> >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> 247,
> > in _raise_on_response_error
> > raise resp
> > kafka.common.FailedPayloadsError
> >
> > Here is what is in my logs
> > [2015-07-12 03:29:58,103] INFO Closing socket connection to
> > /xxx.xxx.xxx.xxx due to invalid request: Request of length 1550939497 is
> > not valid, it is larger than the maximum size of 104857600 bytes.
> > (kafka.network.Processor)
> >
> >
> >
> > Server is 4 gigs of ram.
> >
> > I used export KAFKA_HEAP_OPTS=-Xmx256M -Xms128M in kafka-server-start.sh
> >
> > So... why?
> >
>



-- 

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu  | (925) 400-3427


Re: How to run the three producers test

2015-07-14 Thread JIEFU GONG
Yuheng, are you performing these tests locally or using a service such as
AWS? I'd try using each separate machine individually first, connecting to
the ZK/Kafka servers and ensuring that each is able to first log and
consume messages independently.

On Tue, Jul 14, 2015 at 1:17 PM, Gwen Shapira  wrote:

> Are there any errors on the broker logs?
>
> On Tue, Jul 14, 2015 at 11:57 AM, Yuheng Du 
> wrote:
> > Jiefu,
> >
> > Thank you. The three producers can run at the same time; I mean, should
> > they be started at exactly the same time? (I have three consoles, one
> > from each of the three machines, and I just start the console command
> > manually one by one.) Or will a small variation in the starting times
> > not matter?
> >
> > Gwen and Jiefu,
> >
> > I have started the three producers on three machines; after a while,
> > all of them give a java.net.ConnectException:
> >
> > [2015-07-14 12:56:46,352] WARN Error in I/O with producer0-link-0/
> > 192.168.1.1 (org.apache.kafka.common.network.Selector)
> >
> > java.net.ConnectException: Connection refused..
> >
> > [2015-07-14 12:56:48,056] WARN Error in I/O with producer1-link-0/
> > 192.168.1.2 (org.apache.kafka.common.network.Selector)
> >
> > java.net.ConnectException: Connection refused.
> >
> > What could be the cause?
> >
> > Thank you guys!
> >
> >
> >
> >
> > On Tue, Jul 14, 2015 at 2:47 PM, JIEFU GONG  wrote:
> >
> >> Yuheng,
> >>
> >> Yes, if you read the blog post it specifies that he's using three
> separate
> >> machines. There's no reason the producers cannot be started at the same
> >> time, I believe.
> >>
> >> On Tue, Jul 14, 2015 at 11:42 AM, Yuheng Du 
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > I am running the performance test for kafka.
> >> > https://gist.github.com/jkreps
> >> > /c7ddb4041ef62a900e6c
> >> >
> >> > For the "Three Producers, 3x async replication" scenario, the command
> is
> >> > the same as one producer:
> >> >
> >> > bin/kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance
> >> > test 5000 100 -1 acks=1
> >> > bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
> >> > buffer.memory=67108864 batch.size=8196
> >> >
> >> > So how do I run the test for three producers? Do I just run them on
> three
> >> > separate servers at the same time? Will there be some error in this
> way
> >> > since the three producers can't be started at the same time?
> >> >
> >> > Thanks.
> >> >
> >> > best,
> >> > Yuheng
> >> >
> >>
> >>
> >>
> >> --
> >>
> >> Jiefu Gong
> >> University of California, Berkeley | Class of 2017
> >> B.A Computer Science | College of Letters and Sciences
> >>
> >> jg...@berkeley.edu  | (925) 400-3427
> >>
>



-- 

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu  | (925) 400-3427


Re: How to run the three producers test

2015-07-14 Thread Gwen Shapira
Are there any errors on the broker logs?

On Tue, Jul 14, 2015 at 11:57 AM, Yuheng Du  wrote:
> Jiefu,
>
> Thank you. The three producers can run at the same time; I mean, should
> they be started at exactly the same time? (I have three consoles, one from
> each of the three machines, and I just start the console command manually
> one by one.) Or will a small variation in the starting times not matter?
>
> Gwen and Jiefu,
>
> I have started the three producers on three machines; after a while, all
> of them give a java.net.ConnectException:
>
> [2015-07-14 12:56:46,352] WARN Error in I/O with producer0-link-0/
> 192.168.1.1 (org.apache.kafka.common.network.Selector)
>
> java.net.ConnectException: Connection refused..
>
> [2015-07-14 12:56:48,056] WARN Error in I/O with producer1-link-0/
> 192.168.1.2 (org.apache.kafka.common.network.Selector)
>
> java.net.ConnectException: Connection refused.
>
> What could be the cause?
>
> Thank you guys!
>
>
>
>
> On Tue, Jul 14, 2015 at 2:47 PM, JIEFU GONG  wrote:
>
>> Yuheng,
>>
>> Yes, if you read the blog post it specifies that he's using three separate
>> machines. There's no reason the producers cannot be started at the same
>> time, I believe.
>>
>> On Tue, Jul 14, 2015 at 11:42 AM, Yuheng Du 
>> wrote:
>>
>> > Hi,
>> >
>> > I am running the performance test for kafka.
>> > https://gist.github.com/jkreps
>> > /c7ddb4041ef62a900e6c
>> >
>> > For the "Three Producers, 3x async replication" scenario, the command is
>> > the same as one producer:
>> >
>> > bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
>> > test 5000 100 -1 acks=1
>> > bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
>> > buffer.memory=67108864 batch.size=8196
>> >
>> > So how do I run the test for three producers? Do I just run them on three
>> > separate servers at the same time? Will there be some error in this way
>> > since the three producers can't be started at the same time?
>> >
>> > Thanks.
>> >
>> > best,
>> > Yuheng
>> >
>>
>>
>>
>> --
>>
>> Jiefu Gong
>> University of California, Berkeley | Class of 2017
>> B.A Computer Science | College of Letters and Sciences
>>
>> jg...@berkeley.edu  | (925) 400-3427
>>


Re: How to run the three producers test

2015-07-14 Thread Yuheng Du
Jiefu,

Thank you. The three producers can run at the same time; I mean, should
they be started at exactly the same time? (I have three consoles, one from
each of the three machines, and I just start the console command manually
one by one.) Or will a small variation in the starting times not matter?

Gwen and Jiefu,

I have started the three producers on three machines; after a while, all of
them give a java.net.ConnectException:

[2015-07-14 12:56:46,352] WARN Error in I/O with producer0-link-0/
192.168.1.1 (org.apache.kafka.common.network.Selector)

java.net.ConnectException: Connection refused..

[2015-07-14 12:56:48,056] WARN Error in I/O with producer1-link-0/
192.168.1.2 (org.apache.kafka.common.network.Selector)

java.net.ConnectException: Connection refused.

What could be the cause?

Thank you guys!




On Tue, Jul 14, 2015 at 2:47 PM, JIEFU GONG  wrote:

> Yuheng,
>
> Yes, if you read the blog post it specifies that he's using three separate
> machines. There's no reason the producers cannot be started at the same
> time, I believe.
>
> On Tue, Jul 14, 2015 at 11:42 AM, Yuheng Du 
> wrote:
>
> > Hi,
> >
> > I am running the performance test for kafka.
> > https://gist.github.com/jkreps
> > /c7ddb4041ef62a900e6c
> >
> > For the "Three Producers, 3x async replication" scenario, the command is
> > the same as one producer:
> >
> > bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> > test 5000 100 -1 acks=1
> > bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
> > buffer.memory=67108864 batch.size=8196
> >
> > So how do I run the test for three producers? Do I just run them on three
> > separate servers at the same time? Will there be some error in this way
> > since the three producers can't be started at the same time?
> >
> > Thanks.
> >
> > best,
> > Yuheng
> >
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu  | (925) 400-3427
>


Re: stunning error - Request of length 1550939497 is not valid, it is larger than the maximum size of 104857600 bytes

2015-07-14 Thread Jay Kreps
This is almost certainly a client bug. Kafka's request format is size
delimited messages in the form
   <4 byte size N><N byte request>
If the client sends a request with an invalid size or sends a partial
request the server will see effectively random bytes from the next request
as the size of the next message and generally reject the request (or fail
to parse it).
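
To make this concrete, here is a minimal sketch (not Kafka's actual server
code; the function and socket names are illustrative) of reading one
size-delimited frame, and of why a partial or non-protocol request shows up
as a huge bogus size:

    import struct

    def read_frame(sock, max_bytes=104857600):  # socket.request.max.bytes default
        # Each request is framed as: <4-byte big-endian size N><N byte request>.
        header = sock.recv(4)
        (size,) = struct.unpack(">i", header)
        # If the client sent garbage or a partial request, these 4 bytes are
        # effectively random, so 'size' becomes a number like 1550939497 and
        # the server rejects the request against its configured maximum.
        if size <= 0 or size > max_bytes:
            raise IOError("Request of length %d is not valid" % size)
        return sock.recv(size)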

-Jay

On Sat, Jul 11, 2015 at 9:08 PM, David Montgomery  wrote:

> I can't send this simple payload using Python.
>
> topic: topic-test-development
> payload: {"utcdt": "2015-07-12T03:59:36", "ghznezzhmx": "apple"}
>
>
> No handlers could be found for logger "kafka.conn"
> Traceback (most recent call last):
>   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line 81,
> in 
> test_send_data_to_realtimenode()
>   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line 38,
> in test_send_data_to_realtimenode
> response = producer.send_messages(test_topic,test_payload)
>   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/simple.py",
> line 54, in send_messages
> topic, partition, *msg
>   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> line 349, in send_messages
> return self._send_messages(topic, partition, *msg)
>   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> line 390, in _send_messages
> fail_on_error=self.sync_fail_on_error
>   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 480,
> in send_produce_request
> (not fail_on_error or not self._raise_on_response_error(resp))]
>   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 247,
> in _raise_on_response_error
> raise resp
> kafka.common.FailedPayloadsError
>
> Here is what is in my logs
> [2015-07-12 03:29:58,103] INFO Closing socket connection to
> /xxx.xxx.xxx.xxx due to invalid request: Request of length 1550939497 is
> not valid, it is larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)
>
>
>
> Server is 4 gigs of ram.
>
> I used export KAFKA_HEAP_OPTS=-Xmx256M -Xms128M in kafka-server-start.sh
>
> So... why?
>


Re: stunning error - Request of length 1550939497 is not valid, it is larger than the maximum size of 104857600 bytes

2015-07-14 Thread Gwen Shapira
I am not familiar with Apache Bench. Can you share more details on
what you are doing?

On Tue, Jul 14, 2015 at 11:45 AM, JIEFU GONG  wrote:
> So I'm trying to make a request with a simple ASCII text file, but what's
> strange is that even if I change which file I send, or the contents of the
> file, I get the same error message, down to the specific number of bytes in
> the message, which seems weird if I'm changing the content. Should I be
> using Avro with my file before I try to send a request?
>
> On Tue, Jul 14, 2015 at 11:29 AM, Todd Palino  wrote:
>
>> This is interesting. We have seen something similar internally at LinkedIn
>> with one particular topic (and Avro schema), and only once in a while.
>> We've seen it happen 2 or 3 times so far. We had chalked it up to bad
>> content in the message, figuring that the sender was doing something like
>> sending a long stream of a single character, in error, which was creating a
>> highly compressible message. Given these cases, I'm no longer certain
>> that's the case.
>>
>> Becket, you had been taking a look at this internally. Do you have any
>> thoughts on this?
>>
>> -Todd
>>
>>
>> On Tue, Jul 14, 2015 at 11:18 AM, JIEFU GONG  wrote:
>>
>> > @Gwen
>> > I am having a very very similar issue where I am attempting to send a
>> > rather small message and it's blowing up on me (my specific error is:
>> > Invalid receive (size = 1347375956 larger than 104857600)). I tried to
>> > change the relevant settings but it seems that this particular request is
>> > of 1340 MB (and David's will be 1500 MB) and attempting to change the
>> > setting will give you another error saying there is not enough memory in
>> > the java heap. Any insight here?
>> >
>> > Specifically I am speculating the issue is indeed what Shayne has said
>> > about encoding: I am trying to use apachebench to send a post request
>> > to a kafka server but it is returning the above error -- do I have to
>> > format the data in any way, as this might be the reason why I'm
>> > experiencing this issue.
>> >
>> >
>> > On Sun, Jul 12, 2015 at 6:35 AM, Shayne S  wrote:
>> >
>> > > Your payload is so small that I suspect it's an encoding issue. Is your
>> > > producer set to expect a byte array and you're passing a string? Or
>> vice
>> > > versa?
>> > >
>> > > On Sat, Jul 11, 2015 at 11:08 PM, David Montgomery <
>> > > davidmontgom...@gmail.com> wrote:
>> > >
>> > > > I can't send this simple payload using Python.
>> > > >
>> > > > topic: topic-test-development
>> > > > payload: {"utcdt": "2015-07-12T03:59:36", "ghznezzhmx": "apple"}
>> > > >
>> > > >
>> > > > No handlers could be found for logger "kafka.conn"
>> > > > Traceback (most recent call last):
>> > > >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py",
>> line
>> > > 81,
>> > > > in 
>> > > > test_send_data_to_realtimenode()
>> > > >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py",
>> line
>> > > 38,
>> > > > in test_send_data_to_realtimenode
>> > > > response = producer.send_messages(test_topic,test_payload)
>> > > >   File
>> > "/usr/local/lib/python2.7/dist-packages/kafka/producer/simple.py",
>> > > > line 54, in send_messages
>> > > > topic, partition, *msg
>> > > >   File
>> "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
>> > > > line 349, in send_messages
>> > > > return self._send_messages(topic, partition, *msg)
>> > > >   File
>> "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
>> > > > line 390, in _send_messages
>> > > > fail_on_error=self.sync_fail_on_error
>> > > >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
>> > > 480,
>> > > > in send_produce_request
>> > > > (not fail_on_error or not self._raise_on_response_error(resp))]
>> > > >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
>> > > 247,
>> > > > in _raise_on_response_error
>> > > > raise resp
>> > > > kafka.common.FailedPayloadsError
>> > > >
>> > > > Here is what is in my logs
>> > > > [2015-07-12 03:29:58,103] INFO Closing socket connection to
>> > > > /xxx.xxx.xxx.xxx due to invalid request: Request of length 1550939497
>> > is
>> > > > not valid, it is larger than the maximum size of 104857600 bytes.
>> > > > (kafka.network.Processor)
>> > > >
>> > > >
>> > > >
>> > > > Server is 4 gigs of ram.
>> > > >
>> > > > I used export KAFKA_HEAP_OPTS=-Xmx256M -Xms128M in
>> > kafka-server-start.sh
>> > > >
>> > > > So... why?
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> >
>> > Jiefu Gong
>> > University of California, Berkeley | Class of 2017
>> > B.A Computer Science | College of Letters and Sciences
>> >
>> > jg...@berkeley.edu  | (925) 400-3427
>> >
>>
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu  | (925) 400-3427


Re: How to run the three producers test

2015-07-14 Thread JIEFU GONG
Yuheng,

Yes, if you read the blog post it specifies that he's using three separate
machines. There's no reason the producers cannot be started at the same
time, I believe.

On Tue, Jul 14, 2015 at 11:42 AM, Yuheng Du 
wrote:

> Hi,
>
> I am running the performance test for kafka.
> https://gist.github.com/jkreps
> /c7ddb4041ef62a900e6c
>
> For the "Three Producers, 3x async replication" scenario, the command is
> the same as one producer:
>
> bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> test 5000 100 -1 acks=1
> bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
> buffer.memory=67108864 batch.size=8196
>
> So how do I run the test for three producers? Do I just run them on three
> separate servers at the same time? Will there be some error in this way
> since the three producers can't be started at the same time?
>
> Thanks.
>
> best,
> Yuheng
>



-- 

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu  | (925) 400-3427


Re: How to run the three producers test

2015-07-14 Thread Gwen Shapira
You need to run 3 of those at the same time. We don't expect any
errors, but if you run into anything, let us know and we'll try to
help.
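
For what it's worth, a minimal sketch of doing that (the host names are
hypothetical, the command is the one quoted below, and a few seconds of skew
between the three starts is fine):

    import subprocess

    CMD = ("bin/kafka-run-class.sh "
           "org.apache.kafka.clients.tools.ProducerPerformance "
           "test 5000 100 -1 acks=1 "
           "bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 "
           "buffer.memory=67108864 batch.size=8196")

    # Start the same command on each producer machine via ssh, then wait.
    hosts = ["producer0", "producer1", "producer2"]  # hypothetical host names
    procs = [subprocess.Popen(["ssh", host, CMD]) for host in hosts]
    for proc in procs:
        proc.wait()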

Gwen

On Tue, Jul 14, 2015 at 11:42 AM, Yuheng Du  wrote:
> Hi,
>
> I am running the performance test for kafka. https://gist.github.com/jkreps
> /c7ddb4041ef62a900e6c
>
> For the "Three Producers, 3x async replication" scenario, the command is
> the same as one producer:
>
> bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> test 5000 100 -1 acks=1
> bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
> buffer.memory=67108864 batch.size=8196
>
> So how do I run the test for three producers? Do I just run them on three
> separate servers at the same time? Will there be some error in this way
> since the three producers can't be started at the same time?
>
> Thanks.
>
> best,
> Yuheng


Re: stunning error - Request of length 1550939497 is not valid, it is larger than the maximum size of 104857600 bytes

2015-07-14 Thread JIEFU GONG
So I'm trying to make a request with a simple ASCII text file, but what's
strange is that even if I change which file I send, or the contents of the
file, I get the same error message, down to the specific number of bytes in
the message, which seems weird if I'm changing the content. Should I be
using Avro with my file before I try to send a request?

On Tue, Jul 14, 2015 at 11:29 AM, Todd Palino  wrote:

> This is interesting. We have seen something similar internally at LinkedIn
> with one particular topic (and Avro schema), and only once in a while.
> We've seen it happen 2 or 3 times so far. We had chalked it up to bad
> content in the message, figuring that the sender was doing something like
> sending a long stream of a single character, in error, which was creating a
> highly compressible message. Given these cases, I'm no longer certain
> that's the case.
>
> Becket, you had been taking a look at this internally. Do you have any
> thoughts on this?
>
> -Todd
>
>
> On Tue, Jul 14, 2015 at 11:18 AM, JIEFU GONG  wrote:
>
> > @Gwen
> > I am having a very very similar issue where I am attempting to send a
> > rather small message and it's blowing up on me (my specific error is:
> > Invalid receive (size = 1347375956 larger than 104857600)). I tried to
> > change the relevant settings but it seems that this particular request is
> > of 1340 MB (and David's will be 1500 MB) and attempting to change the
> > setting will give you another error saying there is not enough memory in
> > the java heap. Any insight here?
> >
> > Specifically I am speculating the issue is indeed what Shayne has said
> > about encoding: I am trying to use apachebench to send a post request to
> > a kafka server but it is returning the above error -- do I have to
> > format the data in any way, as this might be the reason why I'm
> > experiencing this issue.
> >
> >
> > On Sun, Jul 12, 2015 at 6:35 AM, Shayne S  wrote:
> >
> > > Your payload is so small that I suspect it's an encoding issue. Is your
> > > producer set to expect a byte array and you're passing a string? Or
> vice
> > > versa?
> > >
> > > On Sat, Jul 11, 2015 at 11:08 PM, David Montgomery <
> > > davidmontgom...@gmail.com> wrote:
> > >
> > > > I can't send this simple payload using Python.
> > > >
> > > > topic: topic-test-development
> > > > payload: {"utcdt": "2015-07-12T03:59:36", "ghznezzhmx": "apple"}
> > > >
> > > >
> > > > No handlers could be found for logger "kafka.conn"
> > > > Traceback (most recent call last):
> > > >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py",
> line
> > > 81,
> > > > in 
> > > > test_send_data_to_realtimenode()
> > > >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py",
> line
> > > 38,
> > > > in test_send_data_to_realtimenode
> > > > response = producer.send_messages(test_topic,test_payload)
> > > >   File
> > "/usr/local/lib/python2.7/dist-packages/kafka/producer/simple.py",
> > > > line 54, in send_messages
> > > > topic, partition, *msg
> > > >   File
> "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > > > line 349, in send_messages
> > > > return self._send_messages(topic, partition, *msg)
> > > >   File
> "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > > > line 390, in _send_messages
> > > > fail_on_error=self.sync_fail_on_error
> > > >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> > > 480,
> > > > in send_produce_request
> > > > (not fail_on_error or not self._raise_on_response_error(resp))]
> > > >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> > > 247,
> > > > in _raise_on_response_error
> > > > raise resp
> > > > kafka.common.FailedPayloadsError
> > > >
> > > > Here is what is in my logs
> > > > [2015-07-12 03:29:58,103] INFO Closing socket connection to
> > > > /xxx.xxx.xxx.xxx due to invalid request: Request of length 1550939497
> > is
> > > > not valid, it is larger than the maximum size of 104857600 bytes.
> > > > (kafka.network.Processor)
> > > >
> > > >
> > > >
> > > > Server is 4 gigs of ram.
> > > >
> > > > I used export KAFKA_HEAP_OPTS=-Xmx256M -Xms128M in
> > kafka-server-start.sh
> > > >
> > > > So... why?
> > > >
> > >
> >
> >
> >
> > --
> >
> > Jiefu Gong
> > University of California, Berkeley | Class of 2017
> > B.A Computer Science | College of Letters and Sciences
> >
> > jg...@berkeley.edu  | (925) 400-3427
> >
>



-- 

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu  | (925) 400-3427


How to run the three producers test

2015-07-14 Thread Yuheng Du
Hi,

I am running the performance test for kafka. https://gist.github.com/jkreps
/c7ddb4041ef62a900e6c

For the "Three Producers, 3x async replication" scenario, the command is
the same as one producer:

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
test 5000 100 -1 acks=1
bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
buffer.memory=67108864 batch.size=8196

So how do I run the test for three producers? Do I just run them on three
separate servers at the same time? Will there be some error this way,
since the three producers can't be started at the same time?

Thanks.

best,
Yuheng


Re: stunning error - Request of length 1550939497 is not valid, it is larger than the maximum size of 104857600 bytes

2015-07-14 Thread Todd Palino
This is interesting. We have seen something similar internally at LinkedIn
with one particular topic (and Avro schema), and only once in a while.
We've seen it happen 2 or 3 times so far. We had chalked it up to bad
content in the message, figuring that the sender was doing something like
sending a long stream of a single character, in error, which was creating a
highly compressible message. Given these cases, I'm no longer certain
that's the case.

Becket, you had been taking a look at this internally. Do you have any
thoughts on this?

-Todd


On Tue, Jul 14, 2015 at 11:18 AM, JIEFU GONG  wrote:

> @Gwen
> I am having a very very similar issue where I am attempting to send a
> rather small message and it's blowing up on me (my specific error is:
> Invalid receive (size = 1347375956 larger than 104857600)). I tried to
> change the relevant settings but it seems that this particular request is
> of 1340 MB (and David's will be 1500 MB) and attempting to change the
> setting will give you another error saying there is not enough memory in
> the java heap. Any insight here?
>
> Specifically I am speculating the issue is indeed what Shayne has said
> about encoding: I am trying to use apachebench to send a post request to a
> kafka server but it is returning the above error -- do I have to format the
> data in any way, as this might be the reason why I'm experiencing this issue.
>
>
> On Sun, Jul 12, 2015 at 6:35 AM, Shayne S  wrote:
>
> > Your payload is so small that I suspect it's an encoding issue. Is your
> > producer set to expect a byte array and you're passing a string? Or vice
> > versa?
> >
> > On Sat, Jul 11, 2015 at 11:08 PM, David Montgomery <
> > davidmontgom...@gmail.com> wrote:
> >
> > > I can't send this simple payload using Python.
> > >
> > > topic: topic-test-development
> > > payload: {"utcdt": "2015-07-12T03:59:36", "ghznezzhmx": "apple"}
> > >
> > >
> > > No handlers could be found for logger "kafka.conn"
> > > Traceback (most recent call last):
> > >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line
> > 81,
> > > in 
> > > test_send_data_to_realtimenode()
> > >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line
> > 38,
> > > in test_send_data_to_realtimenode
> > > response = producer.send_messages(test_topic,test_payload)
> > >   File
> "/usr/local/lib/python2.7/dist-packages/kafka/producer/simple.py",
> > > line 54, in send_messages
> > > topic, partition, *msg
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > > line 349, in send_messages
> > > return self._send_messages(topic, partition, *msg)
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > > line 390, in _send_messages
> > > fail_on_error=self.sync_fail_on_error
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> > 480,
> > > in send_produce_request
> > > (not fail_on_error or not self._raise_on_response_error(resp))]
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> > 247,
> > > in _raise_on_response_error
> > > raise resp
> > > kafka.common.FailedPayloadsError
> > >
> > > Here is what is in my logs
> > > [2015-07-12 03:29:58,103] INFO Closing socket connection to
> > > /xxx.xxx.xxx.xxx due to invalid request: Request of length 1550939497
> is
> > > not valid, it is larger than the maximum size of 104857600 bytes.
> > > (kafka.network.Processor)
> > >
> > >
> > >
> > > Server is 4 gigs of ram.
> > >
> > > I used export KAFKA_HEAP_OPTS=-Xmx256M -Xms128M in
> kafka-server-start.sh
> > >
> > > So... why?
> > >
> >
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu  | (925) 400-3427
>


Re: stunning error - Request of length 1550939497 is not valid, it is larger than the maximum size of 104857600 bytes

2015-07-14 Thread JIEFU GONG
@Gwen
I am having a very very similar issue where I am attempting to send a
rather small message and it's blowing up on me (my specific error is:
Invalid receive (size = 1347375956 larger than 104857600)). I tried to
change the relevant settings but it seems that this particular request is
of 1340 MB (and David's will be 1500 MB) and attempting to change the
setting will give you another error saying there is not enough memory in
the java heap. Any insight here?

Specifically I am speculating the issue is indeed what Shayne has said
about encoding: I am trying to use apachebench to send a post request to a
kafka server but it is returning the above error -- do I have to format the
data in any way, as this might be the reason why I'm experiencing this issue.
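
Following up on the encoding angle (Shayne's suggestion is quoted below), a
minimal sketch of explicitly serializing the payload to bytes before sending.
It assumes the kafka-python SimpleProducer API that appears in David's
traceback; the broker address is hypothetical:

    import json
    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient("localhost:9092")  # hypothetical broker address
    producer = SimpleProducer(client)

    payload = {"utcdt": "2015-07-12T03:59:36", "ghznezzhmx": "apple"}
    # Send bytes, not a dict or unicode string; an unencoded payload is a
    # common way to end up with a malformed (seemingly huge) request.
    producer.send_messages(b"topic-test-development",
                           json.dumps(payload).encode("utf-8"))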


On Sun, Jul 12, 2015 at 6:35 AM, Shayne S  wrote:

> Your payload is so small that I suspect it's an encoding issue. Is your
> producer set to expect a byte array and you're passing a string? Or vice
> versa?
>
> On Sat, Jul 11, 2015 at 11:08 PM, David Montgomery <
> davidmontgom...@gmail.com> wrote:
>
> > I can't send this simple payload using Python.
> >
> > topic: topic-test-development
> > payload: {"utcdt": "2015-07-12T03:59:36", "ghznezzhmx": "apple"}
> >
> >
> > No handlers could be found for logger "kafka.conn"
> > Traceback (most recent call last):
> >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line
> 81,
> > in 
> > test_send_data_to_realtimenode()
> >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line
> 38,
> > in test_send_data_to_realtimenode
> > response = producer.send_messages(test_topic,test_payload)
> >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/simple.py",
> > line 54, in send_messages
> > topic, partition, *msg
> >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > line 349, in send_messages
> > return self._send_messages(topic, partition, *msg)
> >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > line 390, in _send_messages
> > fail_on_error=self.sync_fail_on_error
> >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> 480,
> > in send_produce_request
> > (not fail_on_error or not self._raise_on_response_error(resp))]
> >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> 247,
> > in _raise_on_response_error
> > raise resp
> > kafka.common.FailedPayloadsError
> >
> > Here is what is in my logs
> > [2015-07-12 03:29:58,103] INFO Closing socket connection to
> > /xxx.xxx.xxx.xxx due to invalid request: Request of length 1550939497 is
> > not valid, it is larger than the maximum size of 104857600 bytes.
> > (kafka.network.Processor)
> >
> >
> >
> > Server is 4 gigs of ram.
> >
> > I used export KAFKA_HEAP_OPTS=-Xmx256M -Xms128M in kafka-server-start.sh
> >
> > So... why?
> >
>



-- 

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu  | (925) 400-3427


Re: Fwd: Offset not committed

2015-07-14 Thread Vadim Bobrov
Just caught this error again. I issue commitOffsets - no error, but no
committed offsets either. Watching __consumer_offsets shows no new messages
either. Then in a few minutes I issue commitOffsets again - all committed.
Unless I am doing something terribly wrong, this is very unreliable.

On Tue, Jul 14, 2015 at 1:49 PM, Joel Koshy  wrote:

> Actually, how are you committing offsets? Are you using the old
> (zookeeperconsumerconnector) or new KafkaConsumer?
>
> It is true that the current APIs don't return any result, but it would
> help to check if anything is getting into the offsets topic - unless
> you are seeing errors in the logs, the offset commit should succeed
> (if you are indeed explicitly committing offsets).
>
> Thanks,
>
> Joel
>
> On Tue, Jul 14, 2015 at 12:19:01PM -0400, Vadim Bobrov wrote:
> > Thanks, Joel, I will but regardless of my findings the basic problem will
> > still be there: there is no guarantee that the offsets will be committed
> > after commitOffsets. Because commitOffsets does not return its exit
> status,
> > nor does it block as I understand until offsets are committed. In other
> > words, there is no way to know that it has, in fact, committed the offsets
> >
> > or am I missing something? And then another question - why does it seem
> to
> > depend on the number of consumed messages?
> >
> > On Tue, Jul 14, 2015 at 11:36 AM, Joel Koshy 
> wrote:
> >
> > > Can you take a look at the kafka commit rate mbean on your consumer?
> > > Also, can you consume the offsets topic while you are committing
> > > offsets and see if/what offsets are getting committed?
> > > (http://www.slideshare.net/jjkoshy/offset-management-in-kafka/32)
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Tue, Jul 14, 2015 at 11:12:03AM -0400, Vadim Bobrov wrote:
> > > > I am trying to replace ActiveMQ with Kafka in our environment;
> > > > however, I have encountered a strange problem that basically prevents
> > > > us from using Kafka in production. The problem is that sometimes the
> > > > offsets are not committed.
> > > >
> > > > I am using Kafka 0.8.2.1, offset storage = kafka, high level
> consumer,
> > > > auto-commit = off. Every N messages I issue commitOffsets(). Now
> here is
> > > > the problem - if N is below a certain number (180 000 for me) it
> works
> > > and
> > > > the offset is moving. If N is 180 000 or more the offset is not
> updated
> > > > after commitOffsets
> > > >
> > > > I am looking at offsets using kafka-run-class.sh
> > > > kafka.tools.ConsumerOffsetChecker
> > > > Any help?
> > >
> > >
>
>


Latency test

2015-07-14 Thread Yuheng Du
Currently, the latency test from Kafka tests the end-to-end latency between
producers and consumers.

Is there a way to test the producer-to-broker and broker-to-consumer
delay separately?
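
One rough way to isolate the producer-to-broker leg (a sketch, assuming
kafka-python and a hypothetical broker and topic; SimpleProducer is
synchronous by default, so an acked send's round-trip time approximates that
hop):

    import time
    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient("localhost:9092")  # hypothetical broker address
    producer = SimpleProducer(client)       # sync send, waits for leader ack

    start = time.time()
    producer.send_messages(b"latency-test", b"x" * 100)  # one 100-byte record
    print("producer -> broker round trip: %.3f ms"
          % ((time.time() - start) * 1000))

The broker-to-consumer delay could then be estimated as the measured
end-to-end latency minus this producer-to-broker time.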

Thanks.


Re: Fwd: Offset not committed

2015-07-14 Thread Vadim Bobrov
I am using ZookeeperConsumerConnector

Actually, I set up a consumer for __consumer_offsets the way you suggested,
and now I cannot reproduce the situation any longer. Offsets are committed
every time.

On Tue, Jul 14, 2015 at 1:49 PM, Joel Koshy  wrote:

> Actually, how are you committing offsets? Are you using the old
> (zookeeperconsumerconnector) or new KafkaConsumer?
>
> It is true that the current APIs don't return any result, but it would
> help to check if anything is getting into the offsets topic - unless
> you are seeing errors in the logs, the offset commit should succeed
> (if you are indeed explicitly committing offsets).
>
> Thanks,
>
> Joel
>
> On Tue, Jul 14, 2015 at 12:19:01PM -0400, Vadim Bobrov wrote:
> > Thanks, Joel, I will but regardless of my findings the basic problem will
> > still be there: there is no guarantee that the offsets will be committed
> > after commitOffsets. Because commitOffsets does not return its exit
> status,
> > nor does it block as I understand until offsets are committed. In other
> > words, there is no way to know that it has, in fact, committed the offsets
> >
> > or am I missing something? And then another question - why does it seem
> to
> > depend on the number of consumed messages?
> >
> > On Tue, Jul 14, 2015 at 11:36 AM, Joel Koshy 
> wrote:
> >
> > > Can you take a look at the kafka commit rate mbean on your consumer?
> > > Also, can you consume the offsets topic while you are committing
> > > offsets and see if/what offsets are getting committed?
> > > (http://www.slideshare.net/jjkoshy/offset-management-in-kafka/32)
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Tue, Jul 14, 2015 at 11:12:03AM -0400, Vadim Bobrov wrote:
> > > > I am trying to replace ActiveMQ with Kafka in our environment;
> > > > however, I have encountered a strange problem that basically prevents
> > > > us from using Kafka in production. The problem is that sometimes the
> > > > offsets are not committed.
> > > >
> > > > I am using Kafka 0.8.2.1, offset storage = kafka, high level
> consumer,
> > > > auto-commit = off. Every N messages I issue commitOffsets(). Now
> here is
> > > > the problem - if N is below a certain number (180 000 for me) it
> works
> > > and
> > > > the offset is moving. If N is 180 000 or more the offset is not
> updated
> > > > after commitOffsets
> > > >
> > > > I am looking at offsets using kafka-run-class.sh
> > > > kafka.tools.ConsumerOffsetChecker
> > > > Any help?
> > >
> > >
>
>


Re: kafka benchmark tests

2015-07-14 Thread JIEFU GONG
Yuheng,
I would recommend looking here:
http://kafka.apache.org/documentation.html#brokerconfigs and scrolling down
to get a better understanding of the default settings and what they mean --
it'll tell you what the different options for acks do.

Ewen,
Thank you immensely for your thoughts; they gave a lot of insight into the
issue. Though it is understandable that your specific results need to be
verified, it seems that the KIP-25 patch is functional and I can use it for
my own benchmarking purposes? Is that correct? Thanks again!

On Tue, Jul 14, 2015 at 8:22 AM, Yuheng Du  wrote:

> Also, I guess setting the target throughput to -1 means let it be as high
> as possible?
>
> On Tue, Jul 14, 2015 at 10:36 AM, Yuheng Du 
> wrote:
>
> > Thanks. If I set acks=1 in the producer config options in
> > bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> > test7 5000 100 -1 acks=1 bootstrap.servers=
> > esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
> batch.size=8196?
> >
> > Does that mean for each message generated at the producer, the producer
> > will wait until the broker sends the ack back, then send another message?
> >
> > Thanks.
> >
> > Yuheng
> >
> > On Tue, Jul 14, 2015 at 10:06 AM, Manikumar Reddy 
> > wrote:
> >
> >> Yes, a list of Kafka server host/port pairs to use for establishing the
> >> initial connection to the Kafka cluster.
> >>
> >> https://kafka.apache.org/documentation.html#newproducerconfigs
> >>
> >> On Tue, Jul 14, 2015 at 7:29 PM, Yuheng Du 
> >> wrote:
> >>
> >> > Does anyone know what bootstrap.servers=
> >> > esv4-hcl198.grid.linkedin.com:9092 means in the following test
> >> > command:
> >> >
> >> > bin/kafka-run-class.sh
> >> org.apache.kafka.clients.tools.ProducerPerformance
> >> > test7 5000 100 -1 acks=1 bootstrap.servers=
> >> > esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
> >> batch.size=8196?
> >> >
> >> > What is bootstrap.servers? Is it the Kafka server that I am running
> >> > the test against?
> >> >
> >> > Thanks.
> >> >
> >> > Yuheng
> >> >
> >> >
> >> >
> >> >
> >> > On Tue, Jul 14, 2015 at 12:18 AM, Ewen Cheslack-Postava <
> >> e...@confluent.io
> >> > >
> >> > wrote:
> >> >
> >> > > I implemented (nearly) the same basic set of tests in the system
> test
> >> > > framework we started at Confluent and that is going to move into
> >> Kafka --
> >> > > see the wip patch for KIP-25 here:
> >> > https://github.com/apache/kafka/pull/70
> >> > > In particular, that test is implemented in benchmark_test.py:
> >> > >
> >> > >
> >> >
> >>
> https://github.com/apache/kafka/pull/70/files#diff-ca984778cf9943407645eb6784f19dc8
> >> > >
> >> > > Hopefully once that's merged people can reuse that benchmark (and
> add
> >> to
> >> > > it!) so they can easily run the same benchmarks across different
> >> > hardware.
> >> > > Here are some results from an older version of that test on
> m3.2xlarge
> >> > > instances on EC2 using local ephemeral storage (I think... it's been
> >> > awhile
> >> > > since I ran these numbers and I didn't document methodology that
> >> > > carefully):
> >> > >
> >> > > INFO:_.KafkaBenchmark:=
> >> > > INFO:_.KafkaBenchmark:BENCHMARK RESULTS
> >> > > INFO:_.KafkaBenchmark:=
> >> > > INFO:_.KafkaBenchmark:Single producer, no replication: 684097.470208
> >> > > rec/sec (65.24 MB/s)
> >> > > INFO:_.KafkaBenchmark:Single producer, async 3x replication:
> >> > > 667494.359673 rec/sec (63.66 MB/s)
> >> > > INFO:_.KafkaBenchmark:Single producer, sync 3x replication:
> >> > > 116485.764275 rec/sec (11.11 MB/s)
> >> > > INFO:_.KafkaBenchmark:Three producers, async 3x replication:
> >> > > 1696519.022182 rec/sec (161.79 MB/s)
> >> > > INFO:_.KafkaBenchmark:Message size:
> >> > > INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
> >> > > INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
> >> > > INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
> >> > > INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s)
> >> > > INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s)
> >> > > INFO:_.KafkaBenchmark:Throughput over long run, data > memory:
> >> > > INFO:_.KafkaBenchmark: Time block 0: 684725.151324 rec/sec
> (65.30
> >> > MB/s)
> >> > > INFO:_.KafkaBenchmark:Single consumer: 701031.14 rec/sec
> >> (56.830500
> >> > > MB/s)
> >> > > INFO:_.KafkaBenchmark:Three consumers: 3304011.014900 rec/sec
> >> (267.830800
> >> > > MB/s)
> >> > > INFO:_.KafkaBenchmark:Producer + consumer:
> >> > > INFO:_.KafkaBenchmark: Producer: 624984.375391 rec/sec (59.60
> >> MB/s)
> >> > > INFO:_.KafkaBenchmark: Consumer: 624984.375391 rec/sec (59.60
> >> MB/s)
> >> > > INFO:_.KafkaBenchmark:End-to-end latency: median 2.00 ms, 99%
> >> > > 4.00 ms, 99.9% 19.00 ms
> >> > >
> >> > > Don't trust these numbers for anything, they were a quick one-off
> >> > > test. I'm just pasting the

Re: Fwd: Offset not committed

2015-07-14 Thread Joel Koshy
Actually, how are you committing offsets? Are you using the old
(zookeeperconsumerconnector) or new KafkaConsumer?

It is true that the current APIs don't return any result, but it would
help to check if anything is getting into the offsets topic - unless
you are seeing errors in the logs, the offset commit should succeed
(if you are indeed explicitly committing offsets).
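
For reference, a sketch of watching commits arrive on the offsets topic with
the console consumer. The formatter class name is the 0.8.2 one and may
differ in later releases, the ZooKeeper address is hypothetical, and the
consumer config file needs exclude.internal.topics=false:

    import subprocess

    # Tail __consumer_offsets and pretty-print each committed offset.
    subprocess.call([
        "bin/kafka-console-consumer.sh",
        "--zookeeper", "localhost:2181",
        "--topic", "__consumer_offsets",
        "--formatter", "kafka.server.OffsetManager$OffsetsMessageFormatter",
        "--consumer.config", "config/consumer.properties",
    ])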

Thanks,

Joel

On Tue, Jul 14, 2015 at 12:19:01PM -0400, Vadim Bobrov wrote:
> Thanks, Joel, I will but regardless of my findings the basic problem will
> still be there: there is no guarantee that the offsets will be committed
> after commitOffsets. Because commitOffsets does not return its exit status,
> nor does it block as I understand until offsets are committed. In other
> words, there is no way to know that it has, in fact, committed the offsets
> 
> or am I missing something? And then another question - why does it seem to
> depend on the number of consumed messages?
> 
> On Tue, Jul 14, 2015 at 11:36 AM, Joel Koshy  wrote:
> 
> > Can you take a look at the kafka commit rate mbean on your consumer?
> > Also, can you consume the offsets topic while you are committing
> > offsets and see if/what offsets are getting committed?
> > (http://www.slideshare.net/jjkoshy/offset-management-in-kafka/32)
> >
> > Thanks,
> >
> > Joel
> >
> > On Tue, Jul 14, 2015 at 11:12:03AM -0400, Vadim Bobrov wrote:
> > > I am trying to replace ActiveMQ with Kafka in our environment; however,
> > > I have encountered a strange problem that basically prevents us from
> > > using Kafka in production. The problem is that sometimes the offsets
> > > are not committed.
> > >
> > > I am using Kafka 0.8.2.1, offset storage = kafka, high level consumer,
> > > auto-commit = off. Every N messages I issue commitOffsets(). Now here is
> > > the problem - if N is below a certain number (180 000 for me) it works
> > and
> > > the offset is moving. If N is 180 000 or more the offset is not updated
> > > after commitOffsets
> > >
> > > I am looking at offsets using kafka-run-class.sh
> > > kafka.tools.ConsumerOffsetChecker
> > > Any help?
> >
> >



Re: Fwd: Offset not committed

2015-07-14 Thread Vadim Bobrov
Thanks, Joel, I will, but regardless of my findings the basic problem will
still be there: there is no guarantee that the offsets will be committed
after commitOffsets, because commitOffsets does not return its exit status,
nor does it block (as I understand it) until the offsets are committed. In
other words, there is no way to know that it has, in fact, committed the offsets

or am I missing something? And then another question - why does it seem to
depend on the number of consumed messages?

On Tue, Jul 14, 2015 at 11:36 AM, Joel Koshy  wrote:

> Can you take a look at the kafka commit rate mbean on your consumer?
> Also, can you consume the offsets topic while you are committing
> offsets and see if/what offsets are getting committed?
> (http://www.slideshare.net/jjkoshy/offset-management-in-kafka/32)
>
> Thanks,
>
> Joel
>
> On Tue, Jul 14, 2015 at 11:12:03AM -0400, Vadim Bobrov wrote:
> > I am trying to replace ActiveMQ with Kafka in our environment; however,
> > I have encountered a strange problem that basically prevents us from
> > using Kafka in production. The problem is that sometimes the offsets are
> > not committed.
> >
> > I am using Kafka 0.8.2.1, offset storage = kafka, high level consumer,
> > auto-commit = off. Every N messages I issue commitOffsets(). Now here is
> > the problem - if N is below a certain number (180 000 for me) it works
> > and the offset moves. If N is 180 000 or more, the offset is not updated
> > after commitOffsets.
> >
> > I am looking at offsets using kafka-run-class.sh
> > kafka.tools.ConsumerOffsetChecker.
> > Any help?
>
>


Re: Fwd: Offset not committed

2015-07-14 Thread Joel Koshy
Can you take a look at the kafka commit rate mbean on your consumer?
Also, can you consume the offsets topic while you are committing
offsets and see if/what offsets are getting committed?
(http://www.slideshare.net/jjkoshy/offset-management-in-kafka/32)

Thanks,

Joel

On Tue, Jul 14, 2015 at 11:12:03AM -0400, Vadim Bobrov wrote:
> I am trying to replace ActiveMQ with Kafka in our environment; however, I
> have encountered a strange problem that basically prevents us from using
> Kafka in production. The problem is that sometimes the offsets are not committed.
> 
> I am using Kafka 0.8.2.1, offset storage = kafka, high level consumer,
> auto-commit = off. Every N messages I issue commitOffsets(). Now here is
> the problem - if N is below a certain number (180 000 for me) it works and
> the offset moves. If N is 180 000 or more, the offset is not updated
> after commitOffsets.
> 
> I am looking at offsets using kafka-run-class.sh
> kafka.tools.ConsumerOffsetChecker.
> Any help?



Re: kafka benchmark tests

2015-07-14 Thread Yuheng Du
Also, I guess setting the target throughput to -1 means letting it go as
high as possible?

On Tue, Jul 14, 2015 at 10:36 AM, Yuheng Du 
wrote:

> Thanks. Suppose I set acks=1 in the producer config options, as in
> bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> test7 5000 100 -1 acks=1 bootstrap.servers=
> esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196.
>
> Does that mean for each message generated at the producer, the producer
> will wait until the broker sends the ack back, then send another message?
>
> Thanks.
>
> Yuheng
>
> On Tue, Jul 14, 2015 at 10:06 AM, Manikumar Reddy 
> wrote:
>
>> Yes, a list of Kafka server host/port pairs to use for establishing the
>> initial connection to the Kafka cluster.
>>
>> https://kafka.apache.org/documentation.html#newproducerconfigs
>>
>> On Tue, Jul 14, 2015 at 7:29 PM, Yuheng Du 
>> wrote:
>>
>> > Does anyone know what bootstrap.servers=
>> > esv4-hcl198.grid.linkedin.com:9092 means in the following test command:
>> >
>> > bin/kafka-run-class.sh
>> org.apache.kafka.clients.tools.ProducerPerformance
>> > test7 5000 100 -1 acks=1 bootstrap.servers=
>> > esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
>> batch.size=8196?
>> >
>> > What is bootstrap.servers? Is it the Kafka server that I am running the
>> > test on?
>> >
>> > Thanks.
>> >
>> > Yuheng
>> >
>> >
>> >
>> >
>> > On Tue, Jul 14, 2015 at 12:18 AM, Ewen Cheslack-Postava <
>> e...@confluent.io
>> > >
>> > wrote:
>> >
>> > > I implemented (nearly) the same basic set of tests in the system test
>> > > framework we started at Confluent and that is going to move into
>> Kafka --
>> > > see the wip patch for KIP-25 here:
>> > https://github.com/apache/kafka/pull/70
>> > > In particular, that test is implemented in benchmark_test.py:
>> > >
>> > >
>> >
>> https://github.com/apache/kafka/pull/70/files#diff-ca984778cf9943407645eb6784f19dc8
>> > >
>> > > Hopefully once that's merged people can reuse that benchmark (and add
>> to
>> > > it!) so they can easily run the same benchmarks across different
>> > hardware.
>> > > Here are some results from an older version of that test on m3.2xlarge
>> > > instances on EC2 using local ephemeral storage (I think... it's been
>> > awhile
>> > > since I ran these numbers and I didn't document methodology that
>> > > carefully):
>> > >
>> > > INFO:_.KafkaBenchmark:=
>> > > INFO:_.KafkaBenchmark:BENCHMARK RESULTS
>> > > INFO:_.KafkaBenchmark:=
>> > > INFO:_.KafkaBenchmark:Single producer, no replication: 684097.470208
>> > > rec/sec (65.24 MB/s)
>> > > INFO:_.KafkaBenchmark:Single producer, async 3x replication:
>> > > 667494.359673 rec/sec (63.66 MB/s)
>> > > INFO:_.KafkaBenchmark:Single producer, sync 3x replication:
>> > > 116485.764275 rec/sec (11.11 MB/s)
>> > > INFO:_.KafkaBenchmark:Three producers, async 3x replication:
>> > > 1696519.022182 rec/sec (161.79 MB/s)
>> > > INFO:_.KafkaBenchmark:Message size:
>> > > INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
>> > > INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
>> > > INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
>> > > INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s)
>> > > INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s)
>> > > INFO:_.KafkaBenchmark:Throughput over long run, data > memory:
>> > > INFO:_.KafkaBenchmark: Time block 0: 684725.151324 rec/sec (65.30
>> > MB/s)
>> > > INFO:_.KafkaBenchmark:Single consumer: 701031.14 rec/sec
>> (56.830500
>> > > MB/s)
>> > > INFO:_.KafkaBenchmark:Three consumers: 3304011.014900 rec/sec
>> (267.830800
>> > > MB/s)
>> > > INFO:_.KafkaBenchmark:Producer + consumer:
>> > > INFO:_.KafkaBenchmark: Producer: 624984.375391 rec/sec (59.60
>> MB/s)
>> > > INFO:_.KafkaBenchmark: Consumer: 624984.375391 rec/sec (59.60
>> MB/s)
>> > > INFO:_.KafkaBenchmark:End-to-end latency: median 2.00 ms, 99%
>> > > 4.00 ms, 99.9% 19.00 ms
>> > >
>> > > Don't trust these numbers for anything, they were a quick one-off test.
>> > I'm
>> > > just pasting the output so you get some idea of what the results might
>> > look
>> > > like. Once we merge the KIP-25 patch, Confluent will be running the
>> tests
>> > > regularly and results will be available publicly so we'll be able to
>> keep
>> > > better tabs on performance, albeit for only a specific class of
>> hardware.
>> > >
>> > > For the batch.size question -- I'm not sure the results in the blog
>> post
>> > > actually have different settings, it could be accidental divergence
>> > between
>> > > the script and the blog post. The post specifically notes that tuning
>> the
>> > > batch size in the synchronous case might help, but that he didn't do
>> > that.
>> > > If you're trying to benchmark the *optimal* throughput, tuning the
>> batch
>> > > size would make sense. Since synchronous replication will have higher
>> > > latency and there's a limit to how many requests can be in flight at
>> > > once, you'll want a larger batch size to compensate for the additional
>> > > latency.

Fwd: Offset not committed

2015-07-14 Thread Vadim Bobrov
I am trying to replace ActiveMQ with Kafka in our environment; however, I
have encountered a strange problem that basically prevents us from using
Kafka in production. The problem is that sometimes the offsets are not committed.

I am using Kafka 0.8.2.1, offset storage = kafka, high level consumer,
auto-commit = off. Every N messages I issue commitOffsets(). Now here is
the problem - if N is below a certain number (180 000 for me) it works and
the offset moves. If N is 180 000 or more, the offset is not updated
after commitOffsets.

I am looking at offsets using kafka-run-class.sh
kafka.tools.ConsumerOffsetChecker.
Any help?


RE: Kafka producer input file

2015-07-14 Thread Todd Snyder
You can also check out Klogger (https://github.com/blackberry/Klogger), which
will take input from a TCP port or a file.

Todd.

-Original Message-
From: Jason Gustafson [mailto:ja...@confluent.io] 
Sent: Monday, July 13, 2015 20:09
To: users@kafka.apache.org
Subject: Re: Kafka producer input file

There is also kafkacat (https://github.com/edenhill/kafkacat), which
exposes a few more knobs than the console producer.

-Jason
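
If you would rather do this in code than through the console producer, here
is a minimal sketch that reads a file line by line and sends each line as a
message, assuming the 0.8.2 "new" Java producer (the broker address, topic,
and file path are placeholders):

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FileProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props);
             BufferedReader reader =
                 new BufferedReader(new FileReader("my_file.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // each line of the file becomes one message
                producer.send(
                    new ProducerRecord<String, String>("my_topic", line));
            }
        }
    }
}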

On Sat, Jul 11, 2015 at 6:40 PM, tsoli...@gmail.com 
wrote:

> Thank you, Shayne.
>
> On Sat, Jul 11, 2015 at 6:35 PM, Shayne S  wrote:
>
> > The console producer will read from STDIN. Assuming you are using 0.8.2,
> > you can pipe the file right in like this:
> >
> > kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
> > --new-producer < my_file.txt
> >
> > On Sat, Jul 11, 2015 at 6:32 PM, tsoli...@gmail.com 
> > wrote:
> >
> > > Hello, I am trying to set up a Kafka producer to take input from a file
> > > instead of standard input. According to the Kafka documentation:
> > >
> > > Kafka comes with a command line client that will take input from a file
> > or
> > > > from standard input and send it out as messages to the Kafka cluster.
> > >
> > >
> > > I could not seem to find any documentation on how the command line
> > > client can read from a file. Could you please help?
> > >
> > > Thanks,
> > > Tim
> > >
> >
>


Re: kafka benchmark tests

2015-07-14 Thread Yuheng Du
Thanks. Suppose I set acks=1 in the producer config options, as in
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
test7 5000 100 -1 acks=1 bootstrap.servers=
esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196.

Does that mean for each message generated at the producer, the producer
will wait until the broker sends the ack back, then send another message?

Thanks.

Yuheng
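
For what it's worth, my understanding is that acks=1 only means the
partition leader must write the record before acknowledging the produce
request; it does not force the producer to send one message at a time, since
the new producer still batches and pipelines requests. A sketch of the same
settings expressed as new-producer configs (the serializers and the dummy
payload are my assumptions, not part of the command above):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksOneSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "esv4-hcl198.grid.linkedin.com:9092");
        props.put("acks", "1");                 // leader acknowledgement only
        props.put("buffer.memory", "67108864");
        props.put("batch.size", "8196");
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        // send() is asynchronous: records are batched and pipelined, so the
        // producer does not wait for each ack before sending the next record
        producer.send(new ProducerRecord<byte[], byte[]>("test7", new byte[100]));
        producer.close();
    }
}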

On Tue, Jul 14, 2015 at 10:06 AM, Manikumar Reddy 
wrote:

> Yes, a list of Kafka server host/port pairs to use for establishing the
> initial connection to the Kafka cluster.
>
> https://kafka.apache.org/documentation.html#newproducerconfigs
>
> On Tue, Jul 14, 2015 at 7:29 PM, Yuheng Du 
> wrote:
>
> > Does anyone know what bootstrap.servers=
> > esv4-hcl198.grid.linkedin.com:9092 means in the following test command:
> >
> > bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> > test7 5000 100 -1 acks=1 bootstrap.servers=
> > esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
> batch.size=8196?
> >
> > What is bootstrap.servers? Is it the Kafka server that I am running the
> > test on?
> >
> > Thanks.
> >
> > Yuheng
> >
> >
> >
> >
> > On Tue, Jul 14, 2015 at 12:18 AM, Ewen Cheslack-Postava <
> e...@confluent.io
> > >
> > wrote:
> >
> > > I implemented (nearly) the same basic set of tests in the system test
> > > framework we started at Confluent and that is going to move into Kafka
> --
> > > see the wip patch for KIP-25 here:
> > https://github.com/apache/kafka/pull/70
> > > In particular, that test is implemented in benchmark_test.py:
> > >
> > >
> >
> https://github.com/apache/kafka/pull/70/files#diff-ca984778cf9943407645eb6784f19dc8
> > >
> > > Hopefully once that's merged people can reuse that benchmark (and add
> to
> > > it!) so they can easily run the same benchmarks across different
> > hardware.
> > > Here are some results from an older version of that test on m3.2xlarge
> > > instances on EC2 using local ephemeral storage (I think... it's been
> > awhile
> > > since I ran these numbers and I didn't document methodology that
> > > carefully):
> > >
> > > INFO:_.KafkaBenchmark:=
> > > INFO:_.KafkaBenchmark:BENCHMARK RESULTS
> > > INFO:_.KafkaBenchmark:=
> > > INFO:_.KafkaBenchmark:Single producer, no replication: 684097.470208
> > > rec/sec (65.24 MB/s)
> > > INFO:_.KafkaBenchmark:Single producer, async 3x replication:
> > > 667494.359673 rec/sec (63.66 MB/s)
> > > INFO:_.KafkaBenchmark:Single producer, sync 3x replication:
> > > 116485.764275 rec/sec (11.11 MB/s)
> > > INFO:_.KafkaBenchmark:Three producers, async 3x replication:
> > > 1696519.022182 rec/sec (161.79 MB/s)
> > > INFO:_.KafkaBenchmark:Message size:
> > > INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
> > > INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
> > > INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
> > > INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s)
> > > INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s)
> > > INFO:_.KafkaBenchmark:Throughput over long run, data > memory:
> > > INFO:_.KafkaBenchmark: Time block 0: 684725.151324 rec/sec (65.30
> > MB/s)
> > > INFO:_.KafkaBenchmark:Single consumer: 701031.14 rec/sec (56.830500
> > > MB/s)
> > > INFO:_.KafkaBenchmark:Three consumers: 3304011.014900 rec/sec
> (267.830800
> > > MB/s)
> > > INFO:_.KafkaBenchmark:Producer + consumer:
> > > INFO:_.KafkaBenchmark: Producer: 624984.375391 rec/sec (59.60 MB/s)
> > > INFO:_.KafkaBenchmark: Consumer: 624984.375391 rec/sec (59.60 MB/s)
> > > INFO:_.KafkaBenchmark:End-to-end latency: median 2.00 ms, 99%
> > > 4.00 ms, 99.9% 19.00 ms
> > >
> > > Don't trust these numbers for anything, they were a quick one-off test.
> > I'm
> > > just pasting the output so you get some idea of what the results might
> > look
> > > like. Once we merge the KIP-25 patch, Confluent will be running the
> tests
> > > regularly and results will be available publicly so we'll be able to
> keep
> > > better tabs on performance, albeit for only a specific class of
> hardware.
> > >
> > > For the batch.size question -- I'm not sure the results in the blog
> post
> > > actually have different settings, it could be accidental divergence
> > between
> > > the script and the blog post. The post specifically notes that tuning
> the
> > > batch size in the synchronous case might help, but that he didn't do
> > that.
> > > If you're trying to benchmark the *optimal* throughput, tuning the
> batch
> > > size would make sense. Since synchronous replication will have higher
> > > latency and there's a limit to how many requests can be in flight at
> > once,
> > > you'll want a larger batch size to compensate for the additional
> latency.
> > > However, in practice the increase you see may be negligible. Somebody
> > > who has spent more time fiddling with tweaking producer performance
> > > may have more insight.

Re: kafka benchmark tests

2015-07-14 Thread Manikumar Reddy
Yes, a list of Kafka server host/port pairs to use for establishing the
initial connection to the Kafka cluster.

https://kafka.apache.org/documentation.html#newproducerconfigs

On Tue, Jul 14, 2015 at 7:29 PM, Yuheng Du  wrote:

> Does anyone know what bootstrap.servers=
> esv4-hcl198.grid.linkedin.com:9092 means in the following test command:
>
> bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> test7 5000 100 -1 acks=1 bootstrap.servers=
> esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196?
>
> What is bootstrap.servers? Is it the Kafka server that I am running the test
> on?
>
> Thanks.
>
> Yuheng
>
>
>
>
> On Tue, Jul 14, 2015 at 12:18 AM, Ewen Cheslack-Postava  >
> wrote:
>
> > I implemented (nearly) the same basic set of tests in the system test
> > framework we started at Confluent and that is going to move into Kafka --
> > see the wip patch for KIP-25 here:
> https://github.com/apache/kafka/pull/70
> > In particular, that test is implemented in benchmark_test.py:
> >
> >
> https://github.com/apache/kafka/pull/70/files#diff-ca984778cf9943407645eb6784f19dc8
> >
> > Hopefully once that's merged people can reuse that benchmark (and add to
> > it!) so they can easily run the same benchmarks across different
> hardware.
> > Here are some results from an older version of that test on m3.2xlarge
> > instances on EC2 using local ephemeral storage (I think... it's been
> awhile
> > since I ran these numbers and I didn't document methodology that
> > carefully):
> >
> > INFO:_.KafkaBenchmark:=
> > INFO:_.KafkaBenchmark:BENCHMARK RESULTS
> > INFO:_.KafkaBenchmark:=
> > INFO:_.KafkaBenchmark:Single producer, no replication: 684097.470208
> > rec/sec (65.24 MB/s)
> > INFO:_.KafkaBenchmark:Single producer, async 3x replication:
> > 667494.359673 rec/sec (63.66 MB/s)
> > INFO:_.KafkaBenchmark:Single producer, sync 3x replication:
> > 116485.764275 rec/sec (11.11 MB/s)
> > INFO:_.KafkaBenchmark:Three producers, async 3x replication:
> > 1696519.022182 rec/sec (161.79 MB/s)
> > INFO:_.KafkaBenchmark:Message size:
> > INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
> > INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
> > INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
> > INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s)
> > INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s)
> > INFO:_.KafkaBenchmark:Throughput over long run, data > memory:
> > INFO:_.KafkaBenchmark: Time block 0: 684725.151324 rec/sec (65.30
> MB/s)
> > INFO:_.KafkaBenchmark:Single consumer: 701031.14 rec/sec (56.830500
> > MB/s)
> > INFO:_.KafkaBenchmark:Three consumers: 3304011.014900 rec/sec (267.830800
> > MB/s)
> > INFO:_.KafkaBenchmark:Producer + consumer:
> > INFO:_.KafkaBenchmark: Producer: 624984.375391 rec/sec (59.60 MB/s)
> > INFO:_.KafkaBenchmark: Consumer: 624984.375391 rec/sec (59.60 MB/s)
> > INFO:_.KafkaBenchmark:End-to-end latency: median 2.00 ms, 99%
> > 4.00 ms, 99.9% 19.00 ms
> >
> > Don't trust these numbers for anything, they were a quick one-off test.
> I'm
> > just pasting the output so you get some idea of what the results might
> look
> > like. Once we merge the KIP-25 patch, Confluent will be running the tests
> > regularly and results will be available publicly so we'll be able to keep
> > better tabs on performance, albeit for only a specific class of hardware.
> >
> > For the batch.size question -- I'm not sure the results in the blog post
> > actually have different settings, it could be accidental divergence
> between
> > the script and the blog post. The post specifically notes that tuning the
> > batch size in the synchronous case might help, but that he didn't do
> that.
> > If you're trying to benchmark the *optimal* throughput, tuning the batch
> > size would make sense. Since synchronous replication will have higher
> > latency and there's a limit to how many requests can be in flight at
> once,
> > you'll want a larger batch size to compensate for the additional latency.
> > However, in practice the increase you see may be negligible. Somebody who
> > has spent more time fiddling with tweaking producer performance may have
> > more insight.
> >
> > -Ewen
> >
> > On Mon, Jul 13, 2015 at 10:08 AM, JIEFU GONG  wrote:
> >
> > > Hi all,
> > >
> > > I was wondering if any of you guys have done benchmarks on Kafka
> > > performance before, and if they or their details (# nodes in cluster, #
> > > records / size(s) of messages, etc.) could be shared.
> > >
> > > For comparison purposes, I am trying to benchmark Kafka against some
> > > similar services such as Kinesis or Scribe. Additionally, I was
> wondering
> > > if anyone could shed some insight on Jay Kreps' benchmarks that he has
> > > openly published here:
> > >
> > >
> >
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-milli

Re: kafka benchmark tests

2015-07-14 Thread Yuheng Du
Does anyone know what bootstrap.servers=
esv4-hcl198.grid.linkedin.com:9092 means in the following test command:

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
test7 5000 100 -1 acks=1 bootstrap.servers=
esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196?

What is bootstrap.servers? Is it the Kafka server that I am running the test
on?

Thanks.

Yuheng




On Tue, Jul 14, 2015 at 12:18 AM, Ewen Cheslack-Postava 
wrote:

> I implemented (nearly) the same basic set of tests in the system test
> framework we started at Confluent and that is going to move into Kafka --
> see the wip patch for KIP-25 here: https://github.com/apache/kafka/pull/70
> In particular, that test is implemented in benchmark_test.py:
>
> https://github.com/apache/kafka/pull/70/files#diff-ca984778cf9943407645eb6784f19dc8
>
> Hopefully once that's merged people can reuse that benchmark (and add to
> it!) so they can easily run the same benchmarks across different hardware.
> Here are some results from an older version of that test on m3.2xlarge
> instances on EC2 using local ephemeral storage (I think... it's been awhile
> since I ran these numbers and I didn't document methodology that
> carefully):
>
> INFO:_.KafkaBenchmark:=
> INFO:_.KafkaBenchmark:BENCHMARK RESULTS
> INFO:_.KafkaBenchmark:=
> INFO:_.KafkaBenchmark:Single producer, no replication: 684097.470208
> rec/sec (65.24 MB/s)
> INFO:_.KafkaBenchmark:Single producer, async 3x replication:
> 667494.359673 rec/sec (63.66 MB/s)
> INFO:_.KafkaBenchmark:Single producer, sync 3x replication:
> 116485.764275 rec/sec (11.11 MB/s)
> INFO:_.KafkaBenchmark:Three producers, async 3x replication:
> 1696519.022182 rec/sec (161.79 MB/s)
> INFO:_.KafkaBenchmark:Message size:
> INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
> INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
> INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
> INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s)
> INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s)
> INFO:_.KafkaBenchmark:Throughput over long run, data > memory:
> INFO:_.KafkaBenchmark: Time block 0: 684725.151324 rec/sec (65.30 MB/s)
> INFO:_.KafkaBenchmark:Single consumer: 701031.14 rec/sec (56.830500
> MB/s)
> INFO:_.KafkaBenchmark:Three consumers: 3304011.014900 rec/sec (267.830800
> MB/s)
> INFO:_.KafkaBenchmark:Producer + consumer:
> INFO:_.KafkaBenchmark: Producer: 624984.375391 rec/sec (59.60 MB/s)
> INFO:_.KafkaBenchmark: Consumer: 624984.375391 rec/sec (59.60 MB/s)
> INFO:_.KafkaBenchmark:End-to-end latency: median 2.00 ms, 99%
> 4.00 ms, 99.9% 19.00 ms
>
> Don't trust these numbers for anything, they were a quick one-off test. I'm
> just pasting the output so you get some idea of what the results might look
> like. Once we merge the KIP-25 patch, Confluent will be running the tests
> regularly and results will be available publicly so we'll be able to keep
> better tabs on performance, albeit for only a specific class of hardware.
>
> For the batch.size question -- I'm not sure the results in the blog post
> actually have different settings, it could be accidental divergence between
> the script and the blog post. The post specifically notes that tuning the
> batch size in the synchronous case might help, but that he didn't do that.
> If you're trying to benchmark the *optimal* throughput, tuning the batch
> size would make sense. Since synchronous replication will have higher
> latency and there's a limit to how many requests can be in flight at once,
> you'll want a larger batch size to compensate for the additional latency.
> However, in practice the increase you see may be negligible. Somebody who
> has spent more time fiddling with tweaking producer performance may have
> more insight.
>
> -Ewen
>
> On Mon, Jul 13, 2015 at 10:08 AM, JIEFU GONG  wrote:
>
> > Hi all,
> >
> > I was wondering if any of you guys have done benchmarks on Kafka
> > performance before, and if they or their details (# nodes in cluster, #
> > records / size(s) of messages, etc.) could be shared.
> >
> > For comparison purposes, I am trying to benchmark Kafka against some
> > similar services such as Kinesis or Scribe. Additionally, I was wondering
> > if anyone could shed some insight on Jay Kreps' benchmarks that he has
> > openly published here:
> >
> >
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> >
> > Specifically, I am unsure of why between his tests of 3x synchronous
> > replication and 3x async replication he changed the batch.size, as well
> as
> > why he is seemingly publishing to incorrect topics:
> >
> > Configs:
> > https://gist.github.com/jkreps/c7ddb4041ef62a900e6c
> >
> > Any help is greatly appreciated!
> >
> >
> >
> > --
> >
> > Jiefu Gong
> > University of California, Berkeley 

Re: Using Kafka as a persistent store

2015-07-14 Thread Shayne S
Thanks, I'm on 0.8.2 so that explains it.

Should retention.ms affect segment rolling? In my experiment it did (
retention.ms = -1), which was unexpected since I thought only segment.bytes
and segment.ms would control that.
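
For anyone else on 0.8.2: one workaround is to create the topic with a very
large retention.ms instead of -1 (which, per the JIRA below, only works from
0.8.3). A sketch using the 0.8.x AdminUtils API; the ZooKeeper address,
topic name, and partition/replica counts are placeholders:

import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;

public class KeepEverythingTopic {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("localhost:2181", 30000, 30000,
                ZKStringSerializer$.MODULE$);

        Properties config = new Properties();
        // effectively "retain forever": a huge time limit and no size limit
        config.put("retention.ms", String.valueOf(Long.MAX_VALUE));
        config.put("retention.bytes", "-1");

        AdminUtils.createTopic(zkClient, "my-events", 8, 3, config);
        zkClient.close();
    }
}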

On Mon, Jul 13, 2015 at 7:57 PM, Daniel Tamai 
wrote:

> Using -1 for log.retention.ms should work only for 0.8.3 (
> https://issues.apache.org/jira/browse/KAFKA-1990).
>
> 2015-07-13 17:08 GMT-03:00 Shayne S :
>
> > Did this work for you? I set the topic settings to retention.ms=-1 and
> > retention.bytes=-1 and it looks like it is deleting segments immediately.
> >
> > On Sun, Jul 12, 2015 at 8:02 AM, Daniel Schierbeck <
> > daniel.schierb...@gmail.com> wrote:
> >
> > >
> > > > On 10 Jul 2015, at 23.03, Jay Kreps  wrote:
> > > >
> > > > If I recall correctly, setting log.retention.ms and
> > log.retention.bytes
> > > to
> > > > -1 disables both.
> > >
> > > Thanks!
> > >
> > > >
> > > > On Fri, Jul 10, 2015 at 1:55 PM, Daniel Schierbeck <
> > > > daniel.schierb...@gmail.com> wrote:
> > > >
> > > >>
> > > >>> On 10 Jul 2015, at 15.16, Shayne S 
> wrote:
> > > >>>
> > > >>> There are two ways you can configure your topics, log compaction
> and
> > > with
> > > >>> no cleaning. The choice depends on your use case. Are the records
> > > >> uniquely
> > > >>> identifiable and will they receive updates? Then log compaction is
> > the
> > > >> way
> > > >>> to go. If they are truly read only, you can go without log
> > compaction.
> > > >>
> > > >> I'd rather be free to use the key for partitioning, and the records
> > are
> > > >> immutable — they're event records — so disabling compaction
> altogether
> > > >> would be preferable. How is that accomplished?
> > > >>>
> > > >>> We have small processes which consume a topic and perform upserts
> > > >>> to our various database engines. It's easy to change how it all
> > > >>> works and simply consume the single source of truth again.
> > > >>>
> > > >>> I've written a bit about log compaction here:
> > > >>>
> > >
> http://www.shayne.me/blog/2015/2015-06-25-everything-about-kafka-part-2/
> > > >>>
> > > >>> On Fri, Jul 10, 2015 at 3:46 AM, Daniel Schierbeck <
> > > >>> daniel.schierb...@gmail.com> wrote:
> > > >>>
> > >  I'd like to use Kafka as a persistent store – sort of as an
> > > alternative
> > > >> to
> > >  HDFS. The idea is that I'd load the data into various other
> systems
> > in
> > >  order to solve specific needs such as full-text search, analytics,
> > > >> indexing
> > >  by various attributes, etc. I'd like to keep a single source of
> > truth,
> > >  however.
> > > 
> > >  I'm struggling a bit to understand how I can configure a topic to
> > > retain
> > >  messages indefinitely. I want to make sure that my data isn't
> > deleted.
> > > >> Is
> > >  there a guide to configuring Kafka like this?
> > > >>
> > >
> >
>


Partition rebalancing does not work well for high level consumer

2015-07-14 Thread Stefan Miklosovic
Hi,

I filed this as a JIRA (1) but have received no response so far.

I tried it with 0.8.3 instead of 0.8.1.1 and the behaviour seems to be the same.

I think that the main problem is that I am instantiating consumers
very quickly one after another in a loop, and it seems that the
automatic rebalancing does not work properly in this case.

I tried to play with

partition.assignment.strategy (both roundrobin and range)
rebalance.max.retries
rebalance.backoff.ms

but none of these seems to work well.

My scenario:

5 producers, 1 topic with 10 partitions; on the consumer side I am
calling Consumers.createJavaConsumer() 10 times, each time with
topicMap("mytopic", 1).

The result is that messages are consumed by only 4 or 5 consumers
instead of 10. I managed to get 7 consumers once, and they were
assigned 3, 2, 1, 1, 1, 1, 1 partitions respectively.

So, messages are indeed fetched from all partitions and they are
processed correctly, but I fail to understand why automatic rebalancing
does not assign partitions to the remaining free consumers when some
consumer has more than 1 partition assigned.

I really like the idea of automatic rebalancing, but I want to commit
offsets per partition, and this is only achieved by having 1 consumer
thread per consumer connector, each thread holding 1 partition.

I would even be satisfied with committing to more than 1 partition in
case some consumer fails and its partitions are migrated to another
consumer; however, I cannot even get all 10 consumers busy in the
first place.

Why are partitions not spread evenly across all consumers?

(1) https://issues.apache.org/jira/browse/KAFKA-2331

-- 
Stefan Miklosovic
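
One variation that may be worth trying (a sketch, not a confirmed fix):
create a single consumer connector and ask it for all 10 streams at once, so
the group goes through one rebalance instead of ten. The trade-off, as noted
above, is that commitOffsets() then commits for every partition the
connector owns rather than per partition. The group and ZooKeeper address
are placeholders:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TenStreamsOneConnector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "my-group");                // placeholder

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // one connector, ten streams: the group rebalances once
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(
                Collections.singletonMap("mytopic", 10));

        for (final KafkaStream<byte[], byte[]> stream : streams.get("mytopic")) {
            new Thread(new Runnable() {
                public void run() {
                    // consume stream.iterator() here, one thread per stream
                }
            }).start();
        }
    }
}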


Re: Kafka as an event store for Event Sourcing

2015-07-14 Thread Daniel Schierbeck
Great! I'd love to see this move forward, especially if the design allows
for per-key conditionals sometime in the future – doesn't have to be in the
first iteration.
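
To make the per-key idea concrete, here is a purely hypothetical sketch of
what a conditional produce API might look like from the client side. Nothing
like this exists in Kafka today and every name here is invented; it simply
mirrors the check-offset-before-publish / If-Match semantics discussed in
this thread:

// Hypothetical API sketch only -- not a real Kafka interface.
public interface ConditionalProducer<K, V> {

    // Attempt to append (key, value). The broker would reject the write if
    // the latest offset recorded for `key` in the partition is not
    // `expectedOffset`, analogous to HTTP's If-Match header.
    // Returns the offset of the newly appended record on success.
    long sendIfMatch(String topic, K key, V value, long expectedOffset)
            throws OffsetMismatchException;

    // Thrown when another writer got there first and the expected offset
    // no longer matches.
    class OffsetMismatchException extends Exception {}
}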

On Tue, Jul 14, 2015 at 5:26 AM Ben Kirwin  wrote:

> Ah, just saw this. I actually just submitted a patch this evening --
> just for the partitionwide version at the moment, since it turns out
> to be pretty simple to implement. Still very interested in moving
> forward with this stuff, though not always as much time as I would
> like...
>
> On Thu, Jul 9, 2015 at 9:39 AM, Daniel Schierbeck
>  wrote:
> > Ben, are you still interested in working on this?
> >
> > On Mon, Jun 15, 2015 at 9:49 AM Daniel Schierbeck <
> > daniel.schierb...@gmail.com> wrote:
> >
> >> I like to refer to it as "conditional write" or "conditional request",
> >> semantically similar to HTTP's If-Match header.
> >>
> >> Ben: I'm adding a comment about per-key checking to your JIRA.
> >>
> >> On Mon, Jun 15, 2015 at 4:06 AM Ben Kirwin  wrote:
> >>
> >>> Yeah, it's definitely not a standard CAS, but it feels like the right
> >>> fit for the commit log abstraction -- CAS on a 'current value' does
> >>> seem a bit too key-value-store-ish for Kafka to support natively.
> >>>
> >>> I tried to avoid referring to the check-offset-before-publish
> >>> functionality as a CAS in the ticket because, while they're both types
> >>> of 'optimistic concurrency control', they are a bit different -- and
> >>> the offset check is both easier to implement and handier for the stuff
> >>> I tend to work on. (Though that ticket's about checking the latest
> >>> offset on a whole partition, not the key -- there's a different set of
> >>> tradeoffs for the latter, and I haven't thought it through properly
> >>> yet.)
> >>>
> >>> On Sat, Jun 13, 2015 at 3:35 PM, Ewen Cheslack-Postava
> >>>  wrote:
> >>> > If you do CAS where you compare the offset of the current record for
> the
> >>> > key, then yes. This might work fine for applications that track key,
> >>> value,
> >>> > and offset. It is not quite the same as doing a normal CAS.
> >>> >
> >>> > On Sat, Jun 13, 2015 at 12:07 PM, Daniel Schierbeck <
> >>> > daniel.schierb...@gmail.com> wrote:
> >>> >
> >>> >> But wouldn't the key->offset table be enough to accept or reject a
> >>> write?
> >>> >> I'm not familiar with the exact implementation of Kafka, so I may be
> >>> wrong.
> >>> >>
> >>> >> On Sat, 13 Jun 2015 at 21.05 Ewen Cheslack-Postava <
> >>> e...@confluent.io>
> >>> >> wrote:
> >>> >>
> >>> >> > Daniel: By random read, I meant not reading the data sequentially
> as
> >>> is
> >>> >> the
> >>> >> > norm in Kafka, not necessarily a random disk seek. That in-memory
> >>> data
> >>> >> > structure is what enables the random read. You're either going to
> >>> need
> >>> >> the
> >>> >> > disk seek if the data isn't in the fs cache or you're trading
> memory
> >>> to
> >>> >> > avoid it. If it's a full index containing keys and values then
> you're
> >>> >> > potentially committing to a much larger JVM memory footprint (and
> >>> all the
> >>> >> > GC issues that come with it) since you'd be storing that data in
> the
> >>> JVM
> >>> >> > heap. If you're only storing the keys + offset info, then you
> >>> potentially
> >>> >> > introduce random disk seeks on any CAS operation (and making page
> >>> caching
> >>> >> > harder for the OS, etc.).
> >>> >> >
> >>> >> >
> >>> >> > On Sat, Jun 13, 2015 at 11:33 AM, Daniel Schierbeck <
> >>> >> > daniel.schierb...@gmail.com> wrote:
> >>> >> >
> >>> >> > > Ewen: would single-key CAS necessitate random reads? My idea
> was to
> >>> >> have
> >>> >> > > the broker maintain an in-memory table that could be rebuilt
> from
> >>> the
> >>> >> log
> >>> >> > > or a snapshot.
> >>> >> > > On Sat, 13 Jun 2015 at 20.26 Ewen Cheslack-Postava <
> >>> >> e...@confluent.io>
> >>> >> > > wrote:
> >>> >> > >
> >>> >> > > > Jay - I think you need broker support if you want CAS to work
> >>> with
> >>> >> > > > compacted topics. With the approach you described you can't
> turn
> >>> on
> >>> >> > > > compaction since that would make it last-writer-wins, and
> using
> >>> any
> >>> >> > > > non-infinite retention policy would require some external
> >>> process to
> >>> >> > > > monitor keys that might expire and refresh them by rewriting
> the
> >>> >> data.
> >>> >> > > >
> >>> >> > > > That said, I think any addition like this warrants a lot of
> >>> >> discussion
> >>> >> > > > about potential use cases since there are a lot of ways you
> >>> could go
> >>> >> > > adding
> >>> >> > > > support for something like this. I think this is an obvious
> next
> >>> >> > > > incremental step, but someone is bound to have a use case that
> >>> would
> >>> >> > > > require multi-key CAS and would be costly to build atop single
> >>> key
> >>> >> CAS.
> >>> >> > > Or,
> >>> >> > > > since the compare requires a random read anyway, why not
> throw in
> >>> >> > > > read-by-key rather than sequential log reads, which wo