Adding replicas to an existing topic causes data loss in some partitions

2014-11-06 Thread Shangan Chen
I have a kafka cluster in which every topic has only one replica. Recently I
increased the replication factor of every topic to 2. Most topics work fine, but
some large topics have problems with part of their partitions: consumers throw an
OffsetOutOfRange exception, because the offset the consumer requests is bigger
than the latest offset. I suspect there is a bug in the tool that adds replicas
to an existing topic.

I added the replicas by following this guide:
http://kafka.apache.org/081/documentation.html#basic_ops_increase_replication_factor


Has anyone faced the same problem before, and can you suggest how to avoid
this?

-- 
have a good day!
chenshang'an


Re: Announcing Confluent

2014-11-06 Thread pankaj ojha
Best of Luck..keep rocking...

On Fri, Nov 7, 2014 at 1:30 AM, Joe Brown  wrote:

> Best of luck!!!
>
> J
>
> On 6 Nov 2014, at 18:28, Jay Kreps  wrote:
>
> > Hey all,
> >
> > I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
> > company around Kafka called Confluent. We are planning on productizing
> the
> > kind of Kafka-based real-time data platform we built out at LinkedIn. We
> > are doing this because we think this is a really powerful idea and we
> felt
> > there was a lot to do to make this idea really take root. We wanted to
> make
> > that our full time mission and focus.
> >
> > There is a blog post that goes into a little more depth here:
> > http://blog.confluent.io/
> >
> > LinkedIn will remain a heavy Kafka user and contributor. Combined with
> our
> > additional resources from the funding of the company this should be a
> > really good thing for the Kafka development effort. Especially when
> > combined with the increasing contributions from the rest of the
> development
> > community. This is great news, as there is a lot of work to do. We'll
> need
> > to really focus on scaling this distributed development in a healthy way.
> >
> > One thing I do want to emphasize is that the addition of a company in the
> > Kafka ecosystem won’t mean meddling with open source. Kafka will remain
> > 100% open source and community focused, as of course is true of any
> Apache
> > project. I have been doing open source for a long time and strongly
> believe
> > it is the right model for infrastructure software development.
> >
> > Confluent is just getting off the ground now. We left LinkedIn, raised
> some
> > money, and we have an office (but no furniture yet!). Nonetheless, if
> you
> > are interested in finding out more about the company and either getting
> > help with your Kafka usage or joining us to help build all this, by all
> > means reach out to us, we’d love to talk.
> >
> > Wish us luck!
> >
> > -Jay
>
>


-- 
Thanks,
Pankaj Ojha


Re: Cannot connect to Kafka from outside of EC2

2014-11-06 Thread Guozhang Wang
Sameer,

The server logs do not contain any non-INFO entries, which is a bit weird. Did
you deploy the current trunk of Kafka? Also could you enable DEBUG level
logging on Kafka brokers?

Guozhang

On Wed, Nov 5, 2014 at 3:50 PM, Sameer Yami  wrote:

> The server.log was taken separately.
> We ran the test again and the server and producer logs are below (to get
> same timings).
>
>
> Thanks!
>
>
> 
>
>
>
> Producer Logs -
>
>
> 2014-11-05 23:38:58,693
> Thread-3-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
> DEBUG org.apache.zookeeper.ClientCnxn-759: Got ping response for sessionid:
> 0x1498251e8680002 after 0ms
> 2014-11-05 23:39:00,695
> Thread-3-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
> DEBUG org.apache.zookeeper.ClientCnxn-759: Got ping response for sessionid:
> 0x1498251e8680002 after 0ms
> 2014-11-05 23:39:02,696
> Thread-3-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
> DEBUG org.apache.zookeeper.ClientCnxn-759: Got ping response for sessionid:
> 0x1498251e8680002 after 0ms
> 2014-11-05 23:39:02,828 pool-13-thread-2   INFO
> kafka.utils.VerifiableProperties-68: Verifying properties
> 2014-11-05 23:39:02,829 pool-13-thread-2   INFO
> kafka.utils.VerifiableProperties-68: Property auto.commit.interval.ms is
> overridden to 1000
> 2014-11-05 23:39:02,829 pool-13-thread-2   INFO
> kafka.utils.VerifiableProperties-68: Property auto.offset.reset is
> overridden to smallest
> 2014-11-05 23:39:02,829 pool-13-thread-2   INFO
> kafka.utils.VerifiableProperties-68: Property consumer.timeout.ms is
> overridden to 10
> 2014-11-05 23:39:02,829 pool-13-thread-2   INFO
> kafka.utils.VerifiableProperties-68: Property group.id is overridden to
> TestCheck
> 2014-11-05 23:39:02,830 pool-13-thread-2   WARN
> kafka.utils.VerifiableProperties-83: Property serializer.class is not valid
> 2014-11-05 23:39:02,830 pool-13-thread-2   INFO
> kafka.utils.VerifiableProperties-68: Property zookeeper.connect is
> overridden to 172.31.25.198:2181
> 2014-11-05 23:39:02,831 pool-13-thread-2   INFO
> kafka.consumer.ZookeeperConsumerConnector-68:
> [TestCheck_ip-172-31-25-198-1415230742830-f3dfc362], Connecting to
> zookeeper instance at 172.31.25.198:2181
> 2014-11-05 23:39:02,831 pool-13-thread-2  DEBUG
> org.I0Itec.zkclient.ZkConnection-63: Creating new ZookKeeper instance to
> connect to 172.31.25.198:2181.
> 2014-11-05 23:39:02,831 pool-13-thread-2   INFO
> org.apache.zookeeper.ZooKeeper-379: Initiating client connection,
> connectString=172.31.25.198:2181 sessionTimeout=6000
> watcher=org.I0Itec.zkclient.ZkClient@3903b165
> 2014-11-05 23:39:02,831 ZkClient-EventThread-29-172.31.25.198:2181   INFO
> org.I0Itec.zkclient.ZkEventThread-64: Starting ZkClient event thread.
> 2014-11-05 23:39:02,831 pool-13-thread-1   INFO
> kafka.utils.VerifiableProperties-68: Verifying properties
> 2014-11-05 23:39:02,836 pool-13-thread-2-SendThread()   INFO
> org.apache.zookeeper.ClientCnxn-1061: Opening socket connection to server /
> 172.31.25.198:2181
> 2014-11-05 23:39:02,836 pool-13-thread-1   WARN
> kafka.utils.VerifiableProperties-83: Property batch.size is not valid
> 2014-11-05 23:39:02,832 pool-13-thread-2  DEBUG
> org.I0Itec.zkclient.ZkClient-878: Awaiting connection to Zookeeper server
> 2014-11-05 23:39:02,836 pool-13-thread-1   INFO
> kafka.utils.VerifiableProperties-68: Property message.send.max.retries is
> overridden to 10
> 2014-11-05 23:39:02,836 pool-13-thread-2  DEBUG
> org.I0Itec.zkclient.ZkClient-628: Waiting for keeper state SyncConnected
> 2014-11-05 23:39:02,837 pool-13-thread-1   INFO
> kafka.utils.VerifiableProperties-68: Property metadata.broker.list is
> overridden to 172.31.25.198:9092
> 2014-11-05 23:39:02,837 pool-13-thread-1   INFO
> kafka.utils.VerifiableProperties-68: Property retry.backoff.ms is
> overridden to 1000
> 2014-11-05 23:39:02,837 pool-13-thread-1   INFO
> kafka.utils.VerifiableProperties-68: Property serializer.class is
> overridden to kafka.serializer.StringEncoder
> 2014-11-05 23:39:02,837
>
> pool-13-thread-2-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
> INFO org.apache.zookeeper.ClientCnxn-950: Socket connection established to
> ip-172-31-25-198.us-west-1.compute.internal/172.31.25.198:2181, initiating
> session
> 2014-11-05 23:39:02,838
>
> pool-13-thread-2-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
> DEBUG org.apache.zookeeper.ClientCnxn-999: Session establishment request
> sent on ip-172-31-25-198.us-west-1.compute.internal/172.31.25.198:2181
> 2014-11-05 23:39:02,837 pool-13-thread-1   WARN
> kafka.utils.VerifiableProperties-83: Property zk.connectiontimeout.ms is
> not valid
> 2014-11-05 23:39:02,841
>
> pool-13-thread-2-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
> INFO org.apache.zookeeper.ClientCnxn-739: Session establishment complete on
> server ip-172-31-25-198.us-west-1.compute.internal/172.31.25.198:2181,

Re: consumer ack for high-level consumer?

2014-11-06 Thread Guozhang Wang
0. Yes, if consumer crashed before commit its offset it can cause
duplicates.

1. Yes. From the consumer client's point of view, once a message is returned
from the iterator it is considered "consumed". If you want the consumer to treat
a message as consumed only after it has been processed by the application on top
of it, you need to turn off auto offset commit and commit manually.
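
A minimal sketch of that pattern with the 0.8 Scala high-level consumer (the
topic name, group id and process() step are illustrative, not from this thread):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

// Hypothetical application logic standing in for "processing" a message.
def process(bytes: Array[Byte]): Unit = println(new String(bytes, "UTF-8"))

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")
props.put("group.id", "my-group")
props.put("auto.commit.enable", "false")   // disable periodic auto-commit
val connector = Consumer.create(new ConsumerConfig(props))

val stream = connector.createMessageStreams(Map("my-topic" -> 1))("my-topic").head
stream.foreach { msgAndMeta =>
  process(msgAndMeta.message())            // only after this succeeds...
  connector.commitOffsets                  // ...is the offset committed
  // (as noted in the thread, committing every N messages is cheaper than every one)
}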

On Thu, Nov 6, 2014 at 6:25 PM, Chia-Chun Shih 
wrote:

> Hi,
>
> Thanks for your response. Therefore, offsets in ZK may be out-of-date. It
> is possible to deliver duplicated messages when clients restart.
>
> I also wonder the possibilities of losing message. Is it possible that
> things occur in this order?
>
>1. Client calls ConsumerIterator$next() to get a message, update local
>offsets
>2. ZookeeperConsumerConnector$commitOffset() is called, local offsets
>sync to ZK
>3. Client fails when processing this message
>4. Client restarts, but this message is marked as consumed in ZK
>
> Thanks,
> Chia-Chun
>
> 2014-11-07 1:45 GMT+08:00 Guozhang Wang :
>
> > That is correct.
> >
> > Guozhang
> >
> > On Wed, Nov 5, 2014 at 9:18 PM, Chia-Chun Shih 
> > wrote:
> >
> > > Hi,
> > >
> > > Thanks for your response. I just read source code and found that:
> > >
> > >   1) ConsumerIterator$next() use PartitionTopicInfo$resetConsumeOffset
> to
> > > update offsets in PartitionTopicInfo objects.
> > >   2) ZookeeperConsumerConnector$commitOffset() gets latest offsets from
> > > PartitionTopicInfo objects, and update offsets to ZK.
> > >
> > > So, when clients iterate through messages, offsets are updated locally
> > > in PartitionTopicInfo
> > > objects. When ZookeeperConsumerConnector$commitOffset is called, local
> > > offsets are sync to ZK. Is it correct?
> > >
> > > regards,
> > > Chia-Chun
> > >
> > > 2014-11-06 0:24 GMT+08:00 Guozhang Wang :
> > >
> > > > Hello,
> > > >
> > > > You can turn off auto.commit.offset and manually call
> > > > connector.commitOffset() after you have processed the data.
> > One
> > > > thing to remember is that the commit frequency is related to ZK (in
> the
> > > > future, Kafka) writes and hence you may not want to commit after
> > > processed
> > > > every single message but only a batch of messages.
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Nov 4, 2014 at 10:42 PM, Chia-Chun Shih <
> > chiachun.s...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I am a new to Kafka. In my understanding, high-level consumer (
> > > > > ZookeeperConsumerConnector) changes offset when message is drawn
> > > > > by ConsumerIterator. But I would like to change offset when message
> > is
> > > > > processed, not when message is drawn from broker. So if a consumer
> > dies
> > > > > before a message is completely processed, the message will be
> > processed
> > > > > again. Is it possible?
> > > > >
> > > > > Thanks.
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: corrupt recovery checkpoint file issue....

2014-11-06 Thread Jason Rosenberg
filed: https://issues.apache.org/jira/browse/KAFKA-1758

On Thu, Nov 6, 2014 at 11:50 PM, Jason Rosenberg  wrote:

> I'm still not sure what caused the reboot of the system (but yes it
> appears to have crashed hard).  The file system is xfs, on CentOs linux.
> I'm not yet sure, but I think also before the crash, the system might have
> become wedged.
>
> It appears the corrupt recovery files actually contained all zero bytes,
> after looking at it with odb.
>
> I'll file a Jira.
>
> On Thu, Nov 6, 2014 at 7:09 PM, Jun Rao  wrote:
>
>> I am also wondering how the corruption happened. The way that we update
>> the
>> OffsetCheckpoint file is to first write to a tmp file and flush the data.
>> We then rename the tmp file to the final file. This is done to prevent
>> corruption caused by a crash in the middle of the writes. In your case,
>> was
>> the host crashed? What kind of storage system are you using? Is there any
>> non-volatile cache on the storage system?
>>
>> Thanks,
>>
>> Jun
>>
>> On Thu, Nov 6, 2014 at 6:31 AM, Jason Rosenberg  wrote:
>>
>> > Hi,
>> >
>> > We recently had a kafka node go down suddenly. When it came back up, it
>> > apparently had a corrupt recovery file, and refused to startup:
>> >
>> > 2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error
>> > starting up KafkaServer
>> > java.lang.NumberFormatException: For input string:
>> >
>> >
>> "^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
>> >
>> >
>> ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@"
>> > at
>> >
>> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>> > at java.lang.Integer.parseInt(Integer.java:481)
>> > at java.lang.Integer.parseInt(Integer.java:527)
>> > at
>> > scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
>> > at
>> scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
>> > at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
>> > at
>> > kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
>> > at
>> > kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
>> > at
>> >
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> > at
>> > scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>> > at kafka.log.LogManager.loadLogs(LogManager.scala:105)
>> > at kafka.log.LogManager.(LogManager.scala:57)
>> > at
>> kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
>> > at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
>> >
>> > And since the app is under a monitor (so it was repeatedly restarting
>> and
>> > failing with this error for several minutes before we got to it)…
>> >
>> > We moved the ‘recovery-point-offset-checkpoint’ file out of the way,
>> and it
>> > then restarted cleanly (but of course re-synced all it’s data from
>> > replicas, so we had no data loss).
>> >
>> > Anyway, I’m wondering if that’s the expected behavior? Or should it not
>> > declare it corrupt and then proceed automatically to an unclean restart?
>> >
>> > Should this NumberFormatException be handled a bit more gracefully?
>> >
>> > We saved the corrupt file if it’s worth inspecting (although I doubt it
>> > will be useful!)….
>> >
>> > Jason
>> > ​
>> >
>>
>
>


Re: corrupt recovery checkpoint file issue....

2014-11-06 Thread Jason Rosenberg
I'm still not sure what caused the reboot of the system (but yes it appears
to have crashed hard).  The file system is xfs, on CentOs linux.  I'm not
yet sure, but I think also before the crash, the system might have become
wedged.

It appears the corrupt recovery files actually contained all zero bytes,
after looking at it with odb.

I'll file a Jira.

On Thu, Nov 6, 2014 at 7:09 PM, Jun Rao  wrote:

> I am also wondering how the corruption happened. The way that we update the
> OffsetCheckpoint file is to first write to a tmp file and flush the data.
> We then rename the tmp file to the final file. This is done to prevent
> corruption caused by a crash in the middle of the writes. In your case, was
> the host crashed? What kind of storage system are you using? Is there any
> non-volatile cache on the storage system?
>
> Thanks,
>
> Jun
>
> On Thu, Nov 6, 2014 at 6:31 AM, Jason Rosenberg  wrote:
>
> > Hi,
> >
> > We recently had a kafka node go down suddenly. When it came back up, it
> > apparently had a corrupt recovery file, and refused to startup:
> >
> > 2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error
> > starting up KafkaServer
> > java.lang.NumberFormatException: For input string:
> >
> >
> "^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
> >
> >
> ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@"
> > at
> >
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> > at java.lang.Integer.parseInt(Integer.java:481)
> > at java.lang.Integer.parseInt(Integer.java:527)
> > at
> > scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
> > at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> > at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
> > at
> > kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
> > at
> > kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
> > at
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> > at
> > scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> > at kafka.log.LogManager.loadLogs(LogManager.scala:105)
> > at kafka.log.LogManager.(LogManager.scala:57)
> > at
> kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
> > at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
> >
> > And since the app is under a monitor (so it was repeatedly restarting and
> > failing with this error for several minutes before we got to it)…
> >
> > We moved the ‘recovery-point-offset-checkpoint’ file out of the way, and
> it
> > then restarted cleanly (but of course re-synced all it’s data from
> > replicas, so we had no data loss).
> >
> > Anyway, I’m wondering if that’s the expected behavior? Or should it not
> > declare it corrupt and then proceed automatically to an unclean restart?
> >
> > Should this NumberFormatException be handled a bit more gracefully?
> >
> > We saved the corrupt file if it’s worth inspecting (although I doubt it
> > will be useful!)….
> >
> > Jason
> > ​
> >
>


Re: nulls found in topic, created by recovery?

2014-11-06 Thread Neil Harkins
The topic is not compressed. The consumer used our fork of the python lib,
which I had to modify to get over the nulls.

-neil


On Thu, Nov 6, 2014 at 2:16 PM, Neha Narkhede  wrote:
> IIRC, the bug that introduced the nulls was related to compressed data. Is
> this topic compressed? Did you try to run a consumer through the topic's
> data or alternately the DumpLogSegments tool?
>
> On Thu, Nov 6, 2014 at 12:56 PM, Neil Harkins  wrote:
>>
>> Hi all. I saw something weird yesterday on our "leaf" instances
>> which run kafka 0.7.2 (and mirror to kafka 0.8 via our custom code).
>> I fully realize everyone's instinctual response is "upgrade, already.",
>> but I'd like to have an internals discussion to better understand
>> what happened, as I suspect it's still relevant in 0.8.
>>
>> Basically, in one of our topics there was an 8k stretch of nulls.
>> Correlating timestamps from the messages bracketing the nulls
>> to the kafka log, I see that the server restarted during that time,
>> and here are the recovery lines related to the topic with the nulls:
>>
>> [2014-11-04 00:48:07,602] INFO zookeeper state changed (SyncConnected)
>> (org.I0Itec.zkclient.ZkClient)
>> [2014-11-04 01:00:35,806] INFO Shutting down Kafka server
>> (kafka.server.KafkaServer)
>> [2014-11-04 01:00:35,813] INFO shutdown scheduler kafka-logcleaner-
>> (kafka.utils.KafkaScheduler)
>> [2014-11-04 01:01:38,411] INFO Starting Kafka server...
>> (kafka.server.KafkaServer)
>> ...
>> [2014-11-04 01:01:49,146] INFO Loading log 'foo.bar-0'
>> (kafka.log.LogManager)
>> [2014-11-04 01:01:49,147] INFO Loading the last segment
>> /var/kafka-leaf-spool/foo.bar-0/002684355423.kafka in mutable
>> mode, recovery true (kafka.log.Log)
>> [2014-11-04 01:01:55,877] INFO recover high water mark:414004449
>> (kafka.message.FileMessageSet)
>> [2014-11-04 01:01:55,877] INFO Recovery succeeded in 6 seconds. 0
>> bytes truncated. (kafka.message.FileMessageSet)
>>
>> The only hypothesis I can come up with is that the shutdown
>> ("graceful"?) did not wait for all messages to flush to disk
>> (we're configured with: log.flush.interval=1,
>> log.default.flush.interval.ms=500, and
>> log.default.flush.scheduler.interval.ms=500),
>> but the max offset was recorded, so that when it came back up,
>> it filled the gap with nulls to reach the valid max offset in case
>> any consumers were at the end.
>>
>> But for consumers with a position prior to all the nulls,
>> are they guaranteed to get back "on the rails" so-to-speak?
>> Nulls appear as v0(i.e. "magic") messages of 0 length,
>> but the messages replaced could be variable length.
>>
>> Thanks in advance for any input,
>> -neil
>
>


Re: corrupt message

2014-11-06 Thread 马哲超
I've got almost the same error, but also haven't figured out the reason.

2014-11-07 9:05 GMT+08:00 Neha Narkhede :

> This may be due to a bug in the client. Non-java Kafka clients are
> maintained by the individual client owners. You might want to ping the
> owner of your library directly.
>
> On Mon, Nov 3, 2014 at 7:21 AM, Fredrik S Loekke 
> wrote:
>
> >  When running a C# producer against a kafka 0.8.1.1 server running on a
> > virtual linux (virtualbox, Ubuntu) I keep getting the following error:
> >
> >
> >
> > [2014-11-03 15:19:08,595] ERROR [KafkaApi-0] Error processing
> > ProducerRequest with correlation id 601 from client Kafka-Net on
> partition
> > [x,0] (kafka.server.KafkaApis)
> >
> > kafka.message.InvalidMessageException: Message is corrupt (stored crc =
> > 1767811542, computed crc = 1256103753)
> >
> > at kafka.message.Message.ensureValid(Message.scala:166)
> >
> > at
> >
> kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:330)
> >
> > at
> >
> kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:318)
> >
> > at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >
> > at
> kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> >
> > at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:318)
> >
> > at kafka.log.Log.append(Log.scala:231)
> >
> > at
> > kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
> >
> > at
> >
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
> >
> > at
> >
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
> >
> > at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >
> > at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >
> > at
> >
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> >
> > at
> >
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> >
> > at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >
> > at
> > scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> >
> > at
> >
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> >
> > at
> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> >
> > at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> >
> > at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> >
> > at scala.collection.mutable.HashMap.map(HashMap.scala:45)
> >
> > at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
> >
> > at
> > kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
> >
> > at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
> >
> > at
> > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> >
> > at java.lang.Thread.run(Thread.java:745)
> >
> >
> >
> > Any suggestion on how to resolve this issue?
> >
> >
> >
> > Best regards / Med venlig hilsen
> >
> >
> >
> > *Fredrik Skeel Løkke*
> >
> > Software Developer ǀ IT & Analysis
> >
> >
> >
> > Mob.: +45 3176 8438
> >
> > f...@lindcapital.com
> >
> >
> >
> >
> >
> >
> > Lind Capital A/S
> >
> > Værkmestergade 3, 2
> >
> > DK-8000 Aarhus C
> >
> > www.lindcapital.com
> >
> > Follow us on
> >
> >
> >
> >
>


Re: consumer ack for high-level consumer?

2014-11-06 Thread Chia-Chun Shih
Hi,

Thanks for your response. Therefore, offsets in ZK may be out-of-date. It
is possible to deliver duplicated messages when clients restart.

I also wonder the possibilities of losing message. Is it possible that
things occur in this order?

   1. Client calls ConsumerIterator$next() to get a message, update local
   offsets
   2. ZookeeperConsumerConnector$commitOffset() is called, local offsets
   sync to ZK
   3. Client fails when processing this message
   4. Client restarts, but this message is marked as consumed in ZK

Thanks,
Chia-Chun

2014-11-07 1:45 GMT+08:00 Guozhang Wang :

> That is correct.
>
> Guozhang
>
> On Wed, Nov 5, 2014 at 9:18 PM, Chia-Chun Shih 
> wrote:
>
> > Hi,
> >
> > Thanks for your response. I just read source code and found that:
> >
> >   1) ConsumerIterator$next() use PartitionTopicInfo$resetConsumeOffset to
> > update offsets in PartitionTopicInfo objects.
> >   2) ZookeeperConsumerConnector$commitOffset() gets latest offsets from
> > PartitionTopicInfo objects, and update offsets to ZK.
> >
> > So, when clients iterate through messages, offsets are updated locally
> > in PartitionTopicInfo
> > objects. When ZookeeperConsumerConnector$commitOffset is called, local
> > offsets are sync to ZK. Is it correct?
> >
> > regards,
> > Chia-Chun
> >
> > 2014-11-06 0:24 GMT+08:00 Guozhang Wang :
> >
> > > Hello,
> > >
> > > You can turn off auto.commit.offset and manually call
> > > connector.commitOffset() after you have processed the data.
> One
> > > thing to remember is that the commit frequency is related to ZK (in the
> > > future, Kafka) writes and hence you may not want to commit after
> > processed
> > > every single message but only a batch of messages.
> > >
> > > Guozhang
> > >
> > > On Tue, Nov 4, 2014 at 10:42 PM, Chia-Chun Shih <
> chiachun.s...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I am a new to Kafka. In my understanding, high-level consumer (
> > > > ZookeeperConsumerConnector) changes offset when message is drawn
> > > > by ConsumerIterator. But I would like to change offset when message
> is
> > > > processed, not when message is drawn from broker. So if a consumer
> dies
> > > > before a message is completely processed, the message will be
> processed
> > > > again. Is it possible?
> > > >
> > > > Thanks.
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


OffsetOutOfRange errors

2014-11-06 Thread Jimmy John
Hello,

  I understand what this error means, just not sure why I keep running into
it after 24-48 hrs of running fine consuming > 300 messages / second.

  What happens when a kafka log rolls over and some old records are aged
out? I mean what happens to the offsets? We are using a python client which
stores the offsets in ZK. But in the middle of the run, say after 2 days or
so, suddenly it gets this error.

The only possibility is that the older records have aged off and ZK still
has an offset which is no longer applicable... How does the java client
deal with this? Does kafka inform ZK that records have been aged off and
update the offset or something?
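
For context (not from this thread): the 0.8 high-level Java/Scala consumer covers
this case with the auto.offset.reset setting -- when the offset stored in ZK
falls outside the broker's retained log segments, the consumer resets itself to
the earliest ("smallest") or latest ("largest") available offset rather than
failing. A sketch with illustrative values:

import java.util.Properties
import kafka.consumer.ConsumerConfig

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")
props.put("group.id", "activity-stream-reader")   // illustrative group id
// Jump back to the oldest retained offset when an OffsetOutOfRange error is hit;
// use "largest" to skip ahead to the newest data instead.
props.put("auto.offset.reset", "smallest")
val config = new ConsumerConfig(props)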

Here is the error i see in the broker logs

[2014-11-07 01:40:32,478] ERROR [KafkaApi-11] Error when processing fetch
request for partition [activity.stream,3] offset 8013827 from consumer  with
correlation id 73 (kafka.server.KafkaApis)

 kafka.common.OffsetOutOfRangeException: Request for offset 8013827 but we
only have log segments in the range 8603331 to 11279773.

 at kafka.log.Log.read(Log.scala:380)

 at
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)

 at
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)

 at
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)

 at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)

 at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)

 at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)

 at scala.collection.immutable.Map$Map3.map(Map.scala:144)

 at
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)

 at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)

 at kafka.server.KafkaApis.handle(KafkaApis.scala:186)

 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)

 at java.lang.Thread.run(Thread.java:745)


thx

Jim


Re: corrupt message

2014-11-06 Thread Neha Narkhede
This may be due to a bug in the client. Non-java Kafka clients are
maintained by the individual client owners. You might want to ping the
owner of your library directly.

On Mon, Nov 3, 2014 at 7:21 AM, Fredrik S Loekke 
wrote:

>  When running a C# producer against a kafka 0.8.1.1 server running on a
> virtual linux (virtualbox, Ubuntu) I keep getting the following error:
>
>
>
> [2014-11-03 15:19:08,595] ERROR [KafkaApi-0] Error processing
> ProducerRequest with correlation id 601 from client Kafka-Net on partition
> [x,0] (kafka.server.KafkaApis)
>
> kafka.message.InvalidMessageException: Message is corrupt (stored crc =
> 1767811542, computed crc = 1256103753)
>
> at kafka.message.Message.ensureValid(Message.scala:166)
>
> at
> kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:330)
>
> at
> kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:318)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>
> at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:318)
>
> at kafka.log.Log.append(Log.scala:231)
>
> at
> kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
>
> at
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
>
> at
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
>
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>
> at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
>
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
>
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
>
> at scala.collection.mutable.HashMap.map(HashMap.scala:45)
>
> at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
>
> at
> kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
>
> at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
>
> at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Any suggestion on how to resolve this issue?
>
>
>
> Best regards / Med venlig hilsen
>
>
>
> *Fredrik Skeel Løkke*
>
> Software Developer ǀ IT & Analysis
>
>
>
> Mob.: +45 3176 8438
>
> f...@lindcapital.com
>
>
>
>
>
>
> Lind Capital A/S
>
> Værkmestergade 3, 2
>
> DK-8000 Aarhus C
>
> www.lindcapital.com
>
> Follow us on
>
>
>
>


Re: Kafka Cluster disaster decovery

2014-11-06 Thread Neha Narkhede
A common solution for disaster recovery is to mirror the Kafka cluster into
another one deployed in a separate data center. The mirroring is not
synchronous so there might be some message loss when you lose the entire
cluster in some disaster.

Thanks,
Neha

On Mon, Nov 3, 2014 at 7:43 AM, Guozhang Wang  wrote:

> Yingkai,
>
> Kafka uses persistent storage so the data written to it will not be lost,
> you just need to restart the cluster. But during the down time it will
> become un-available.
>
> Guozhang
>
>
>
> On Fri, Oct 31, 2014 at 2:06 PM, Yingkai Hu  wrote:
>
> > Hi All,
> >
> > I’m new to Kafka, please direct me to the right path if it is a duplicate
> > question.
> >
> > Basically I deployed Kafka to a 4 machine cluster, what if the whole
> > cluster went down, does kafka provide any backup/restore mechanism?
> Please
> > advise.
> >
> > Thanks!
> > Yingkai
>
>
>
>
> --
> -- Guozhang
>


Re: Issue with async producer

2014-11-06 Thread Jun Rao
In 0.8.2 beta, we have a new java producer that performs better. You can
give it a try.

Thanks,

Jun
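
As context for the QueueFullExceptions described in the quoted message below:
with producer.type=async and a positive queue.enqueue.timeout.ms, the 0.8 Scala
producer blocks send() for up to that many milliseconds when its in-memory queue
is full and then throws kafka.common.QueueFullException, so the caller decides
whether to drop, buffer elsewhere, or back off and retry. A rough sketch:

import kafka.common.QueueFullException
import kafka.producer.{KeyedMessage, Producer}

// Returns false if the async queue stayed full past queue.enqueue.timeout.ms.
def trySend(producer: Producer[String, String],
            msg: KeyedMessage[String, String]): Boolean =
  try {
    producer.send(msg)
    true
  } catch {
    case _: QueueFullException => false   // e.g. drop, spill to disk, or back off
  }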

On Thu, Nov 6, 2014 at 10:16 AM, Devendra Tagare <
devendra.tag...@pubmatic.com> wrote:

> Hi,
>
> Which version of Kafka are you using?
> 0.8.0
>
> Is the broker I/O or network saturated? - typical writes are around 45 MB
> per broker & there are streaming as well as batch consumers.
>
> We are already sending across compressed packets & the batch size is 100.
>
> Also, we tried the sync producer but were not able to enforce the timeouts
> on it. Is there anything that needs to be taken care of in this case?
>
> Regards,
> Dev
> 
> From: Devendra Tagare
> Sent: Tuesday, November 04, 2014 10:02 AM
> To: users@kafka.apache.org
> Cc: Devendra Tagare
> Subject: Issue with async producer
>
> Hi,
>
> We are using an async producer to send data to kafka.
>
> The load the sender handles is around 250 rps, and the size of a message is
> around 25K.
>
> The configs used in the producer are :
>
> request.required.acks=0
> producer.type=async
> batch.num.messages=10
> topic.metadata.refresh.interval.ms=3
> queue.buffering.max.ms=300
> queue.enqueue.timeout.ms=50
>
>
> While the async producer works perfectly fine at 150-175 rps, with the invoking
> method returning in under 10 ms, the invoking method takes around 2ms to
> return when the load increases to 250 rps.
>
> On investigation we noticed QueueFullExceptions in the logs.
>
> On the kafka side the memory utilization was high. Is it because the fsync
> to disk is not happening fast enough?
> The document says the OS level fsync interval should take care of the
> interval at which writes are happening. Should we expedite the writes using
>
> log.flush.interval.messages
> and log.flush.interval.ms.
>
> Also, we tried the sync producer but were not able to enforce the timeouts
> on it.
>
> We have 45 producers & 11 Kafka servers which handle a load of around
> 500mn events per day.
>
> Some of the server side properties we are using:
>
> log.segment.bytes=536870912
> log.retention.check.interval.ms=6
> zookeeper.connection.timeout.ms=100
>
> Regards,
> Dev
>


Re: corrupt recovery checkpoint file issue....

2014-11-06 Thread Jun Rao
I am also wondering how the corruption happened. The way that we update the
OffsetCheckpoint file is to first write to a tmp file and flush the data.
We then rename the tmp file to the final file. This is done to prevent
corruption caused by a crash in the middle of the writes. In your case, was
the host crashed? What kind of storage system are you using? Is there any
non-volatile cache on the storage system?

Thanks,

Jun
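
For illustration, a minimal sketch of the write-to-a-tmp-file-then-rename pattern
Jun describes above (not Kafka's actual OffsetCheckpoint code). The rename is
atomic on POSIX filesystems, so readers never observe a half-written file, and
the explicit sync pushes the tmp file's contents to disk before the swap:

import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}

def writeCheckpoint(target: File, lines: Seq[String]): Unit = {
  val tmp = new File(target.getAbsolutePath + ".tmp")
  val out = new FileOutputStream(tmp)
  val writer = new OutputStreamWriter(out)
  try {
    lines.foreach(l => writer.write(l + "\n"))
    writer.flush()
    out.getFD.sync()              // force the tmp file's data to disk
  } finally {
    writer.close()
  }
  if (!tmp.renameTo(target))      // atomically swap the new file into place
    throw new IOException("failed to rename " + tmp + " to " + target)
}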

On Thu, Nov 6, 2014 at 6:31 AM, Jason Rosenberg  wrote:

> Hi,
>
> We recently had a kafka node go down suddenly. When it came back up, it
> apparently had a corrupt recovery file, and refused to startup:
>
> 2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error
> starting up KafkaServer
> java.lang.NumberFormatException: For input string:
>
> "^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
>
> ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@"
> at
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Integer.parseInt(Integer.java:481)
> at java.lang.Integer.parseInt(Integer.java:527)
> at
> scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
> at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
> at
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
> at
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> at kafka.log.LogManager.loadLogs(LogManager.scala:105)
> at kafka.log.LogManager.(LogManager.scala:57)
> at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
>
> And since the app is under a monitor (so it was repeatedly restarting and
> failing with this error for several minutes before we got to it)…
>
> We moved the ‘recovery-point-offset-checkpoint’ file out of the way, and it
> then restarted cleanly (but of course re-synced all it’s data from
> replicas, so we had no data loss).
>
> Anyway, I’m wondering if that’s the expected behavior? Or should it not
> declare it corrupt and then proceed automatically to an unclean restart?
>
> Should this NumberFormatException be handled a bit more gracefully?
>
> We saved the corrupt file if it’s worth inspecting (although I doubt it
> will be useful!)….
>
> Jason
> ​
>


Re: Consumer lag keep increasing

2014-11-06 Thread Neha Narkhede
Chen,

Consumers lag either due to an I/O or network bottleneck or due to slow
processing of messages by the user. To confirm that you are not hitting the
latter issue, you can run a console consumer on the same data and observe
the throughput that it provides and it's lag.

Thanks,
Neha

On Wed, Nov 5, 2014 at 3:31 PM, Chen Wang 
wrote:

> Guozhang,
> I can see messages keep coming, meaning messages are being consumed, right?
> But the lag is pretty huge (average 30m messages behind) as you can see
> from the graph:
>
> https://www.dropbox.com/s/xli25zicxv5f2qa/Screenshot%202014-11-05%2015.23.05.png?dl=0
>
> My understanding is that for such light weight thread, the consumer should
> almost be at the same pace with the producer. I also checked the machine
> metrics, and nothing pegged there.
>
> I am also moving the testing application to a separate dev cluster. In your
> experience, what things might cause the slow reading? Is this more like a
> server side thing, or consumer side?
>
> Chen
>
> On Wed, Nov 5, 2014 at 3:10 PM, Guozhang Wang  wrote:
>
> > Chen,
> >
> > Your configs seems fine.
> >
> > Could you use ConsumerOffsetChecker tool to see if the offset is
> advancing
> > at all (i.e. messages are comsumed), and if yes get some thread dumps and
> > check if your consumer is blocked on some locks?
> >
> > Guozhang
> >
> > On Wed, Nov 5, 2014 at 2:01 PM, Chen Wang 
> > wrote:
> >
> > > Hey Guys,
> > > I have a really simply storm topology with a kafka spout, reading from
> > > kafka through high level consumer. Since the topic has 30 partitions,
> we
> > > have 30 threads in the spout reading from it. However, it seems that
> the
> > > lag keeps increasing even though the threads only read the messages and do
> > nothing.
> > > The largest message size is around 30KB, and the incoming rate can be
> > as
> > > high as 14k/seconds. There are 3 brokers on some high config bare
> metal
> > > machines. The client side config is like this:
> > >
> > > kafka.config.fetch.message.max.bytes  3145728
> > > kafka.config.group.id   spout_readonly
> > > kafka.config.rebalance.backoff.ms   6000
> > > kafka.config.rebalance.max.retries  6
> > > kafka.config.zookeeper.connect  dare-broker00.sv.walmartlabs.com:2181,
> > > dare-broker01.sv.walmartlabs.com:2181,
> > > dare-broker02.sv.walmartlabs.com:2181
> > > kafka.config.zookeeper.session.timeout.ms   6
> > >
> > > what could possibly cause this huge lag? Will broker be a bottle neck,
> or
> > > some config need to be adjusted? The server side config is like this:
> > >
> > > replica.fetch.max.bytes=2097152
> > > message.max.bytes=2097152
> > > num.network.threads=4
> > > num.io.threads=4
> > >
> > > # The send buffer (SO_SNDBUF) used by the socket server
> > > socket.send.buffer.bytes=4194304
> > >
> > > # The receive buffer (SO_RCVBUF) used by the socket server
> > > socket.receive.buffer.bytes=2097152
> > >
> > > # The maximum size of a request that the socket server will accept
> > > (protection against OOM)
> > > socket.request.max.bytes=104857600
> > >
> > > Any help appreciated!
> > > Chen
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: kafka test jars in sbt?

2014-11-06 Thread Jun Rao
The following is how samza references the kafka test jar in gradle.

testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"

Thanks,

Jun
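
For an sbt build, a rough equivalent of the gradle line above (my untested guess;
the "test" classifier is what selects the kafka_2.10-0.8.2-beta-test.jar
artifact, while the version stays plain "0.8.2-beta"):

// build.sbt
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.8.2-beta" % "test" classifier "test"

The regular kafka jar would then be declared as a separate, unclassified
dependency.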


On Thu, Nov 6, 2014 at 6:38 AM, Markus Jais  wrote:

> Hello,
>
> I want to use the kafka_2.10-0.8.2-beta-test.jar in my Scala project.
>
> It can be found here:
> http://repo.maven.apache.org/maven2/org/apache/kafka/kafka_2.10/0.8.1.1/
>
>
> In my build.sbt I write the following definition:
> "org.apache.kafka" % "kafka_2.10" % "0.8.2-beta-test"
>
>
> But sbt cannot find it. Has anybody had any success with this?
>
> I already used "gradle testJar" and the test jar gets published to:
>
> .m2/repository/org/apache/kafka/kafka_2.10/0.8.2-beta
>
>
> but sbt is looking for a:
>
> .m2/repository/org/apache/kafka/kafka_2.10/0.8.2-beta-test/kafka_2.10-0.8.2-beta-test.pom
>
> any tips on how to use the kafka test jar (together with the regular kafka
> jar) in an build.sbt file?
>
> I want to start a kafka cluster for a unit test.
>
> Cheers,
>
> Marus


Re: Storing data in kafka keys

2014-11-06 Thread Jun Rao
The keyed messages are typically used in two cases (1) you want messages
with the same key to be in the same partition and therefore those messages
with the same key will be consumed by the same consumer instance; (2) you
want to enable the log compaction feature for retention such that the
broker will over time garbage collect all messages with the same key,
except for the most recent message.

Thanks,

Jun
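
A short illustration of case (1) above with the 0.8 Scala producer; the topic,
key and payloads are made up. Messages sharing a key hash to the same partition,
and with log compaction enabled on the topic only the latest value per key is
eventually retained:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
val producer = new Producer[String, String](new ProducerConfig(props))

// Both messages carry the key "user-42", so they land in the same partition.
producer.send(new KeyedMessage[String, String]("user-events", "user-42", "login"))
producer.send(new KeyedMessage[String, String]("user-events", "user-42", "logout"))
producer.close()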

On Wed, Nov 5, 2014 at 10:15 AM, Ivan Balashov  wrote:

> Hi,
>
> It looks like it is a general practice to avoid storing data in kafka
> keys. Some examples of this: Camus, Secor both not using keys. Even
> such a swiss-army tool as kafkacat doesn't seem to have the ability to
> display key (although I might be wrong). Also, console producer does
> not display keys by default, which makes it confusing in initial quick
> checks for data.
>
> What's the story behind this? Should one think twice before tying
> business data to kafka keys?
>
> Thanks,
>


Re: Producer and Consumer properties

2014-11-06 Thread Jun Rao
For higher throughput, you want to configure the producer with a higher
batch size. You may also want to enable compression.

Thanks,

Jun
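
With the 0.8 Scala producer, the two suggestions above map to settings along
these lines; the values are illustrative starting points only, not
recommendations from this thread:

import java.util.Properties
import kafka.producer.ProducerConfig

val props = new Properties()
props.put("metadata.broker.list", "broker1:9092,broker2:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.type", "async")         // batching applies to the async producer
props.put("batch.num.messages", "500")      // larger batches -> fewer, bigger requests
props.put("compression.codec", "snappy")    // or "gzip"
val config = new ProducerConfig(props)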

On Wed, Nov 5, 2014 at 6:46 AM, Eduardo Costa Alfaia  wrote:

> Hi Dudes,
>
> I would like to know whether the producer's and consumer's properties files in
> the config folder need to be configured. I have configured only
> server.properties; is that enough? I am doing some tests on
> performance, for example network throughput. My scenario is:
>
> Like producer I am using this program in c:
>
>
> Informativa sulla Privacy: http://www.unibs.it/node/8155


Re: Location of Logging Files/How To Turn On Logging For Kafka Components

2014-11-06 Thread Jun Rao
The log4j entries before that error should tell you the cause of the error.

Thanks,

Jun
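
On the setup part of the question quoted below: the 0.8 clients log through
log4j 1.x, so if no log4j.properties is on the classpath, one quick (purely
illustrative) option is to configure log4j programmatically before creating the
producer:

import org.apache.log4j.{BasicConfigurator, Level, Logger}

// Route all log4j output to the console and set the kafka.* loggers to DEBUG,
// which surfaces the per-retry warnings logged before FailedToSendMessageException.
BasicConfigurator.configure()
Logger.getLogger("kafka").setLevel(Level.DEBUG)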

On Tue, Nov 4, 2014 at 11:25 PM, Alex Melville  wrote:

> Background:
>
> I have searched for a while online, and through the files located in the
> kafka/logs directory, trying to find where kafka writes log output to in
> order to debug the SimpleProducer I wrote. My producer is almost identical
> to the simple producer located here
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
>
> except for I'm using Protobuffers instead of Strings to publish data to a
> cluster. I'm receiving the following error when I try to run the
> SimpleProducer
>
> Exception in thread "main" kafka.common.FailedToSendMessageException:
> Failed to send messages after 3 tries.
>
> at
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>
> at kafka.producer.Producer.send(Producer.scala:76)
>
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
>
> at stream.SimpleProducer.send(Unknown Source)
>
> at stream.SimpleProducer.main(Unknown Source)
>
>
> I know this isn't a network problem, because I ran the console-producer and
> successfully published data to the same broker that my Simple Producer is
> trying to publish to. I now want to try to debug this error.
>
>
>
> Question:
>
> Where would my Simple Producer write info about its startup and eventual
> error, such that I can read it and try to reason as to why it failed? If it
> produces no log data on its own, what is the best way to write this data to
> a somewhere where I can use it to debug? I've noticed that log4j, which I
> understand is a often-used library for logging in Java, came with my kafka
> download. Am I supposed to use log4j for this info? I do not know very much
> about log4j, so any info on how to get this setup would also be
> appreciated.
>
>
> -Alex
>


Re: High CPU usage of Crc32 on Kafka broker

2014-11-06 Thread Jay Kreps
I suspect it is possible to save and reuse the CRCs though it might be a
bit of an invasive change. I suspect the first usage is when we are
checking the validity of the messages and the second is from when we
rebuild the compressed message set (I'm assuming you guys are using
compression because I think we optimize this out otherwise). Technically I
think the CRCs stay the same.

An alternative approach, though, would be working to remove the need for
recompression entirely on the broker side by making the offsets in the
compressed message relative to the base offset of the message set. This is
a much more invasive change but potentially better as it would also remove
the recompression done on the broker which is also CPU heavy.

-Jay

On Thu, Nov 6, 2014 at 2:36 PM, Allen Wang 
wrote:

> Sure. Here is the link to the screen shot of jmc with the JTR file loaded:
>
> http://picpaste.com/fligh-recorder-crc.png
>
>
>
> On Thu, Nov 6, 2014 at 2:12 PM, Neha Narkhede 
> wrote:
>
> > Allen,
> >
> > Apache mailing lists don't allow attachments. Could you please link to a
> > pastebin or something?
> >
> > Thanks,
> > Neha
> >
> > On Thu, Nov 6, 2014 at 12:02 PM, Allen Wang 
> > wrote:
> >
> > > After digging more into the stack trace got from flight recorder (which
> > is
> > > attached), it seems that Kafka (0.8.1.1) can optimize the usage of
> Crc32.
> > > The stack trace shows that Crc32 is invoked twice from Log.append().
> > First
> > > is from the line number 231:
> > >
> > > val appendInfo = analyzeAndValidateMessageSet(messages)
> > >
> > > The second time is from line 252 in the same function:
> > >
> > > validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
> > >
> > > If one of the Crc32 invocation can be eliminated, we are looking at
> > saving
> > > at least 7% of CPU usage.
> > >
> > > Thanks,
> > > Allen
> > >
> > >
> > >
> > >
> > > On Wed, Nov 5, 2014 at 6:32 PM, Allen Wang  wrote:
> > >
> > >> Hi,
> > >>
> > >> Using flight recorder, we have observed high CPU usage of CRC32
> > >> (kafka.utils.Crc32.update()) on Kafka broker. It uses as much as 25%
> of
> > CPU
> > >> on an instance. Tracking down stack trace, this method is invoked by
> > >> ReplicaFetcherThread.
> > >>
> > >> Is there any tuning we can do to reduce this?
> > >>
> > >> Also on the topic of CPU utilization, we observed that overall CPU
> > >> utilization is proportional to AllTopicsBytesInPerSec metric. Does
> this
> > >> metric include incoming replication traffic?
> > >>
> > >> Thanks,
> > >> Allen
> > >>
> > >>
> > >
> >
>


Re: Kafka Release timelines

2014-11-06 Thread Neha Narkhede
0.8.2 should be available in a month. Though 0.9 might take a couple more
months and there is a good chance that it will not be this year.

Thanks,
Neha

On Thu, Nov 6, 2014 at 3:01 AM, dinesh kumar  wrote:

> Hi,
> I found the future release plan wiki here. I
> see that 0.8.2 is still in beta even though it was slated for September.
> What is the expected date for 0.9 release?
>
> Thanks,
> Dinesh
>


Re: High CPU usage of Crc32 on Kafka broker

2014-11-06 Thread Allen Wang
Sure. Here is the link to the screen shot of jmc with the JTR file loaded:

http://picpaste.com/fligh-recorder-crc.png



On Thu, Nov 6, 2014 at 2:12 PM, Neha Narkhede 
wrote:

> Allen,
>
> Apache mailing lists don't allow attachments. Could you please link to a
> pastebin or something?
>
> Thanks,
> Neha
>
> On Thu, Nov 6, 2014 at 12:02 PM, Allen Wang 
> wrote:
>
> > After digging more into the stack trace got from flight recorder (which
> is
> > attached), it seems that Kafka (0.8.1.1) can optimize the usage of Crc32.
> > The stack trace shows that Crc32 is invoked twice from Log.append().
> First
> > is from the line number 231:
> >
> > val appendInfo = analyzeAndValidateMessageSet(messages)
> >
> > The second time is from line 252 in the same function:
> >
> > validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
> >
> > If one of the Crc32 invocation can be eliminated, we are looking at
> saving
> > at least 7% of CPU usage.
> >
> > Thanks,
> > Allen
> >
> >
> >
> >
> > On Wed, Nov 5, 2014 at 6:32 PM, Allen Wang  wrote:
> >
> >> Hi,
> >>
> >> Using flight recorder, we have observed high CPU usage of CRC32
> >> (kafka.utils.Crc32.update()) on Kafka broker. It uses as much as 25% of
> CPU
> >> on an instance. Tracking down stack trace, this method is invoked by
> >> ReplicaFetcherThread.
> >>
> >> Is there any tuning we can do to reduce this?
> >>
> >> Also on the topic of CPU utilization, we observed that overall CPU
> >> utilization is proportional to AllTopicsBytesInPerSec metric. Does this
> >> metric include incoming replication traffic?
> >>
> >> Thanks,
> >> Allen
> >>
> >>
> >
>


Re: Interaction of retention settings for broker and topic plus partitions

2014-11-06 Thread Neha Narkhede
To clarify though, is it correct that a per topic limit will always
override the default limit of the same type?  (e.g. a large per-topic
retention hours vs. a small default retention hours)?

That's correct.

On Thu, Nov 6, 2014 at 9:34 AM, Jason Rosenberg  wrote:

> Jun,
>
> To clarify though, is it correct that a per topic limit will always
> override the default limit of the same type?  (e.g. a large per-topic
> retention hours vs. a small default retention hours)?
>
> Jason
>
> On Sat, Sep 20, 2014 at 12:28 AM, Jun Rao  wrote:
>
> > That's right. The rule is that a log segment is deleted if either the
> size
> > or the time limit is reached. Log sizes are per partition.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Sep 18, 2014 at 2:55 PM, Cory Watson  wrote:
> >
> > > Hello all!
> > >
> > > I'm curious about the interaction of server and topic level retention
> > > settings. It's not clear to me the precedence of the follow:
> > >
> > >- broker's default log.retention.bytes
> > >- topic's retention.bytes (which defaults to broker's
> > >log.retention.bytes)
> > >- broker's log.retention.hours and log.retention.minutes (if both
> are
> > >specified then it seems to be the lower of the two, since it's when
> > >"either" is exceeded)
> > >
> > > It seems that the rule is that when any of these are violated then the
> > log
> > > segment is deleted. Is this right?
> > >
> > > Also, just to be clear: The log sizes in questions are for a single
> > > partitions logs?
> > >
> > > I have a situation where my per-topic retention.bytes is very high, but
> > my
> > > default log.retention.hours is lower (the default @ 168 hours). It
> seems
> > > that it's truncating at the log.retention.hours instead of the topic's
> > > retention.bytes.
> > >
> > > Am I understanding this correctly? :)
> > >
> > > --
> > > Cory Watson
> > > Principal Infrastructure Engineer // Keen IO
> > >
> >
>


Re: nulls found in topic, created by recovery?

2014-11-06 Thread Neha Narkhede
IIRC, the bug that introduced the nulls was related to compressed data. Is
this topic compressed? Did you try to run a consumer through the topic's
data or alternately the DumpLogSegments tool?

On Thu, Nov 6, 2014 at 12:56 PM, Neil Harkins  wrote:

> Hi all. I saw something weird yesterday on our "leaf" instances
> which run kafka 0.7.2 (and mirror to kafka 0.8 via our custom code).
> I fully realize everyone's instinctual response is "upgrade, already.",
> but I'd like to have an internals discussion to better understand
> what happened, as I suspect it's still relevant in 0.8.
>
> Basically, in one of our topics there was an 8k stretch of nulls.
> Correlating timestamps from the messages bracketing the nulls
> to the kafka log, I see that the server restarted during that time,
> and here are the recovery lines related to the topic with the nulls:
>
> [2014-11-04 00:48:07,602] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
> [2014-11-04 01:00:35,806] INFO Shutting down Kafka server
> (kafka.server.KafkaServer)
> [2014-11-04 01:00:35,813] INFO shutdown scheduler kafka-logcleaner-
> (kafka.utils.KafkaScheduler)
> [2014-11-04 01:01:38,411] INFO Starting Kafka server...
> (kafka.server.KafkaServer)
> ...
> [2014-11-04 01:01:49,146] INFO Loading log 'foo.bar-0'
> (kafka.log.LogManager)
> [2014-11-04 01:01:49,147] INFO Loading the last segment
> /var/kafka-leaf-spool/foo.bar-0/002684355423.kafka in mutable
> mode, recovery true (kafka.log.Log)
> [2014-11-04 01:01:55,877] INFO recover high water mark:414004449
> (kafka.message.FileMessageSet)
> [2014-11-04 01:01:55,877] INFO Recovery succeeded in 6 seconds. 0
> bytes truncated. (kafka.message.FileMessageSet)
>
> The only hypothesis I can come up with is that the shutdown
> ("graceful"?) did not wait for all messages to flush to disk
> (we're configured with: log.flush.interval=1,
> log.default.flush.interval.ms=500, and
> log.default.flush.scheduler.interval.ms=500),
> but the max offset was recorded, so that when it came back up,
> it filled the gap with nulls to reach the valid max offset in case
> any consumers were at the end.
>
> But for consumers with a position prior to all the nulls,
> are they guaranteed to get back "on the rails" so-to-speak?
> Nulls appear as v0(i.e. "magic") messages of 0 length,
> but the messages replaced could be variable length.
>
> Thanks in advance for any input,
> -neil
>


Re: High CPU usage of Crc32 on Kafka broker

2014-11-06 Thread Neha Narkhede
Allen,

Apache mailing lists don't allow attachments. Could you please link to a
pastebin or something?

Thanks,
Neha

On Thu, Nov 6, 2014 at 12:02 PM, Allen Wang 
wrote:

> After digging more into the stack trace got from flight recorder (which is
> attached), it seems that Kafka (0.8.1.1) can optimize the usage of Crc32.
> The stack trace shows that Crc32 is invoked twice from Log.append(). First
> is from the line number 231:
>
> val appendInfo = analyzeAndValidateMessageSet(messages)
>
> The second time is from line 252 in the same function:
>
> validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
>
> If one of the Crc32 invocation can be eliminated, we are looking at saving
> at least 7% of CPU usage.
>
> Thanks,
> Allen
>
>
>
>
> On Wed, Nov 5, 2014 at 6:32 PM, Allen Wang  wrote:
>
>> Hi,
>>
>> Using flight recorder, we have observed high CPU usage of CRC32
>> (kafka.utils.Crc32.update()) on Kafka broker. It uses as much as 25% of CPU
>> on an instance. Tracking down stack trace, this method is invoked by
>> ReplicaFetcherThread.
>>
>> Is there any tuning we can do to reduce this?
>>
>> Also on the topic of CPU utilization, we observed that overall CPU
>> utilization is proportional to AllTopicsBytesInPerSec metric. Does this
>> metric include incoming replication traffic?
>>
>> Thanks,
>> Allen
>>
>>
>


Re: Strategies for high-concurrency consumers

2014-11-06 Thread Jack Foy
On Nov 6, 2014, at 11:43 AM, Neha Narkhede  wrote:
> Zookeeper is likely the bottleneck if rebalancing takes a very long time.
> As Jay said, this will be addressed in the consumer rewrite planned for
> 0.9. Few more workarounds that were tried at LinkedIn - 1) To deploy
> Zookeeper on SSDs and 2) Turning sync on every write off
> (zookeeper.forceSync). I'm not sure if #2 negatively affected the
> consistency of the zookeeper data ever but it did help with speeding up the
> rebalancing.

OK, thanks very much for the guidance.

-- 
Jack Foy 





nulls found in topic, created by recovery?

2014-11-06 Thread Neil Harkins
Hi all. I saw something weird yesterday on our "leaf" instances
which run kafka 0.7.2 (and mirror to kafka 0.8 via our custom code).
I fully realize everyone's instinctual response is "upgrade, already.",
but I'd like to have an internals discussion to better understand
what happened, as I suspect it's still relevant in 0.8.

Basically, in one of our topics there was an 8k stretch of nulls.
Correlating timestamps from the messages bracketing the nulls
to the kafka log, I see that the server restarted during that time,
and here are the recovery lines related to the topic with the nulls:

[2014-11-04 00:48:07,602] INFO zookeeper state changed (SyncConnected)
(org.I0Itec.zkclient.ZkClient)
[2014-11-04 01:00:35,806] INFO Shutting down Kafka server
(kafka.server.KafkaServer)
[2014-11-04 01:00:35,813] INFO shutdown scheduler kafka-logcleaner-
(kafka.utils.KafkaScheduler)
[2014-11-04 01:01:38,411] INFO Starting Kafka server...
(kafka.server.KafkaServer)
...
[2014-11-04 01:01:49,146] INFO Loading log 'foo.bar-0' (kafka.log.LogManager)
[2014-11-04 01:01:49,147] INFO Loading the last segment
/var/kafka-leaf-spool/foo.bar-0/002684355423.kafka in mutable
mode, recovery true (kafka.log.Log)
[2014-11-04 01:01:55,877] INFO recover high water mark:414004449
(kafka.message.FileMessageSet)
[2014-11-04 01:01:55,877] INFO Recovery succeeded in 6 seconds. 0
bytes truncated. (kafka.message.FileMessageSet)

The only hypothesis I can come up with is that the shutdown
("graceful"?) did not wait for all messages to flush to disk
(we're configured with: log.flush.interval=1,
log.default.flush.interval.ms=500, and
log.default.flush.scheduler.interval.ms=500),
but the max offset was recorded, so that when it came back up,
it filled the gap with nulls to reach the valid max offset in case
any consumers were at the end.

But for consumers with a position prior to all the nulls,
are they guaranteed to get back "on the rails" so-to-speak?
Nulls appear as v0 (i.e. "magic") messages of 0 length,
but the messages replaced could be variable length.

Thanks in advance for any input,
-neil


Re: Announcing Confluent

2014-11-06 Thread Joe Brown
Best of luck!!!

J

On 6 Nov 2014, at 18:28, Jay Kreps  wrote:

> Hey all,
> 
> I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
> company around Kafka called Confluent. We are planning on productizing the
> kind of Kafka-based real-time data platform we built out at LinkedIn. We
> are doing this because we think this is a really powerful idea and we felt
> there was a lot to do to make this idea really take root. We wanted to make
> that our full time mission and focus.
> 
> There is a blog post that goes into a little more depth here:
> http://blog.confluent.io/
> 
> LinkedIn will remain a heavy Kafka user and contributor. Combined with our
> additional resources from the funding of the company this should be a
> really good thing for the Kafka development effort. Especially when
> combined with the increasing contributions from the rest of the development
> community. This is great news, as there is a lot of work to do. We'll need
> to really focus on scaling this distributed development in a healthy way.
> 
> One thing I do want to emphasize is that the addition of a company in the
> Kafka ecosystem won’t mean meddling with open source. Kafka will remain
> 100% open source and community focused, as of course is true of any Apache
> project. I have been doing open source for a long time and strongly believe
> it is the right model for infrastructure software development.
> 
> Confluent is just getting off the ground now. We left LinkedIn, raised some
> money, and we have an office (but no furniture yet!). None the less, f you
> are interested in finding out more about the company and either getting
> help with your Kafka usage or joining us to help build all this, by all
> means reach out to us, we’d love to talk.
> 
> Wish us luck!
> 
> -Jay



Re: High CPU usage of Crc32 on Kafka broker

2014-11-06 Thread Allen Wang
After digging more into the stack trace obtained from flight recorder (which is
attached), it seems that Kafka (0.8.1.1) could optimize its usage of Crc32.
The stack trace shows that Crc32 is invoked twice from Log.append(). The first
invocation is from line number 231:

val appendInfo = analyzeAndValidateMessageSet(messages)

The second time is from line 252 in the same function:

validMessages = validMessages.assignOffsets(offset, appendInfo.codec)

If one of the Crc32 invocations could be eliminated, we would be looking at saving
at least 7% of CPU usage.

Thanks,
Allen
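
For illustration, a minimal sketch of the kind of saving described above: compute
the CRC32 of a message payload once and cache it, rather than recomputing it on
each validation pass. This is not Kafka's actual Log/Message code, just the general
pattern using java.util.zip.CRC32:

    import java.util.zip.CRC32;

    public class ChecksumCachingSketch {
        private final byte[] payload;
        private Long cachedCrc;  // lazily computed, then reused

        public ChecksumCachingSketch(byte[] payload) {
            this.payload = payload;
        }

        // The first call pays the CRC32 cost; later validations reuse the
        // cached value instead of re-reading all of the message bytes.
        public synchronized long checksum() {
            if (cachedCrc == null) {
                CRC32 crc = new CRC32();
                crc.update(payload, 0, payload.length);
                cachedCrc = crc.getValue();
            }
            return cachedCrc;
        }

        public boolean isValid(long expectedCrc) {
            return checksum() == expectedCrc;
        }
    }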




On Wed, Nov 5, 2014 at 6:32 PM, Allen Wang  wrote:

> Hi,
>
> Using flight recorder, we have observed high CPU usage of CRC32
> (kafka.utils.Crc32.update()) on Kafka broker. It uses as much as 25% of CPU
> on an instance. Tracking down stack trace, this method is invoked by
> ReplicaFetcherThread.
>
> Is there any tuning we can do to reduce this?
>
> Also on the topic of CPU utilization, we observed that overall CPU
> utilization is proportional to AllTopicsBytesInPerSec metric. Does this
> metric include incoming replication traffic?
>
> Thanks,
> Allen
>
>


Re: Announcing Confluent

2014-11-06 Thread Steve Morin
Jay, Neha and Jun congratz!!

On Thu, Nov 6, 2014 at 11:09 AM, Bhavesh Mistry 
wrote:

> HI Guys,
>
> Thanks for your awesome support.  I wish you good luck !!   Thanks for open
> sources Kafka !!
>
> Thanks,
>
> Bhavesh
>
> On Thu, Nov 6, 2014 at 10:52 AM, Rajasekar Elango 
> wrote:
>
> > Congrats. Wish you all the very best and success.
> >
> > Thanks,
> > Raja.
> >
> > On Thu, Nov 6, 2014 at 1:36 PM, Niek Sanders 
> > wrote:
> >
> > > Congrats!
> > >
> > > On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps 
> wrote:
> > > > Hey all,
> > > >
> > > > I’m happy to announce that Jun Rao, Neha Narkhede and I are creating
> a
> > > > company around Kafka called Confluent. We are planning on
> productizing
> > > the
> > > > kind of Kafka-based real-time data platform we built out at LinkedIn.
> > We
> > > > are doing this because we think this is a really powerful idea and we
> > > felt
> > > > there was a lot to do to make this idea really take root. We wanted
> to
> > > make
> > > > that our full time mission and focus.
> > > >
> > > > There is a blog post that goes into a little more depth here:
> > > > http://blog.confluent.io/
> > > >
> > > > LinkedIn will remain a heavy Kafka user and contributor. Combined
> with
> > > our
> > > > additional resources from the funding of the company this should be a
> > > > really good thing for the Kafka development effort. Especially when
> > > > combined with the increasing contributions from the rest of the
> > > development
> > > > community. This is great news, as there is a lot of work to do. We'll
> > > need
> > > > to really focus on scaling this distributed development in a healthy
> > way.
> > > >
> > > > One thing I do want to emphasize is that the addition of a company in
> > the
> > > > Kafka ecosystem won’t mean meddling with open source. Kafka will
> remain
> > > > 100% open source and community focused, as of course is true of any
> > > Apache
> > > > project. I have been doing open source for a long time and strongly
> > > believe
> > > > it is the right model for infrastructure software development.
> > > >
> > > > Confluent is just getting off the ground now. We left LinkedIn,
> raised
> > > some
> > > > money, and we have an office (but no furniture yet!). None the less,
> f
> > > you
> > > > are interested in finding out more about the company and either
> getting
> > > > help with your Kafka usage or joining us to help build all this, by
> all
> > > > means reach out to us, we’d love to talk.
> > > >
> > > > Wish us luck!
> > > >
> > > > -Jay
> > >
> >
> >
> >
> > --
> > Thanks,
> > Raja.
> >
>


Re: Strategies for high-concurrency consumers

2014-11-06 Thread Neha Narkhede
Jack,

Zookeeper is likely the bottleneck if rebalancing takes a very long time.
As Jay said, this will be addressed in the consumer rewrite planned for
0.9. A few more workarounds were tried at LinkedIn: 1) deploying
Zookeeper on SSDs and 2) turning off sync-on-every-write
(zookeeper.forceSync). I'm not sure whether #2 ever negatively affected the
consistency of the zookeeper data, but it did help speed up the
rebalancing.

Thanks,
Neha

On Thu, Nov 6, 2014 at 11:31 AM, Jay Kreps  wrote:

> Unfortunately the performance of the consumer balancing scales poorly with
> the number of partitions. This is one of the things the consumer rewrite
> project is meant to address, however that is not complete yet. A reasonable
> workaround may be to decouple your application parallelism from the number
> of partitions. I.e. have the processing of each partition happen in a
> threadpool. I'm assuming that you don't actually have 2,500 machines, just
> that you need that much parallelism since each messages takes a bit of time
> to process. This does weaken the delivery ordering, but you may be able to
> shard the processing by key to solve that problem.
>
> -Jay
>
> On Thu, Nov 6, 2014 at 10:59 AM, Jack Foy  wrote:
>
> > Hi all,
> >
> > We are building a system that will carry a high volume of traffic (on the
> > order of 2 billion messages in each batch), which we need to process at a
> > rate of 50,000 messages per second. We need to guarantee at-least-once
> > delivery for each message. The system we are feeding has a latency of
> 50ms
> > per message, and can absorb many concurrent requests.
> >
> > We have a Kafka 0.8.1.1 cluster with three brokers and a Zookeeper 3.4.5
> > cluster with 5 nodes, each on physical hardware.
> >
> > We intend to deploy a consumer group of 2500 consumers against a single
> > topic, with a partition for each consumer. We expect our consumers to be
> > stable over the course of the run, so we expect rebalancing to be rare.
> In
> > testing, we have successfully run 512 high-level consumers against 1024
> > partitions, but beyond 512 consumers the rebalance at startup doesn’t
> > complete within 10 minutes. Is this a workable strategy with high-level
> > consumers? Can we actually deploy a consumer group with this many
> consumers
> > and partitions?
> >
> > We see throughput of more than 500,000 messages per second with our 512
> > consumers, but we need greater parallelism to meet our performance needs.
> >
> > --
> > Jack Foy 
> >
> >
> >
> >
>


Re: Strategies for high-concurrency consumers

2014-11-06 Thread Jay Kreps
Unfortunately the performance of the consumer balancing scales poorly with
the number of partitions. This is one of the things the consumer rewrite
project is meant to address, however that is not complete yet. A reasonable
workaround may be to decouple your application parallelism from the number
of partitions. I.e. have the processing of each partition happen in a
threadpool. I'm assuming that you don't actually have 2,500 machines, just
that you need that much parallelism since each message takes a bit of time
to process. This does weaken the delivery ordering, but you may be able to
shard the processing by key to solve that problem.

-Jay
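
As a rough sketch of that decoupling (a hypothetical class, not a drop-in
implementation): a smaller number of consumer streams hand messages to a fixed
set of single-threaded workers, choosing the worker by key hash so that messages
with the same key are still processed in order.

    import java.util.Arrays;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ShardedProcessor {
        private final ExecutorService[] workers;

        public ShardedProcessor(int parallelism) {
            workers = new ExecutorService[parallelism];
            for (int i = 0; i < parallelism; i++) {
                // one single-threaded queue per shard preserves per-key ordering
                workers[i] = Executors.newSingleThreadExecutor();
            }
        }

        public void process(final byte[] key, final byte[] message) {
            int shard = key == null
                    ? 0
                    : (Arrays.hashCode(key) & 0x7fffffff) % workers.length;
            workers[shard].submit(new Runnable() {
                public void run() {
                    handle(message);  // the ~50ms call into the downstream system
                }
            });
        }

        private void handle(byte[] message) {
            // application-specific processing goes here
        }
    }

If at-least-once delivery matters, offsets for a partition should only be
committed once the work submitted for those messages has actually completed.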

On Thu, Nov 6, 2014 at 10:59 AM, Jack Foy  wrote:

> Hi all,
>
> We are building a system that will carry a high volume of traffic (on the
> order of 2 billion messages in each batch), which we need to process at a
> rate of 50,000 messages per second. We need to guarantee at-least-once
> delivery for each message. The system we are feeding has a latency of 50ms
> per message, and can absorb many concurrent requests.
>
> We have a Kafka 0.8.1.1 cluster with three brokers and a Zookeeper 3.4.5
> cluster with 5 nodes, each on physical hardware.
>
> We intend to deploy a consumer group of 2500 consumers against a single
> topic, with a partition for each consumer. We expect our consumers to be
> stable over the course of the run, so we expect rebalancing to be rare. In
> testing, we have successfully run 512 high-level consumers against 1024
> partitions, but beyond 512 consumers the rebalance at startup doesn’t
> complete within 10 minutes. Is this a workable strategy with high-level
> consumers? Can we actually deploy a consumer group with this many consumers
> and partitions?
>
> We see throughput of more than 500,000 messages per second with our 512
> consumers, but we need greater parallelism to meet our performance needs.
>
> --
> Jack Foy 
>
>
>
>


Re: Announcing Confluent

2014-11-06 Thread Bhavesh Mistry
HI Guys,

Thanks for your awesome support.  I wish you good luck !!   Thanks for open
sources Kafka !!

Thanks,

Bhavesh

On Thu, Nov 6, 2014 at 10:52 AM, Rajasekar Elango 
wrote:

> Congrats. Wish you all the very best and success.
>
> Thanks,
> Raja.
>
> On Thu, Nov 6, 2014 at 1:36 PM, Niek Sanders 
> wrote:
>
> > Congrats!
> >
> > On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps  wrote:
> > > Hey all,
> > >
> > > I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
> > > company around Kafka called Confluent. We are planning on productizing
> > the
> > > kind of Kafka-based real-time data platform we built out at LinkedIn.
> We
> > > are doing this because we think this is a really powerful idea and we
> > felt
> > > there was a lot to do to make this idea really take root. We wanted to
> > make
> > > that our full time mission and focus.
> > >
> > > There is a blog post that goes into a little more depth here:
> > > http://blog.confluent.io/
> > >
> > > LinkedIn will remain a heavy Kafka user and contributor. Combined with
> > our
> > > additional resources from the funding of the company this should be a
> > > really good thing for the Kafka development effort. Especially when
> > > combined with the increasing contributions from the rest of the
> > development
> > > community. This is great news, as there is a lot of work to do. We'll
> > need
> > > to really focus on scaling this distributed development in a healthy
> way.
> > >
> > > One thing I do want to emphasize is that the addition of a company in
> the
> > > Kafka ecosystem won’t mean meddling with open source. Kafka will remain
> > > 100% open source and community focused, as of course is true of any
> > Apache
> > > project. I have been doing open source for a long time and strongly
> > believe
> > > it is the right model for infrastructure software development.
> > >
> > > Confluent is just getting off the ground now. We left LinkedIn, raised
> > some
> > > money, and we have an office (but no furniture yet!). None the less, f
> > you
> > > are interested in finding out more about the company and either getting
> > > help with your Kafka usage or joining us to help build all this, by all
> > > means reach out to us, we’d love to talk.
> > >
> > > Wish us luck!
> > >
> > > -Jay
> >
>
>
>
> --
> Thanks,
> Raja.
>


Re: No longer supporting Java 6, if? when?

2014-11-06 Thread Jay Kreps
Yeah it is a little bit silly that people are still using Java 6.

I guess this is a tradeoff--being more conservative in our java support
means more people can use our software, whereas upgrading gives us
developers a better experience since we aren't stuck with ancient stuff.

Nonetheless I would argue for being a bit conservative here. Sadly a
shocking number of people are still using Java 6. The Kafka clients get
embedded in applications all over the place, and likely having even one
application not yet upgraded would block adopting the new Kafka version
that dropped java 6 support. So unless there is something in Java 7 we
really really want I think it might be good to hold out a bit.

As an example we dropped java 6 support in Samza and immediately had people
blocked by that, and unlike the Kafka clients, Samza use is pretty
centralized.

-Jay

On Wed, Nov 5, 2014 at 5:32 PM, Joe Stein  wrote:

> This has been coming up in a lot of projects and for other reasons too I
> wanted to kick off the discussion about if/when we end support for Java 6.
> Besides any API we may want to use in >= 7 we also compile our binaries for
> 6 for release currently.
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop 
> /
>


Strategies for high-concurrency consumers

2014-11-06 Thread Jack Foy
Hi all,

We are building a system that will carry a high volume of traffic (on the order 
of 2 billion messages in each batch), which we need to process at a rate of 
50,000 messages per second. We need to guarantee at-least-once delivery for 
each message. The system we are feeding has a latency of 50ms per message, and 
can absorb many concurrent requests.

We have a Kafka 0.8.1.1 cluster with three brokers and a Zookeeper 3.4.5 
cluster with 5 nodes, each on physical hardware. 

We intend to deploy a consumer group of 2500 consumers against a single topic, 
with a partition for each consumer. We expect our consumers to be stable over 
the course of the run, so we expect rebalancing to be rare. In testing, we have 
successfully run 512 high-level consumers against 1024 partitions, but beyond 
512 consumers the rebalance at startup doesn’t complete within 10 minutes. Is 
this a workable strategy with high-level consumers? Can we actually deploy a 
consumer group with this many consumers and partitions? 

We see throughput of more than 500,000 messages per second with our 512 
consumers, but we need greater parallelism to meet our performance needs. 

-- 
Jack Foy 





Re: No longer supporting Java 6, if? when?

2014-11-06 Thread Gwen Shapira
Java6 is supported on CDH4 but not CDH5.

On Thu, Nov 6, 2014 at 9:54 AM, Koert Kuipers  wrote:

> when is java 6 dropped by the hadoop distros?
>
> i am still aware of many clusters that are java 6 only at the moment.
>
>
>
> On Thu, Nov 6, 2014 at 12:44 PM, Gwen Shapira 
> wrote:
>
> > +1 for dropping Java 6
> >
> > On Thu, Nov 6, 2014 at 9:31 AM, Steven Schlansker <
> > sschlans...@opentable.com
> > > wrote:
> >
> > > Java 6 has been End of Life since Feb 2013.
> > > Java 7 (and 8, but unfortunately that's too new still) has very
> > compelling
> > > features which can make development a lot easier.
> > >
> > > The sooner more projects drop Java 6 the better, in my opinion :)
> > >
> > > On Nov 5, 2014, at 7:45 PM, Worthy LaFollette 
> wrote:
> > >
> > > > Mostly converted now to 1.7, this would be welcomed to get any new
> > > > features.
> > > >
> > > > On Wed Nov 05 2014 at 7:32:55 PM Joe Stein 
> > wrote:
> > > >
> > > >> This has been coming up in a lot of projects and for other reasons
> > too I
> > > >> wanted to kick off the discussion about if/when we end support for
> > Java
> > > 6.
> > > >> Besides any API we may want to use in >= 7 we also compile our
> > binaries
> > > for
> > > >> 6 for release currently.
> > > >>
> > > >> /***
> > > >> Joe Stein
> > > >> Founder, Principal Consultant
> > > >> Big Data Open Source Security LLC
> > > >> http://www.stealth.ly
> > > >> Twitter: @allthingshadoop 
> > > >> /
> > > >>
> > >
> > >
> >
>


Re: Announcing Confluent

2014-11-06 Thread Rajasekar Elango
Congrats. Wish you all the very best and success.

Thanks,
Raja.

On Thu, Nov 6, 2014 at 1:36 PM, Niek Sanders  wrote:

> Congrats!
>
> On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps  wrote:
> > Hey all,
> >
> > I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
> > company around Kafka called Confluent. We are planning on productizing
> the
> > kind of Kafka-based real-time data platform we built out at LinkedIn. We
> > are doing this because we think this is a really powerful idea and we
> felt
> > there was a lot to do to make this idea really take root. We wanted to
> make
> > that our full time mission and focus.
> >
> > There is a blog post that goes into a little more depth here:
> > http://blog.confluent.io/
> >
> > LinkedIn will remain a heavy Kafka user and contributor. Combined with
> our
> > additional resources from the funding of the company this should be a
> > really good thing for the Kafka development effort. Especially when
> > combined with the increasing contributions from the rest of the
> development
> > community. This is great news, as there is a lot of work to do. We'll
> need
> > to really focus on scaling this distributed development in a healthy way.
> >
> > One thing I do want to emphasize is that the addition of a company in the
> > Kafka ecosystem won’t mean meddling with open source. Kafka will remain
> > 100% open source and community focused, as of course is true of any
> Apache
> > project. I have been doing open source for a long time and strongly
> believe
> > it is the right model for infrastructure software development.
> >
> > Confluent is just getting off the ground now. We left LinkedIn, raised
> some
> > money, and we have an office (but no furniture yet!). None the less, f
> you
> > are interested in finding out more about the company and either getting
> > help with your Kafka usage or joining us to help build all this, by all
> > means reach out to us, we’d love to talk.
> >
> > Wish us luck!
> >
> > -Jay
>



-- 
Thanks,
Raja.


Re:Announcing Confluent

2014-11-06 Thread Saurabh Agarwal (BLOOMBERG/ 731 LEX -)
Congratulations Jay, Jun and Neha. Great news. Wish you good luck!!!

- Original Message -
From: users@kafka.apache.org
To: d...@kafka.apache.org, users@kafka.apache.org
At: Nov  6 2014 13:28:51

Hey all,

I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
company around Kafka called Confluent. We are planning on productizing the
kind of Kafka-based real-time data platform we built out at LinkedIn. We
are doing this because we think this is a really powerful idea and we felt
there was a lot to do to make this idea really take root. We wanted to make
that our full time mission and focus.

There is a blog post that goes into a little more depth here:
http://blog.confluent.io/

LinkedIn will remain a heavy Kafka user and contributor. Combined with our
additional resources from the funding of the company this should be a
really good thing for the Kafka development effort. Especially when
combined with the increasing contributions from the rest of the development
community. This is great news, as there is a lot of work to do. We'll need
to really focus on scaling this distributed development in a healthy way.

One thing I do want to emphasize is that the addition of a company in the
Kafka ecosystem won’t mean meddling with open source. Kafka will remain
100% open source and community focused, as of course is true of any Apache
project. I have been doing open source for a long time and strongly believe
it is the right model for infrastructure software development.

Confluent is just getting off the ground now. We left LinkedIn, raised some
money, and we have an office (but no furniture yet!). None the less, f you
are interested in finding out more about the company and either getting
help with your Kafka usage or joining us to help build all this, by all
means reach out to us, we’d love to talk.

Wish us luck!

-Jay



Re: Announcing Confluent

2014-11-06 Thread Niek Sanders
Congrats!

On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps  wrote:
> Hey all,
>
> I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
> company around Kafka called Confluent. We are planning on productizing the
> kind of Kafka-based real-time data platform we built out at LinkedIn. We
> are doing this because we think this is a really powerful idea and we felt
> there was a lot to do to make this idea really take root. We wanted to make
> that our full time mission and focus.
>
> There is a blog post that goes into a little more depth here:
> http://blog.confluent.io/
>
> LinkedIn will remain a heavy Kafka user and contributor. Combined with our
> additional resources from the funding of the company this should be a
> really good thing for the Kafka development effort. Especially when
> combined with the increasing contributions from the rest of the development
> community. This is great news, as there is a lot of work to do. We'll need
> to really focus on scaling this distributed development in a healthy way.
>
> One thing I do want to emphasize is that the addition of a company in the
> Kafka ecosystem won’t mean meddling with open source. Kafka will remain
> 100% open source and community focused, as of course is true of any Apache
> project. I have been doing open source for a long time and strongly believe
> it is the right model for infrastructure software development.
>
> Confluent is just getting off the ground now. We left LinkedIn, raised some
> money, and we have an office (but no furniture yet!). None the less, f you
> are interested in finding out more about the company and either getting
> help with your Kafka usage or joining us to help build all this, by all
> means reach out to us, we’d love to talk.
>
> Wish us luck!
>
> -Jay


Re: Announcing Confluent

2014-11-06 Thread Scott Clasen
Awesome. Congrats to all of you!

On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps  wrote:

> Hey all,
>
> I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
> company around Kafka called Confluent. We are planning on productizing the
> kind of Kafka-based real-time data platform we built out at LinkedIn. We
> are doing this because we think this is a really powerful idea and we felt
> there was a lot to do to make this idea really take root. We wanted to make
> that our full time mission and focus.
>
> There is a blog post that goes into a little more depth here:
> http://blog.confluent.io/
>
> LinkedIn will remain a heavy Kafka user and contributor. Combined with our
> additional resources from the funding of the company this should be a
> really good thing for the Kafka development effort. Especially when
> combined with the increasing contributions from the rest of the development
> community. This is great news, as there is a lot of work to do. We'll need
> to really focus on scaling this distributed development in a healthy way.
>
> One thing I do want to emphasize is that the addition of a company in the
> Kafka ecosystem won’t mean meddling with open source. Kafka will remain
> 100% open source and community focused, as of course is true of any Apache
> project. I have been doing open source for a long time and strongly believe
> it is the right model for infrastructure software development.
>
> Confluent is just getting off the ground now. We left LinkedIn, raised some
> money, and we have an office (but no furniture yet!). None the less, f you
> are interested in finding out more about the company and either getting
> help with your Kafka usage or joining us to help build all this, by all
> means reach out to us, we’d love to talk.
>
> Wish us luck!
>
> -Jay
>


Re: Announcing Confluent

2014-11-06 Thread chetan conikee
Congrats Jay, Neha and Jun.
Look forward to it.

On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps  wrote:

> Hey all,
>
> I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
> company around Kafka called Confluent. We are planning on productizing the
> kind of Kafka-based real-time data platform we built out at LinkedIn. We
> are doing this because we think this is a really powerful idea and we felt
> there was a lot to do to make this idea really take root. We wanted to make
> that our full time mission and focus.
>
> There is a blog post that goes into a little more depth here:
> http://blog.confluent.io/
>
> LinkedIn will remain a heavy Kafka user and contributor. Combined with our
> additional resources from the funding of the company this should be a
> really good thing for the Kafka development effort. Especially when
> combined with the increasing contributions from the rest of the development
> community. This is great news, as there is a lot of work to do. We'll need
> to really focus on scaling this distributed development in a healthy way.
>
> One thing I do want to emphasize is that the addition of a company in the
> Kafka ecosystem won’t mean meddling with open source. Kafka will remain
> 100% open source and community focused, as of course is true of any Apache
> project. I have been doing open source for a long time and strongly believe
> it is the right model for infrastructure software development.
>
> Confluent is just getting off the ground now. We left LinkedIn, raised some
> money, and we have an office (but no furniture yet!). None the less, f you
> are interested in finding out more about the company and either getting
> help with your Kafka usage or joining us to help build all this, by all
> means reach out to us, we’d love to talk.
>
> Wish us luck!
>
> -Jay
>


Announcing Confluent

2014-11-06 Thread Jay Kreps
Hey all,

I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
company around Kafka called Confluent. We are planning on productizing the
kind of Kafka-based real-time data platform we built out at LinkedIn. We
are doing this because we think this is a really powerful idea and we felt
there was a lot to do to make this idea really take root. We wanted to make
that our full time mission and focus.

There is a blog post that goes into a little more depth here:
http://blog.confluent.io/

LinkedIn will remain a heavy Kafka user and contributor. Combined with our
additional resources from the funding of the company this should be a
really good thing for the Kafka development effort. Especially when
combined with the increasing contributions from the rest of the development
community. This is great news, as there is a lot of work to do. We'll need
to really focus on scaling this distributed development in a healthy way.

One thing I do want to emphasize is that the addition of a company in the
Kafka ecosystem won’t mean meddling with open source. Kafka will remain
100% open source and community focused, as of course is true of any Apache
project. I have been doing open source for a long time and strongly believe
it is the right model for infrastructure software development.

Confluent is just getting off the ground now. We left LinkedIn, raised some
money, and we have an office (but no furniture yet!). Nonetheless, if you
are interested in finding out more about the company and either getting
help with your Kafka usage or joining us to help build all this, by all
means reach out to us, we’d love to talk.

Wish us luck!

-Jay


RE: Issue with async producer

2014-11-06 Thread Devendra Tagare
Hi,

Which version of Kafka are you using?
0.8.0

Is the broker I/O or network saturated? - typical writes are around 45 MB per 
broker & there are streaming as well as batch consumers.

We are already sending across compressed packets & the batch size is 100.

Also, we tried the sync producer but were not able to enforce the timeouts on 
it. Is there anything that needs to be taken care of in this case?

Regards,
Dev
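
For reference, a minimal sketch of how an async producer is wired up from the
properties quoted in the original message below, assuming the 0.8
kafka.javaapi.producer API (broker list, topic and payload are placeholders):

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class AsyncProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092");  // placeholder
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "0");
            props.put("producer.type", "async");
            props.put("batch.num.messages", "10");
            props.put("queue.buffering.max.ms", "300");
            // When the async queue is full, block at most this long and then
            // fail the send, which surfaces as the QueueFullExceptions mentioned below.
            props.put("queue.enqueue.timeout.ms", "50");

            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("my-topic", "payload"));
            producer.close();
        }
    }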

From: Devendra Tagare
Sent: Tuesday, November 04, 2014 10:02 AM
To: users@kafka.apache.org
Cc: Devendra Tagare
Subject: Issue with async producer

Hi,

We are using an async producer to send data to kafka.

The load the sender handles is around 250 rps; the size of a message is around
25K.

The configs used in the producer are :

request.required.acks=0
producer.type=async
batch.num.messages=10
topic.metadata.refresh.interval.ms=3
queue.buffering.max.ms=300
queue.enqueue.timeout.ms=50


While the async producer works perfectly fine at 150-175 rps, the invoking
method returns in under 10 ms. The invoking method takes around 2ms to return
when the load increases to 250 rps.

On investigation we noticed QueueFullExceptions in the logs.

On the kafka side the memory utilization was high. Is it because the fsync from
memory to disk is not happening fast enough?
The documentation says the OS-level fsync interval should take care of the interval
at which writes are happening. Should we expedite the writes using

log.flush.interval.messages
and log.flush.interval.ms?

Also, we tried the sync producer but were not able to enforce the timeouts on 
it.

We have 45 producers & 11 Kafka servers which handle a load of around 500mn 
events per day.

Some of the server side properties we are using:

log.segment.bytes=536870912
log.retention.check.interval.ms=6
zookeeper.connection.timeout.ms=100

Regards,
Dev


Re: "metric.reporters" is not working

2014-11-06 Thread Bae, Jae Hyeon
I added the following ugly reflection code and it's working

try {
    // Reach into ProducerConfig's private static ConfigDef and register the
    // custom reporter class name so that AbstractConfig.getClass() can resolve it.
    Field f = ProducerConfig.class.getDeclaredField("config");
    f.setAccessible(true);
    ConfigDef config = (ConfigDef) f.get(null); // static field, so no instance is needed
    config.define(ServoReporter.class.getName(),
            ConfigDef.Type.CLASS, ConfigDef.Importance.LOW, "");
} catch (Exception e) {
    e.printStackTrace();
}

On Wed, Nov 5, 2014 at 10:48 PM, Bae, Jae Hyeon  wrote:

> Hi
>
> When I set up
>
> props.put("metric.reporters",
> Lists.newArrayList(ServoReporter.class.getName()));
>
> I got the following error:
>
> org.apache.kafka.common.config.ConfigException: Unknown configuration
> 'com.netflix.suro.sink.kafka.ServoReporter'
> at
> org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:60)
> at
> org.apache.kafka.common.config.AbstractConfig.getClass(AbstractConfig.java:91)
> at
> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:147)
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:105)
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)
>
> AbstractConfig.getClass throws an exception because it cannot find any
> definition of ServoReporter.class.getName() but I cannot add custom class
> definition because the class name is not defined in the key set of
> properties.
>
> Do you have any idea?
>
> Thank you
> Best, Jae
>


Re: Consumer and Producer configs

2014-11-06 Thread Guozhang Wang
Hello Eduardo,

If you are using the console producer / consumer, you can set the configs on the
command line when starting them; if you are wiring the clients directly, then
you can set them in a Properties object and pass it to the constructor.

Guozhang
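
For example, a minimal sketch of the second option for the 0.8 high-level
consumer (the zookeeper address and group id are placeholders):

    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ConsumerConfigSketch {
        public static ConsumerConnector create() {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk1:2181");   // placeholder
            props.put("group.id", "my-consumer-group");   // placeholder
            props.put("auto.offset.reset", "smallest");
            // any other consumer configs go into the same Properties object
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }
    }

The producer works the same way: build a Properties object and pass it to
ProducerConfig, then to the Producer constructor.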

On Thu, Nov 6, 2014 at 7:10 AM, Eduardo Costa Alfaia  wrote:

> Hi Guys,
>
> How could I use the Consumer and Producer configs in my Kafka environment?
>
> Thanks
>
> --
> Informativa sulla Privacy: http://www.unibs.it/node/8155
>



-- 
-- Guozhang


Re: No longer supporting Java 6, if? when?

2014-11-06 Thread Koert Kuipers
when is java 6 dropped by the hadoop distros?

i am still aware of many clusters that are java 6 only at the moment.



On Thu, Nov 6, 2014 at 12:44 PM, Gwen Shapira  wrote:

> +1 for dropping Java 6
>
> On Thu, Nov 6, 2014 at 9:31 AM, Steven Schlansker <
> sschlans...@opentable.com
> > wrote:
>
> > Java 6 has been End of Life since Feb 2013.
> > Java 7 (and 8, but unfortunately that's too new still) has very
> compelling
> > features which can make development a lot easier.
> >
> > The sooner more projects drop Java 6 the better, in my opinion :)
> >
> > On Nov 5, 2014, at 7:45 PM, Worthy LaFollette  wrote:
> >
> > > Mostly converted now to 1.7, this would be welcomed to get any new
> > > features.
> > >
> > > On Wed Nov 05 2014 at 7:32:55 PM Joe Stein 
> wrote:
> > >
> > >> This has been coming up in a lot of projects and for other reasons
> too I
> > >> wanted to kick off the discussion about if/when we end support for
> Java
> > 6.
> > >> Besides any API we may want to use in >= 7 we also compile our
> binaries
> > for
> > >> 6 for release currently.
> > >>
> > >> /***
> > >> Joe Stein
> > >> Founder, Principal Consultant
> > >> Big Data Open Source Security LLC
> > >> http://www.stealth.ly
> > >> Twitter: @allthingshadoop 
> > >> /
> > >>
> >
> >
>


Re: corrupt recovery checkpoint file issue....

2014-11-06 Thread Guozhang Wang
Jason,

Yes I agree with you. We should handle this more gracefully as the
checkpoint file dump is not guaranteed atomic. Could you file a JIRA?

Guozhang
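
To illustrate the kind of graceful handling being suggested (a hypothetical
reader, not Kafka's actual OffsetCheckpoint class): treat a line that fails to
parse as "no usable checkpoint" instead of letting the NumberFormatException
abort startup.

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    public class CheckpointReadSketch {
        // Returns the checkpoint version, or -1 if the file is missing or
        // corrupt, in which case the caller can fall back to a full recovery.
        public static int readVersion(String path) {
            try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
                String first = reader.readLine();
                return first == null ? -1 : Integer.parseInt(first.trim());
            } catch (IOException | NumberFormatException e) {
                // A zero-filled or truncated checkpoint file lands here rather
                // than failing KafkaServer.startup().
                return -1;
            }
        }
    }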

On Thu, Nov 6, 2014 at 6:31 AM, Jason Rosenberg  wrote:

> Hi,
>
> We recently had a kafka node go down suddenly. When it came back up, it
> apparently had a corrupt recovery file, and refused to startup:
>
> 2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error
> starting up KafkaServer
> java.lang.NumberFormatException: For input string:
>
> "^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
>
> ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@"
> at
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Integer.parseInt(Integer.java:481)
> at java.lang.Integer.parseInt(Integer.java:527)
> at
> scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
> at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
> at
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
> at
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> at kafka.log.LogManager.loadLogs(LogManager.scala:105)
> at kafka.log.LogManager.<init>(LogManager.scala:57)
> at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
>
> And since the app is under a monitor (so it was repeatedly restarting and
> failing with this error for several minutes before we got to it)…
>
> We moved the ‘recovery-point-offset-checkpoint’ file out of the way, and it
> then restarted cleanly (but of course re-synced all it’s data from
> replicas, so we had no data loss).
>
> Anyway, I’m wondering if that’s the expected behavior? Or should it not
> declare it corrupt and then proceed automatically to an unclean restart?
>
> Should this NumberFormatException be handled a bit more gracefully?
>
> We saved the corrupt file if it’s worth inspecting (although I doubt it
> will be useful!)….
>
> Jason
> ​
>



-- 
-- Guozhang


Re: Spark and Kafka

2014-11-06 Thread Eduardo Costa Alfaia
This is my window:

reduceByKeyAndWindow(
    // add the counts of words entering the window
    new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) { return i1 + i2; }
    },
    // subtract the counts of words leaving the window (inverse function)
    new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer i1, Integer i2) { return i1 - i2; }
    },
    new Duration(60 * 5 * 1000),  // window length: 5 minutes
    new Duration(1 * 1000)        // slide interval: 1 second
  );

> On Nov 6, 2014, at 18:37, Gwen Shapira  wrote:
> 
> What's the window size? If the window is around 10 seconds and you are
> sending data at very stable rate, this is expected.
> 
> 
> 
> On Thu, Nov 6, 2014 at 9:32 AM, Eduardo Costa Alfaia > wrote:
> 
>> Hi Guys,
>> 
>> I am doing some tests with Spark Streaming and Kafka, but I have seen
>> something strange, I have modified the JavaKafkaWordCount to use
>> ReducebyKeyandWindow and to print in the screen the accumulated numbers of
>> the words, in the beginning spark works very well in each interaction the
>> numbers of the words increase but after 12 a 13 sec the results repeats
>> continually.
>> 
>> My program producer remain sending the words toward the kafka.
>> 
>> Does anyone have any idea about this?
>> 
>> 
>> ---
>> Time: 1415272266000 ms
>> ---
>> (accompanied
>> them,6)
>> (merrier,5)
>> (it
>> possessed,5)
>> (the
>> treacherous,5)
>> (Quite,12)
>> (offer,273)
>> (rabble,58)
>> (exchanging,16)
>> (Genoa,18)
>> (merchant,41)
>> ...
>> ---
>> Time: 1415272267000 ms
>> ---
>> (accompanied
>> them,12)
>> (merrier,12)
>> (it
>> possessed,12)
>> (the
>> treacherous,11)
>> (Quite,24)
>> (offer,602)
>> (rabble,132)
>> (exchanging,35)
>> (Genoa,36)
>> (merchant,84)
>> ...
>> ---
>> Time: 1415272268000 ms
>> ---
>> (accompanied
>> them,17)
>> (merrier,18)
>> (it
>> possessed,17)
>> (the
>> treacherous,17)
>> (Quite,35)
>> (offer,889)
>> (rabble,192)
>> (the
>> bed,1)
>> (exchanging,51)
>> (Genoa,54)
>> ...
>> ---
>> Time: 1415272269000 ms
>> ---
>> (accompanied
>> them,17)
>> (merrier,18)
>> (it
>> possessed,17)
>> (the
>> treacherous,17)
>> (Quite,35)
>> (offer,889)
>> (rabble,192)
>> (the
>> bed,1)
>> (exchanging,51)
>> (Genoa,54)
>> ...
>> 
>> ---
>> Time: 141527227 ms
>> ---
>> (accompanied
>> them,17)
>> (merrier,18)
>> (it
>> possessed,17)
>> (the
>> treacherous,17)
>> (Quite,35)
>> (offer,889)
>> (rabble,192)
>> (the
>> bed,1)
>> (exchanging,51)
>> (Genoa,54)
>> ...
>> 
>> 
>> --
>> Informativa sulla Privacy: http://www.unibs.it/node/8155
>> 


-- 
Informativa sulla Privacy: http://www.unibs.it/node/8155


Re: consumer ack for high-level consumer?

2014-11-06 Thread Guozhang Wang
That is correct.

Guozhang
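
A minimal sketch of the manual-commit pattern discussed further down the thread,
assuming the 0.8 high-level consumer API (zookeeper address, group, topic and
batch size are placeholders): disable auto-commit, fully process a batch of
messages, then call commitOffsets().

    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ManualCommitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk1:2181");  // placeholder
            props.put("group.id", "my-group");           // placeholder
            props.put("auto.commit.enable", "false");    // commit explicitly instead

            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            List<KafkaStream<byte[], byte[]>> streams = connector
                    .createMessageStreams(Collections.singletonMap("my-topic", 1))
                    .get("my-topic");

            ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
            int processed = 0;
            while (it.hasNext()) {
                process(it.next().message());      // finish processing first
                if (++processed % 1000 == 0) {
                    connector.commitOffsets();     // then persist offsets, a batch at a time
                }
            }
        }

        private static void process(byte[] message) { /* application logic */ }
    }

If the consumer dies between processing and the next commitOffsets() call, those
messages are re-delivered on restart, which is the at-least-once behavior being
asked for.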

On Wed, Nov 5, 2014 at 9:18 PM, Chia-Chun Shih 
wrote:

> Hi,
>
> Thanks for your response. I just read source code and found that:
>
>   1) ConsumerIterator$next() use PartitionTopicInfo$resetConsumeOffset to
> update offsets in PartitionTopicInfo objects.
>   2) ZookeeperConsumerConnector$commitOffset() gets latest offsets from
> PartitionTopicInfo objects, and update offsets to ZK.
>
> So, when clients iterate through messages, offsets are updated locally
> in PartitionTopicInfo
> objects. When ZookeeperConsumerConnector$commitOffset is called, local
> offsets are sync to ZK. Is it correct?
>
> regards,
> Chia-Chun
>
> 2014-11-06 0:24 GMT+08:00 Guozhang Wang :
>
> > Hello,
> >
> > You can turn of auto.commit.offset and manually call
> > connector.commitOffset() manually after you have processed the data. One
> > thing to remember is that the commit frequency is related to ZK (in the
> > future, Kafka) writes and hence you may not want to commit after
> processed
> > every single message but only a batch of messages.
> >
> > Guozhang
> >
> > On Tue, Nov 4, 2014 at 10:42 PM, Chia-Chun Shih  >
> > wrote:
> >
> > > Hi,
> > >
> > > I am a new to Kafka. In my understanding, high-level consumer (
> > > ZookeeperConsumerConnector) changes offset when message is drawn
> > > by ConsumerIterator. But I would like to change offset when message is
> > > processed, not when message is drawn from broker. So if a consumer dies
> > > before a message is completely processed, the message will be processed
> > > again. Is it possible?
> > >
> > > Thanks.
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: No longer supporting Java 6, if? when?

2014-11-06 Thread Gwen Shapira
+1 for dropping Java 6

On Thu, Nov 6, 2014 at 9:31 AM, Steven Schlansker  wrote:

> Java 6 has been End of Life since Feb 2013.
> Java 7 (and 8, but unfortunately that's too new still) has very compelling
> features which can make development a lot easier.
>
> The sooner more projects drop Java 6 the better, in my opinion :)
>
> On Nov 5, 2014, at 7:45 PM, Worthy LaFollette  wrote:
>
> > Mostly converted now to 1.7, this would be welcomed to get any new
> > features.
> >
> > On Wed Nov 05 2014 at 7:32:55 PM Joe Stein  wrote:
> >
> >> This has been coming up in a lot of projects and for other reasons too I
> >> wanted to kick off the discussion about if/when we end support for Java
> 6.
> >> Besides any API we may want to use in >= 7 we also compile our binaries
> for
> >> 6 for release currently.
> >>
> >> /***
> >> Joe Stein
> >> Founder, Principal Consultant
> >> Big Data Open Source Security LLC
> >> http://www.stealth.ly
> >> Twitter: @allthingshadoop 
> >> /
> >>
>
>


Re: OffsetOutOfRange Error

2014-11-06 Thread Guozhang Wang
Jimmy,

I am not very familiar with the python client, you may directly ask
its author:

https://cwiki.apache.org/confluence/display/KAFKA/Clients

On Thu, Nov 6, 2014 at 7:57 AM, Jimmy John  wrote:

> I dug deeper and saw this during normal operation:
>
> In the kafka broker log:
>
> [2014-11-03 21:39:25,658] ERROR [KafkaApi-8] Error when processing fetch
> request for partition [activity.stream,5] offset 7475239 from consumer with
> correlation id 69 (kafka.server.KafkaApis)
> kafka.common.OffsetOutOfRangeException: Request for offset 7475239 but we
> only have log segments in the range 8014610 to 10227768.
>
> And on the client side I saw:
>
> Nov 03 21:39:25 INFO kafka consumer.py: Commit offset 10227769 in
> SimpleConsumer: group=ActivityStream, topic=activity.stream, partition=5
>
> ...
>
> ...
>
> Nov 03 21:39:26 ERROR demux.consumer_stream consumer_stream.py:
> OffsetOutOfRangeError(FetchResponse(topic='activity.stream', partition=5,
> error=1, highwaterMark=-1, messages=<generator object _decode_message_set_iter at 0x3dcf870>),)
>
>
> I am using the python kafka client.
>
> Why would the kafka client commit offset 10227769, and then, 1 second
> later, turn around and ask kafka for the offset 7475239?
>
> thx
> jim
>



-- 
-- Guozhang


Re: Spark and Kafka

2014-11-06 Thread Gwen Shapira
What's the window size? If the window is around 10 seconds and you are
sending data at very stable rate, this is expected.



On Thu, Nov 6, 2014 at 9:32 AM, Eduardo Costa Alfaia  wrote:

> Hi Guys,
>
> I am doing some tests with Spark Streaming and Kafka, but I have seen
> something strange, I have modified the JavaKafkaWordCount to use
> ReducebyKeyandWindow and to print in the screen the accumulated numbers of
> the words, in the beginning spark works very well in each interaction the
> numbers of the words increase but after 12 a 13 sec the results repeats
> continually.
>
> My program producer remain sending the words toward the kafka.
>
> Does anyone have any idea about this?
>
>
> ---
> Time: 1415272266000 ms
> ---
> (accompanied
> them,6)
> (merrier,5)
> (it
> possessed,5)
> (the
> treacherous,5)
> (Quite,12)
> (offer,273)
> (rabble,58)
> (exchanging,16)
> (Genoa,18)
> (merchant,41)
> ...
> ---
> Time: 1415272267000 ms
> ---
> (accompanied
> them,12)
> (merrier,12)
> (it
> possessed,12)
> (the
> treacherous,11)
> (Quite,24)
> (offer,602)
> (rabble,132)
> (exchanging,35)
> (Genoa,36)
> (merchant,84)
> ...
> ---
> Time: 1415272268000 ms
> ---
> (accompanied
> them,17)
> (merrier,18)
> (it
> possessed,17)
> (the
> treacherous,17)
> (Quite,35)
> (offer,889)
> (rabble,192)
> (the
> bed,1)
> (exchanging,51)
> (Genoa,54)
> ...
> ---
> Time: 1415272269000 ms
> ---
> (accompanied
> them,17)
> (merrier,18)
> (it
> possessed,17)
> (the
> treacherous,17)
> (Quite,35)
> (offer,889)
> (rabble,192)
> (the
> bed,1)
> (exchanging,51)
> (Genoa,54)
> ...
>
> ---
> Time: 141527227 ms
> ---
> (accompanied
> them,17)
> (merrier,18)
> (it
> possessed,17)
> (the
> treacherous,17)
> (Quite,35)
> (offer,889)
> (rabble,192)
> (the
> bed,1)
> (exchanging,51)
> (Genoa,54)
> ...
>
>
> --
> Informativa sulla Privacy: http://www.unibs.it/node/8155
>


Spark and Kafka

2014-11-06 Thread Eduardo Costa Alfaia
Hi Guys,

I am doing some tests with Spark Streaming and Kafka, but I have seen something
strange. I have modified the JavaKafkaWordCount to use reduceByKeyAndWindow and
to print on the screen the accumulated counts of the words. In the beginning
Spark works very well: in each iteration the counts of the words increase, but
after 12 to 13 seconds the results repeat continually.

My producer program keeps sending words to Kafka.

Does anyone have any idea about this?


---
Time: 1415272266000 ms
---
(accompanied
them,6)
(merrier,5)
(it
possessed,5)
(the
treacherous,5)
(Quite,12)
(offer,273)
(rabble,58)
(exchanging,16)
(Genoa,18)
(merchant,41)
...
---
Time: 1415272267000 ms
---
(accompanied
them,12)
(merrier,12)
(it
possessed,12)
(the
treacherous,11)
(Quite,24)
(offer,602)
(rabble,132)
(exchanging,35)
(Genoa,36)
(merchant,84)
...
---
Time: 1415272268000 ms
---
(accompanied
them,17)
(merrier,18)
(it
possessed,17)
(the
treacherous,17)
(Quite,35)
(offer,889)
(rabble,192)
(the
bed,1)
(exchanging,51)
(Genoa,54)
...
---
Time: 1415272269000 ms
---
(accompanied
them,17)
(merrier,18)
(it
possessed,17)
(the
treacherous,17)
(Quite,35)
(offer,889)
(rabble,192)
(the
bed,1)
(exchanging,51)
(Genoa,54)
...

---
Time: 141527227 ms
---
(accompanied
them,17)
(merrier,18)
(it
possessed,17)
(the
treacherous,17)
(Quite,35)
(offer,889)
(rabble,192)
(the
bed,1)
(exchanging,51)
(Genoa,54)
...


-- 
Informativa sulla Privacy: http://www.unibs.it/node/8155


Re: Interaction of retention settings for broker and topic plus partitions

2014-11-06 Thread Jason Rosenberg
Jun,

To clarify though, is it correct that a per topic limit will always
override the default limit of the same type?  (e.g. a large per-topic
retention hours vs. a small default retention hours)?

Jason
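
As a small illustration of the rule Jun states below (a hypothetical helper, not
Kafka's code): a segment becomes eligible for deletion as soon as either the
effective size limit or the effective time limit is exceeded, where the effective
limit is the per-topic override when one is set and the broker default otherwise.

    public class RetentionRuleSketch {
        // A negative limit means "no limit of that kind".
        public static boolean eligibleForDeletion(long partitionSizeBytes,
                                                  long segmentAgeMs,
                                                  long effectiveRetentionBytes,
                                                  long effectiveRetentionMs) {
            boolean overSize = effectiveRetentionBytes >= 0
                    && partitionSizeBytes > effectiveRetentionBytes;
            boolean overAge = effectiveRetentionMs >= 0
                    && segmentAgeMs > effectiveRetentionMs;
            // "Either" limit triggers deletion, which is why a large per-topic
            // retention.bytes does not stop the default time-based retention
            // from truncating at 168 hours.
            return overSize || overAge;
        }

        public static long effectiveLimit(Long topicOverride, long brokerDefault) {
            return topicOverride != null ? topicOverride : brokerDefault;
        }
    }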

On Sat, Sep 20, 2014 at 12:28 AM, Jun Rao  wrote:

> That's right. The rule is that a log segment is deleted if either the size
> or the time limit is reached. Log sizes are per partition.
>
> Thanks,
>
> Jun
>
> On Thu, Sep 18, 2014 at 2:55 PM, Cory Watson  wrote:
>
> > Hello all!
> >
> > I'm curious about the interaction of server and topic level retention
> > settings. It's not clear to me the precedence of the follow:
> >
> >- broker's default log.retention.bytes
> >- topic's retention.bytes (which defaults to broker's
> >log.retention.bytes)
> >- broker's log.retention.hours and log.retention.minutes (if both are
> >specified then it seems to be the lower of the two, since it's when
> >"either" is exceeded)
> >
> > It seems that the rule is that when any of these are violated then the
> log
> > segment is deleted. Is this right?
> >
> > Also, just to be clear: The log sizes in questions are for a single
> > partitions logs?
> >
> > I have a situation where my per-topic retention.bytes is very high, but
> my
> > default log.retention.hours is lower (the default @ 168 hours). It seems
> > that it's truncating at the log.retention.hours instead of the topic's
> > retention.bytes.
> >
> > Am I understanding this correctly? :)
> >
> > --
> > Cory Watson
> > Principal Infrastructure Engineer // Keen IO
> >
>


Re: No longer supporting Java 6, if? when?

2014-11-06 Thread Steven Schlansker
Java 6 has been End of Life since Feb 2013.
Java 7 (and 8, but unfortunately that's too new still) has very compelling
features which can make development a lot easier.

The sooner more projects drop Java 6 the better, in my opinion :)

On Nov 5, 2014, at 7:45 PM, Worthy LaFollette  wrote:

> Mostly converted now to 1.7, this would be welcomed to get any new
> features.
> 
> On Wed Nov 05 2014 at 7:32:55 PM Joe Stein  wrote:
> 
>> This has been coming up in a lot of projects and for other reasons too I
>> wanted to kick off the discussion about if/when we end support for Java 6.
>> Besides any API we may want to use in >= 7 we also compile our binaries for
>> 6 for release currently.
>> 
>> /***
>> Joe Stein
>> Founder, Principal Consultant
>> Big Data Open Source Security LLC
>> http://www.stealth.ly
>> Twitter: @allthingshadoop 
>> /
>> 



Re: Disactivating Yammer Metrics Monitoring

2014-11-06 Thread Jason Rosenberg
Hi Francois,

We had the exact same problem.  We embed Kafka in our service container,
and we use yammer metrics to see data about the whole app (e.g. kafka, the
jvm, the service container wrapping it).  However, as you observed, by
default, kafka produces an insane amount of metrics. So what we did: using
the yammer library, you can disable specific metrics by removing them from
the yammer MetricsRegistry, which you can access from guice (if you are
using guice). I implemented the MetricsRegistryListener and added the
ability to remove metric names by regex, so I can still have some metrics
show up (like the simple 'AllTopic' counts for messages/bytes sent from the
producer) but block everything else that's per topic, etc.

Jason
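
A rough sketch of that listener-based filtering, assuming yammer metrics-core 2.x
(the blocklist regex is just an example pattern):

    import com.yammer.metrics.Metrics;
    import com.yammer.metrics.core.Metric;
    import com.yammer.metrics.core.MetricName;
    import com.yammer.metrics.core.MetricsRegistry;
    import com.yammer.metrics.core.MetricsRegistryListener;
    import java.util.regex.Pattern;

    // Listens for newly registered metrics and immediately removes the ones
    // whose names match a blocklist regex, so they never reach the reporter.
    public class RegexMetricFilter implements MetricsRegistryListener {
        private final MetricsRegistry registry;
        private final Pattern blocked;

        public RegexMetricFilter(MetricsRegistry registry, Pattern blocked) {
            this.registry = registry;
            this.blocked = blocked;
            registry.addListener(this);
        }

        @Override
        public void onMetricAdded(MetricName name, Metric metric) {
            if (blocked.matcher(name.getMBeanName()).matches()) {
                registry.removeMetric(name);
            }
        }

        @Override
        public void onMetricRemoved(MetricName name) {
            // nothing to do
        }
    }

    // e.g. new RegexMetricFilter(Metrics.defaultRegistry(),
    //          Pattern.compile(".*ProducerTopicMetrics.*"));  // example pattern only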

On Fri, Sep 19, 2014 at 11:34 PM, Otis Gospodnetic <
otis.gospodne...@gmail.com> wrote:

> Hi,
>
> I don't have any source or configs handy to check things, but you are
> saying you've configured Kafka to use GraphiteReporter, right?  So why not
> remove that config, so metrics stop being sent to Graphite if your Graphite
> setup is suffering?  If you do that and you still want to see your Kafka
> metrics, you can always use SPM  for Kafka
> (though some of the graphs will be empty until we get KAFKA-1481, or
> something else that improves metrics, in).  If you just want to use it
> temporarily, just use the free 30-day trial version until you beef up your
> Graphite setup.
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Fri, Sep 19, 2014 at 10:08 AM, François Langelier <
> f.langel...@gmail.com>
> wrote:
>
> > Hi Daniel,
> >
> > Thank you for your answer.
> >
> > It's possible that I didn't understood something, if so correct me
> please.
> >
> > From what I understood, from the kafka doc #monitoring
> > , kafka use
> Yammer
> > Metrics for monitoring the servers (the brokers) and the clients
> (producers
> > and consumers).
> >
> > Our web site is also using Yammer Metrics and push that to our Graphite
> > server and our web site also produce message in kafka.
> > From what I read, the Yammer Metrics GraphiteReporter is a kind of
> > Singleton, once I Enable it, it is working for all the process. (But I
> > might be wrong here...)
> >
> > We recently upgrade kafka from 0.7.2 to 0.8.1.1 and since the upgrade,
> > kafka is monitoring in our Graphite Server and is hammering it, so we
> > aren't able to use it because we always get timeout...
> >
> > SO, I was wondering if there is a way to disable the kafka monitoring to
> > our Graphite server.
> >
> > We are using the code in the tag 0.8.1.1 on github, so if the
> kafka-ganglia
> > isn't in the tag, we aren't using it :)
> >
> >
> > François Langelier
> > Étudiant en génie Logiciel - École de Technologie Supérieure
> > 
> > Capitaine Club Capra 
> > VP-Communication - CS Games  2014
> > Jeux de Génie  2011 à 2014
> > Magistrat Fraternité du Piranha 
> > Comité Organisateur Olympiades ÉTS 2012
> > Compétition Québécoise d'Ingénierie 2012 - Compétition Senior
> >
> > On Wed, Sep 17, 2014 at 11:05 PM, Daniel Compton  >
> > wrote:
> >
> > > Hi Francois
> > >
> > > I didn't quite understand how you've set up your metrics reporting. Are
> > you
> > > using the https://github.com/criteo/kafka-ganglia metrics reporter? If
> > so
> > > then you should be able to adjust the config to exclude the metrics you
> > > don't want, with kafka.ganglia.metrics.exclude.regex.
> > >
> > >
> > > On 18 September 2014 07:55, François Langelier 
> > > wrote:
> > >
> > > > Hi all!
> > > >
> > > > We are using yammer metrics to monitor some parts of our system.
> > > >
> > > > Since we upgrade from kafka 0.7.2 to 0.8.1.1, we saw a lot more data
> > > > getting in our graphite server and from what I saw, it looks like it
> > all
> > > > come from our producers.
> > > >
> > > > From what i understand, since we already use graphite, our
> > > graphiteReporter
> > > > is enable in our main web site and our kafka producers are having fun
> > > using
> > > > it too to monitor in graphite.
> > > >
> > > > The problem is that right now kafka is hammering of graphite server
> and
> > > we
> > > > have difficulty to saw our monitored data...
> > > >
> > > > Is there a way to deactivate the monitoring of our kafka producers?
> > > >
> > > >
> > > > François Langelier
> > > > Étudiant en génie Logiciel - École de Technologie Supérieure
> > > > 
> > > > Capitaine Club Capra 
> > > > VP-Communication - CS Games  2014
> > > > Jeux de Génie  2011 à 2014
> > > > Magistrat Fraternité du Piranha 
> > > > Comité Organisateur Olympiades 

Re: zookeeper upgrade or remove zookeeper dependency

2014-11-06 Thread Jason Rosenberg
We have been using zk 3.4.6 (and we use curator), without any problems with
kafka, for quite a while now

Jason

On Thu, Sep 18, 2014 at 2:18 PM, Mingtao Zhang 
wrote:

> Great :)
>
> Best Regards,
> Mingtao
>
> On Thu, Sep 18, 2014 at 2:04 PM, Guozhang Wang  wrote:
>
> > Hi Mingtao,
> >
> > We are shooting to cut the 0.8.2 branch this month.
> >
> > Guozhang
> >
> > On Thu, Sep 18, 2014 at 10:36 AM, Mingtao Zhang 
> > wrote:
> >
> > > Good to know. Does it mean release will go out after those bug is fixed
> > or
> > > moved to newer release? :)
> > >
> > > Best Regards,
> > > Mingtao
> > >
> > > On Wed, Sep 17, 2014 at 9:34 PM, Neha Narkhede <
> neha.narkh...@gmail.com>
> > > wrote:
> > >
> > > > You can track the list of open bugs here
> > > > <
> > > >
> > >
> >
> https://issues.apache.org/jira/browse/KAFKA-1558?jql=project%20%3D%20Kafka%20and%20fixVersion%20%3D%200.8.2%20and%20status%20!%3D%20Resolved%20and%20status%20!%3D%20Closed
> > > > >
> > > > .
> > > >
> > > >
> > > > On Wed, Sep 17, 2014 at 10:00 AM, Mingtao Zhang <
> > mail2ming...@gmail.com>
> > > > wrote:
> > > >
> > > > > Could you also share a rough time point of 0.8.2 release?
> > > > >
> > > > > Best Regards,
> > > > > Mingtao
> > > > >
> > > > > On Wed, Sep 17, 2014 at 12:10 PM, Neha Narkhede <
> > > neha.narkh...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Kafka trunk is on a later zookeeper version (3.4.6). So the next
> > > > release
> > > > > > (0.8.2) will depend on zookeeper 3.4.6
> > > > > >
> > > > > > On Wed, Sep 17, 2014 at 8:55 AM, Mingtao Zhang <
> > > mail2ming...@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I can see kafka is using zookeeper 3.3.4. For my integration
> > > > > > > purposes, I want to use curator, which requires a higher version
> > > > > > > than 3.3.4 even in its lowest version.
> > > > > > >
> > > > > > > Is there any plan to bump up the zookeeper dependency? Or is there
> > > > > > > any plan to remove the zookeeper dependency?
> > > > > > >
> > > > > > > Best Regards,
> > > > > > > Mingtao
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
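
A build-side note for anyone stuck on 0.8.1.x in the meantime: the kafka artifact pulls in ZooKeeper 3.3.4 transitively, so if Curator needs a newer client jar you can exclude the transitive one and pin 3.4.6 yourself. A sketch of what that looks like in sbt, with illustrative version numbers and Scala 2.10 artifacts assumed:

// build.sbt -- keep a single ZooKeeper version for both the Kafka client and Curator
libraryDependencies ++= Seq(
  "org.apache.kafka"     %% "kafka"             % "0.8.1.1" exclude("org.apache.zookeeper", "zookeeper"),
  "org.apache.zookeeper"  % "zookeeper"         % "3.4.6",
  "org.apache.curator"    % "curator-framework" % "2.6.0"
)

As Jason reports above, the 0.8.1.1 client runs fine against a 3.4.6 ensemble; the exclusion just keeps the old 3.3.4 jar off the classpath.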


Re: corrupt recovery checkpoint file issue....

2014-11-06 Thread Jason Rosenberg
forgot to mention, we are using 0.8.1.1

Jason

On Thu, Nov 6, 2014 at 9:31 AM, Jason Rosenberg  wrote:

> Hi,
>
> We recently had a kafka node go down suddenly. When it came back up, it
> apparently had a corrupt recovery file, and refused to start up:
>
> 2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error starting up 
> KafkaServer
> java.lang.NumberFormatException: For input string: 
> "^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
> ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@"
> at 
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Integer.parseInt(Integer.java:481)
> at java.lang.Integer.parseInt(Integer.java:527)
> at 
> scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
> at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at 
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> at kafka.log.LogManager.loadLogs(LogManager.scala:105)
> at kafka.log.LogManager.<init>(LogManager.scala:57)
> at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
>
> And since the app is under a monitor (so it was repeatedly restarting and
> failing with this error for several minutes before we got to it)…
>
> We moved the ‘recovery-point-offset-checkpoint’ file out of the way, and
> it then restarted cleanly (but of course re-synced all its data from
> replicas, so we had no data loss).
>
> Anyway, I’m wondering if that’s the expected behavior? Or should it not
> declare it corrupt and then proceed automatically to an unclean restart?
>
> Should this NumberFormatException be handled a bit more gracefully?
>
> We saved the corrupt file if it’s worth inspecting (although I doubt it
> will be useful!)….
>
> Jason
> ​
>
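
For context, recovery-point-offset-checkpoint is a small text file: a version line, a count line, and then one "topic partition offset" line per log. The broker's reader trusts that layout, which is why a zero-filled file dies inside Integer.parseInt. A minimal sketch (not Kafka's actual OffsetCheckpoint code) of a more forgiving reader that treats a corrupt checkpoint like a missing one:

import java.io.File
import scala.io.Source
import scala.util.{Failure, Success, Try}

object SafeCheckpointReader {
  // Returns (topic, partition) -> recovery offset, or an empty map if the file
  // is truncated, zero-filled, or otherwise unparseable.
  def read(file: File): Map[(String, Int), Long] = {
    Try {
      val source = Source.fromFile(file)
      try {
        val lines = source.getLines().toList
        require(lines.head.trim.toInt == 0, "unexpected checkpoint version")
        val expected = lines(1).trim.toInt
        lines.drop(2).take(expected).map { line =>
          val Array(topic, partition, offset) = line.split("\\s+")
          (topic, partition.toInt) -> offset.toLong
        }.toMap
      } finally source.close()
    } match {
      case Success(offsets) => offsets
      case Failure(e) =>
        // Fall back to recovering the logs from scratch instead of refusing to start.
        System.err.println(s"Ignoring corrupt checkpoint ${file.getAbsolutePath}: $e")
        Map.empty
    }
  }
}

Whether the broker should do this silently or insist on operator intervention is exactly the question Jason raises; the sketch just shows that the fallback is cheap.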


OffsetOutOfRange Error

2014-11-06 Thread Jimmy John
I dug deeper and saw this during normal operation:

In the kafka broker log:

[2014-11-03 21:39:25,658] ERROR [KafkaApi-8] Error when processing fetch
request for partition [activity.stream,5] offset 7475239 from consumer with
correlation id 69 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 7475239 but we
only have log segments in the range 8014610 to 10227768.

And on the client side I saw:

Nov 03 21:39:25 INFO kafka consumer.py: Commit offset 10227769 in
SimpleConsumer: group=ActivityStream, topic=activity.stream, partition=5

...

...

Nov 03 21:39:26 ERROR demux.consumer_stream consumer_stream.py:
OffsetOutOfRangeError(FetchResponse(topic='activity.stream',
partition=5, error=1, highwaterMark=-1, messages=),)


I am using the python kafka client.

Why would the kafka client commit offset 10227769, and then, 1 second
later, turn around and ask kafka for the offset 7475239?

thx
jim
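
One detail worth noting from the broker error: it only has segments from 8014610 to 10227768, so any older offset has already been removed by retention. Whatever made the consumer ask for 7475239, the only way forward at that point is to re-seek to a valid position. The Python client has its own seek/reset helpers for this; for reference, here is roughly what querying the earliest available offset looks like against the 0.8 SimpleConsumer API, as a Scala sketch with placeholder host, port and client id:

import scala.collection.JavaConverters._
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.javaapi.consumer.SimpleConsumer

object EarliestOffsetProbe {
  // Ask a broker for the oldest offset still on disk for one partition,
  // i.e. the position to jump to after an OffsetOutOfRange error.
  def earliestOffset(host: String, port: Int, topic: String, partition: Int): Long = {
    val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "offset-probe")
    try {
      val tp = TopicAndPartition(topic, partition)
      val request = new kafka.javaapi.OffsetRequest(
        Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)).asJava,
        OffsetRequest.CurrentVersion,
        "offset-probe")
      consumer.getOffsetsBefore(request).offsets(topic, partition).head
    } finally consumer.close()
  }
}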


Consumer and Producer configs

2014-11-06 Thread Eduardo Costa Alfaia
Hi Guys,

How could I use the Consumer and Producer configs in my Kafka environment?

Thanks 

-- 
Informativa sulla Privacy: http://www.unibs.it/node/8155
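
The two config classes are thin wrappers around java.util.Properties: fill in the keys from the producer and consumer sections of the documentation and hand them to the 0.8 clients. A minimal sketch for 0.8.1.x in Scala, with placeholder broker and ZooKeeper hosts:

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object ConfigExample {
  def newProducer(): Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", "broker1:9092,broker2:9092") // placeholder brokers
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("request.required.acks", "1")
    new Producer[String, String](new ProducerConfig(props))
  }

  def consume(topic: String): Unit = {
    val props = new Properties()
    props.put("zookeeper.connect", "zk1:2181")  // placeholder ZooKeeper
    props.put("group.id", "config-example")
    props.put("auto.offset.reset", "smallest")
    val connector = Consumer.create(new ConsumerConfig(props))
    val stream = connector.createMessageStreams(Map(topic -> 1))(topic).head
    for (msg <- stream)                          // blocks and iterates forever
      println(new String(msg.message(), "UTF-8"))
  }

  def main(args: Array[String]): Unit = {
    val producer = newProducer()
    producer.send(new KeyedMessage[String, String]("test", "hello"))
    producer.close()
  }
}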


kafka test jars in sbt?

2014-11-06 Thread Markus Jais
Hello,

I want to use the kafka_2.10-0.8.2-beta-test.jar in my Scala project.

It can be found here:
http://repo.maven.apache.org/maven2/org/apache/kafka/kafka_2.10/0.8.1.1/


In my build.sbt I write the following definition:
"org.apache.kafka" % "kafka_2.10" % "0.8.2-beta-test" 


But sbt cannot find it. Has anybody had any success with this?

I already used "gradle testJar" and the test jar gets published to:

.m2/repository/org/apache/kafka/kafka_2.10/0.8.2-beta


but sbt is looking for a:
.m2/repository/org/apache/kafka/kafka_2.10/0.8.2-beta-test/kafka_2.10-0.8.2-beta-test.pom

Any tips on how to use the kafka test jar (together with the regular kafka jar)
in a build.sbt file?

I want to start a kafka cluster for a unit test. 

Cheers,

Markus
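
The trick is that the file name's -test suffix is (as far as I can tell) a Maven classifier, not part of the version, which is why sbt goes looking for a 0.8.2-beta-test.pom that does not exist. Assuming the gradle build installed the jar into the local Maven repo with a "test" classifier and you are on sbt 0.13, a sketch of the build.sbt entries:

// build.sbt -- the regular jar plus the test jar from the local Maven repo
resolvers += Resolver.mavenLocal

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka_2.10" % "0.8.2-beta",
  "org.apache.kafka" % "kafka_2.10" % "0.8.2-beta" % "test" classifier "test"
)

The version stays 0.8.2-beta in both lines; only the classifier changes.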

corrupt recovery checkpoint file issue....

2014-11-06 Thread Jason Rosenberg
Hi,

We recently had a kafka node go down suddenly. When it came back up, it
apparently had a corrupt recovery file, and refused to start up:

2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error
starting up KafkaServer
java.lang.NumberFormatException: For input string:
"^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@"
at 
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:481)
at java.lang.Integer.parseInt(Integer.java:527)
at 
scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at kafka.log.LogManager.loadLogs(LogManager.scala:105)
at kafka.log.LogManager.<init>(LogManager.scala:57)
at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
at kafka.server.KafkaServer.startup(KafkaServer.scala:72)

And since the app is under a monitor (so it was repeatedly restarting and
failing with this error for several minutes before we got to it)…

We moved the ‘recovery-point-offset-checkpoint’ file out of the way, and it
then restarted cleanly (but of course re-synced all its data from
replicas, so we had no data loss).

Anyway, I’m wondering if that’s the expected behavior? Or should it not
declare it corrupt and then proceed automatically to an unclean restart?

Should this NumberFormatException be handled a bit more gracefully?

We saved the corrupt file if it’s worth inspecting (although I doubt it
will be useful!)….

Jason
​


Re: Information

2014-11-06 Thread Manikumar Reddy
Hi,

  These are GC logs; specifically, they are young-generation (ParNew) collection
logs. They look normal to me.
  Are you observing any GC pauses on the kafka server? Please check the link below.

  https://kafka.apache.org/documentation.html#java


Regards,
Kumar

On Thu, Nov 6, 2014 at 4:53 PM, Eduardo Costa Alfaia  wrote:

> Hi Guys
>
> Could anyone explain this information to me?
>
> 208K), 0.0086120 secs] [Times: user=0.06 sys=0.00, real=0.01 secs]
> 2014-11-06T12:20:55.673+0100: 1256.382: [GC2014-11-06T12:20:55.674+0100:
> 1256.382: [ParNew: 551115K->2816K(613440K), 0.0204130 secs]
> 560218K->13933K(4126208K), 0.0205130 secs] [Times: user=0.09 sys=0.01,
> real=0.02 secs]
> 2014-11-06T12:21:03.372+0100: 1264.080: [GC2014-11-06T12:21:03.372+0100:
> 1264.080: [ParNew: 547827K->1047K(613440K), 0.0073880 secs]
> 558944K->12473K(4126208K), 0.0074770 secs] [Times: user=0.06 sys=0.00,
> real=0.00 secs]
> 2014-11-06T12:21:10.416+0100: 1271.124: [GC2014-11-06T12:21:10.416+0100:
> 1271.124: [ParNew: 545782K->2266K(613440K), 0.0069530 secs]
> 557208K->13836K(4126208K), 0.0070420 secs] [Times: user=0.05 sys=0.00,
> real=0.01 secs]
> 2014-11-06T12:21:18.307+0100: 1279.015: [GC2014-11-06T12:21:18.307+0100:
> 1279.015: [ParNew: 546921K->2156K(613440K), 0.0071050 secs]
> 558491K->13855K(4126208K), 0.0071900 secs] [Times: user=0.06 sys=0.00,
> real=0.01 secs]
> 2014-11-06T12:21:26.394+0100: 1287.102: [GC2014-11-06T12:21:26.394+0100:
> 1287.102: [ParNew: 546237K->3125K(613440K), 0.0071260 secs]
> 557936K->14940K(4126208K), 0.0072170 secs] [Times: user=0.05 sys=0.00,
> real=0.00 secs]
> 2014-11-06T12:21:33.913+0100: 1294.621: [GC2014-11-06T12:21:33.913+0100:
> 1294.621: [ParNew: 547726K->2452K(613440K), 0.0070220 secs]
> 559541K->14367K(412
>
> Thanks
> --
> Informativa sulla Privacy: http://www.unibs.it/node/8155
>
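
For anyone else decoding these, the first complete line quoted above (timestamp 12:20:55) breaks down roughly like this (standard HotSpot output with -XX:+PrintGCDetails and date stamps):

2014-11-06T12:20:55.673+0100: 1256.382:             wall-clock time, then seconds since JVM start
[ParNew: 551115K->2816K(613440K), 0.0204130 secs]   young generation: used before -> used after (capacity), collection time
560218K->13933K(4126208K), 0.0205130 secs           whole heap: used before -> used after (capacity), total pause
[Times: user=0.09 sys=0.01, real=0.02 secs]         CPU time (user/kernel) versus real wall-clock pause

So each collection drops the young generation back to a few MB, total heap usage stays around 14 MB out of roughly 4 GB, and the pause is about 20 ms, which matches Kumar's "looks normal" reading.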


Information

2014-11-06 Thread Eduardo Costa Alfaia
Hi Guys

Could anyone explain this information to me?

208K), 0.0086120 secs] [Times: user=0.06 sys=0.00, real=0.01 secs] 
2014-11-06T12:20:55.673+0100: 1256.382: [GC2014-11-06T12:20:55.674+0100: 
1256.382: [ParNew: 551115K->2816K(613440K), 0.0204130 secs] 
560218K->13933K(4126208K), 0.0205130 secs] [Times: user=0.09 sys=0.01, 
real=0.02 secs] 
2014-11-06T12:21:03.372+0100: 1264.080: [GC2014-11-06T12:21:03.372+0100: 
1264.080: [ParNew: 547827K->1047K(613440K), 0.0073880 secs] 
558944K->12473K(4126208K), 0.0074770 secs] [Times: user=0.06 sys=0.00, 
real=0.00 secs] 
2014-11-06T12:21:10.416+0100: 1271.124: [GC2014-11-06T12:21:10.416+0100: 
1271.124: [ParNew: 545782K->2266K(613440K), 0.0069530 secs] 
557208K->13836K(4126208K), 0.0070420 secs] [Times: user=0.05 sys=0.00, 
real=0.01 secs] 
2014-11-06T12:21:18.307+0100: 1279.015: [GC2014-11-06T12:21:18.307+0100: 
1279.015: [ParNew: 546921K->2156K(613440K), 0.0071050 secs] 
558491K->13855K(4126208K), 0.0071900 secs] [Times: user=0.06 sys=0.00, 
real=0.01 secs] 
2014-11-06T12:21:26.394+0100: 1287.102: [GC2014-11-06T12:21:26.394+0100: 
1287.102: [ParNew: 546237K->3125K(613440K), 0.0071260 secs] 
557936K->14940K(4126208K), 0.0072170 secs] [Times: user=0.05 sys=0.00, 
real=0.00 secs] 
2014-11-06T12:21:33.913+0100: 1294.621: [GC2014-11-06T12:21:33.913+0100: 
1294.621: [ParNew: 547726K->2452K(613440K), 0.0070220 secs] 559541K->14367K(412

Thanks
-- 
Informativa sulla Privacy: http://www.unibs.it/node/8155


Kafka Release timelines

2014-11-06 Thread dinesh kumar
Hi,
I found the future release plan wiki. I
see that 0.8.2 is still in beta even though it was slated for September.
What is the expected date for the 0.9 release?

Thanks,
Dinesh


Re: Error using migrationtool for upgrading 0.7 to 0.8

2014-11-06 Thread Tomas Nunez
Thanks to this last hint we finally got it to work!

One problem was that the new kafka 0.8 cluster was compiled with scala
2.10, while the old 0.7 one was compiled with scala 2.8.
As the MigrationTool uses 0.8 binaries, but also libraries from 0.7, there
was a mismatch and it wasn't finding the classes it should.

Solution? As the new cluster needs to keep running and changing it to
scala 2.8 wasn't an option, I just downloaded the kafka 0.8 version compiled
with scala 2.8 (
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz),
and ran kafka-run-class.sh from there. Then there were no more missing
libraries.

But I still had those "Caused by: java.lang.NullPointerException" errors.
After Gwen's last question, I checked consumer.properties, where I had set
"zookeeper.connect". Then I thought: although that's the correct key in
0.8 (checked here https://kafka.apache.org/08/configuration.html), since it
connects to the 0.7 cluster to consume messages, maybe I should use the 0.7
config instead, where the key is "zk.connect" (checked here
http://kafka.apache.org/07/configuration.html). Changed it and BOOM! It
worked!

So in conclusion: the MigrationTool needs to run with Kafka binaries compiled
with the same scala version as the 0.7 cluster, the consumer config needs to
comply with 0.7, and the producer config needs to comply with 0.8.

And that's it!

Thanks a lot

Regards,
Tomàs
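
For anyone else wiring this up, the two property files end up speaking different dialects because the tool embeds a 0.7 consumer and a 0.8 producer. A sketch of the relevant keys (hosts are placeholders; the full key lists are on the 0.7 and 0.8 configuration pages linked above):

# consumer.properties -- read by the embedded 0.7 consumer, so 0.7 key names apply
zk.connect=zookeeper-of-the-07-cluster:2181
groupid=kafka-07-to-08-migration

# producer.properties -- read by the embedded 0.8 producer, so 0.8 key names apply
metadata.broker.list=new-08-broker1:9092,new-08-broker2:9092

And, as above, the jars the tool runs with need to be built for the same Scala version as the 0.7 cluster.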

On Thu, Nov 6, 2014 at 3:16 AM, Gwen Shapira  wrote:

> Also, can you post your configs? Especially the "zookeeper.connect" one?
>
> On Wed, Nov 5, 2014 at 6:15 PM, Gwen Shapira 
> wrote:
>
> > Regarding more information:
> > Maybe ltrace?
> >
> > If I were you, I'd go to MigrationTool code and start adding LOG lines.
> > because there aren't enough of those to troubleshoot.
> >
> > On Wed, Nov 5, 2014 at 6:13 PM, Gwen Shapira 
> > wrote:
> >
> >> org.apache.zookeeper.ClientCnxn is throwing the exception, so I'm 100%
> >> sure it eventually found the class.
> >>
> >> On Wed, Nov 5, 2014 at 5:59 PM, Tomas Nunez  wrote:
> >>
> >>> Ok, still fighting with the migrationTool here...
> >>>
> >>> That tuple wasn't in the scala-library.jar. It turns out I was using
> >>> scala 2.10 for kafka0.8 and scala 2.8 for kafka0.7, and the jar files were
> >>> not compatible. So, for the record, it seems that you need both the 0.7 jar
> >>> files and your 0.8 kafka compiled with the same scala version.
> >>>
> >>> After fixing that (downloading kafka 0.8 compiled with scala 2.8), I'm now
> >>> facing a different error, this time more cryptic:
> >>>
> >>> Kafka migration tool failed due to:
> >>> java.lang.reflect.InvocationTargetException
> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>> at
> >>>
> >>>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >>> at
> >>>
> >>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>> at java.lang.reflect.Method.invoke(Method.java:606)
> >>> at
> >>> kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:204)
> >>> Caused by: java.lang.NullPointerException
> >>> at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:361)
> >>> at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332)
> >>> at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383)
> >>> at
> org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
> >>> at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
> >>> at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
> >>> at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
> >>> at
> >>>
> >>>
> kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:152)
> >>> at
> >>>
> >>>
> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:122)
> >>> at
> >>>
> >>>
> kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
> >>> at
> >>>
> >>>
> kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
> >>> at
> >>>
> >>>
> kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:88)
> >>> at
> >>>
> >>>
> kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
> >>> ... 5 more
> >>>
> >>> [2014-11-06 01:32:44,362] ERROR Kafka migration tool failed:
> >>> (kafka.tools.KafkaMigrationTool)
> >>> java.lang.reflect.InvocationTargetException
> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>> at
> >>>
> >>>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >>> at
> >>>
> >>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>> at java.lang.reflect.Method.invoke(Method.java:606)
> >>> at
> >>> kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:204)
> >>> Caused