offset migration from kafka to zookeeper

2015-02-12 Thread tao xiao
Hi team,

I was trying to migrate my consumer offset from kafka to zookeeper.

Here are the original settings of my consumer:

props.put("offsets.storage", "kafka");

props.put("dual.commit.enabled", "false");

Here are the steps:

1. set dual.commit.enabled=true
2. restart my consumer and monitor offset lag with
kafka.tools.ConsumerOffsetChecker
3. set offsets.storage=zookeeper
4. restart my consumer and monitor offset lag with
kafka.tools.ConsumerOffsetChecker

After step 4 my consumer was able to continually consume data from the topic,
but the offset lag remained unchanged. Did I do anything wrong?
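
(For reference, a minimal sketch of the two phases above as consumer
configs, assuming the 0.8.x high-level consumer; the zookeeper.connect and
group.id values are placeholders.)

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    // A sketch of the migration phases, assuming the 0.8.x high-level
    // consumer; zookeeper.connect and group.id are placeholder values.
    public class OffsetMigrationConfig {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder
            props.put("group.id", "my-group");                // placeholder

            // Phase 1: keep fetching offsets from kafka, but commit to both stores.
            props.put("offsets.storage", "kafka");
            props.put("dual.commit.enabled", "true");
            ConsumerConfig phase1 = new ConsumerConfig(props);

            // Phase 2 (after a restart): fetch and commit offsets via zookeeper only.
            props.put("offsets.storage", "zookeeper");
            props.put("dual.commit.enabled", "false");
            ConsumerConfig phase2 = new ConsumerConfig(props);
            // each config would be passed to Consumer.createJavaConsumerConnector(...)
        }
    }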

-- 
Regards,
Tao


Re: offset migration from kafka to zookeeper

2015-02-12 Thread Joel Koshy
That is weird. Are you by any chance running an older version of the
offset checker? Is this straightforward to reproduce?

On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:
 Joel,
 
 No, the metric was not increasing. It was 0 all the time.
 
 On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy jjkosh...@gmail.com wrote:
 
  Actually I meant to say: check that it is not increasing.
 
  On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
   Possibly a bug - can you also look at the MaxLag mbean in the consumer
   to verify that the maxlag is zero?
  
   On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
Hi Joel,
   
When I set dual.commit.enabled=true the count value of both metrics got
increased. After I set offsets.storage=zookeeper only ZooKeeperCommitsPerSec
changed, but not KafkaCommitsPerSec. I think this is expected, as kafka
offset storage was turned off.

But when I looked up the consumer lag via kafka.tools.ConsumerOffsetChecker
the lag still remained unchanged.

I scanned through the source code of ConsumerOffsetChecker: it doesn't
check the offset in zk unless offsetFetchResponse returns NoOffset. Since
the consumer used kafka as the offset storage before, I don't think
offsetFetchResponse would return NoOffset.
   
  offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
    if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
      val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
      // this group may not have migrated off zookeeper for offsets storage
      // (we don't expose the dual-commit option in this tool)
      // (meaning the lag may be off until all the consumers in the group
      // have the same setting for offsets storage)
      try {
        val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir
          + "/%d".format(topicAndPartition.partition))._1.toLong
        offsetMap.put(topicAndPartition, offset)
      } catch {
        case z: ZkNoNodeException =>
          if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
            offsetMap.put(topicAndPartition, -1)
          else
            throw z
      }
    }
    else if (offsetAndMetadata.error == ErrorMapping.NoError)
      offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
    else {
      println("Could not fetch offset for %s due to %s.".format(
        topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
    }
  }
   
On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy jjkosh...@gmail.com
  wrote:
   
 There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec -
 can you look those up and see what they report?
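
 (A sketch of how those meters might be read over JMX from inside the
 consumer JVM; the exact mbean object names vary across 0.8.x client ids,
 so the match below is deliberately loose, and the MaxLag gauge mentioned
 earlier in the thread is included as well.)

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    // A loose sketch, not the exact lookup Joel used: find any consumer
    // mbean whose name mentions the commit meters or the lag gauge, and
    // print whichever standard attribute it exposes.
    public class ConsumerMetricsDump {
        public static void main(String[] args) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            for (ObjectName name : server.queryNames(new ObjectName("kafka.consumer:*"), null)) {
                String n = name.toString();
                if (n.contains("KafkaCommitsPerSec")
                        || n.contains("ZooKeeperCommitsPerSec")
                        || n.contains("MaxLag")) {
                    // Yammer meters expose Count; gauges expose Value.
                    for (String attr : new String[]{"Count", "Value"}) {
                        try {
                            System.out.println(name + " " + attr + "=" + server.getAttribute(name, attr));
                        } catch (Exception ignored) { /* attribute not on this mbean type */ }
                    }
                }
            }
        }
    }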

 -- 
 Regards,
 Tao



Re: Increased CPU usage with 0.8.2-beta

2015-02-12 Thread Jay Kreps
This is a serious issue, we'll take a look.

-Jay

On Thu, Feb 12, 2015 at 3:19 PM, Solon Gordon so...@knewton.com wrote:

 I saw a very similar jump in CPU usage when I tried upgrading from 0.8.1.1
 to 0.8.2.0 today in a test environment. The Kafka cluster there is two
 m1.larges handling 2,000 partitions across 32 topics. CPU usage rose from
 40% into the 150%–190% range, and load average from under 1 to over 4.
 Downgrading to 0.8.1.1 brought the CPU and load back to the previous
 values.

 If there's more info that would be helpful, please let me know.

 On Thu, Feb 12, 2015 at 4:17 PM, Mathias Söderberg 
 mathias.soederb...@gmail.com wrote:

  Jun,
 
  Pardon the radio silence. I booted up a new broker, created a topic with
  three (3) partitions and replication factor one (1) and used the
  kafka-producer-perf-test.sh script to generate load (using messages of
  roughly the same size as ours). There was a slight increase in CPU usage
  (~5-10%) on 0.8.2.0-rc2 compared to 0.8.1.1, but that was about it.
 
  I upgraded our staging cluster to 0.8.2.0 earlier this week or so, and had
  to add an additional broker due to increased load after the upgrade (note
  that the incoming load on the cluster has been pretty much consistent).
  Since the upgrade we've been seeing a 2-3x increase in latency as well.
  I'm considering downgrading to 0.8.1.1 again to see if it resolves our
  issues.
 
  Best regards,
  Mathias
 
  On Tue Feb 03 2015 at 6:44:36 PM Jun Rao j...@confluent.io wrote:
 
   Mathias,
  
   The new hprof output doesn't reveal anything new to me. We did fix the
   logic around the purgatory in 0.8.2, which could potentially drive up
   the CPU usage a bit. To verify that, could you do your test on a single
   broker (with replication factor 1) between 0.8.1 and 0.8.2 and see if
   there is any significant difference in CPU usage?
  
   Thanks,
  
   Jun
  
   On Tue, Feb 3, 2015 at 5:09 AM, Mathias Söderberg 
   mathias.soederb...@gmail.com wrote:
  
Jun,
   
 I re-ran the hprof test, for about 30 minutes again, for 0.8.2.0-rc2 with
 the same version of snappy that 0.8.1.1 used. Attached the logs.
 Unfortunately there wasn't any improvement, as the node running
 0.8.2.0-rc2 still had a higher load and CPU usage.
   
Best regards,
Mathias
   
On Tue Feb 03 2015 at 4:40:31 AM Jaikiran Pai 
  jai.forums2...@gmail.com
wrote:
   
On Monday 02 February 2015 11:03 PM, Jun Rao wrote:
 Jaikiran,

 The fix you provided is probably unnecessary. The channel that we use in
 SimpleConsumer (BlockingChannel) is configured to be blocking. So even
 though the read from the socket is in a loop, each read blocks if there
 are no bytes received from the broker. So that shouldn't cause extra CPU
 consumption.
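
(An illustration of Jun's point, not Kafka's actual BlockingChannel code:
a read loop over a blocking socket parks the thread in read() rather than
spinning; the host and port below are placeholders.)

    import java.io.InputStream;
    import java.net.Socket;

    // With a blocking socket, the read loop sleeps inside read() until
    // bytes arrive, so looping does not by itself burn CPU.
    public class BlockingReadLoop {
        public static void main(String[] args) throws Exception {
            Socket socket = new Socket("broker.example.com", 9092); // placeholder
            socket.setSoTimeout(0); // 0 means block indefinitely on reads
            InputStream in = socket.getInputStream();
            byte[] buf = new byte[4096];
            while (true) {
                int n = in.read(buf); // thread waits here; no busy spinning
                if (n < 0) break;     // peer closed the connection
                // hand n bytes to the response parser...
            }
            socket.close();
        }
    }
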
Hi Jun,

Of course, you are right! I forgot that while reading the thread dump in
hprof output, one has to be aware that the thread state isn't shown and
the thread need not necessarily be doing any CPU activity.

-Jaikiran
   
   

 Thanks,

 Jun

 On Mon, Jan 26, 2015 at 10:05 AM, Mathias Söderberg 
 mathias.soederb...@gmail.com wrote:

 Hi Neha,

 I sent an e-mail earlier today, but noticed now that it didn't actually
 go through.

 Anyhow, I've attached two files, one with output from a 10 minute run and
 one with output from a 30 minute run. Realized that maybe I should've
 done one or two runs with 0.8.1.1 as well, but nevertheless.

 I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same
 CPU usage as with the beta version (basically pegging all cores). If I
 manage to find the time I'll do another run with hprof on the rc2 version
 later today.

 Best regards,
 Mathias

 On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede 
 n...@confluent.io
  
wrote:

 The following should be sufficient:

 java -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof classname

 You would need to start the Kafka server with the settings above for
 some time until you observe the problem.

 On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg 
 mathias.soederb...@gmail.com wrote:

 Hi Neha,

 Yeah sure. I'm not familiar with hprof, so are there any particular
 options I should include, or should I just run with defaults?

 Best regards,
 Mathias

 On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede 
  n...@confluent.io
 wrote:
 Thanks for reporting the issue. Would you mind running hprof and sending
 the output?

 On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg 
 mathias.soederb...@gmail.com wrote:

 Good day,

 I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed that
 the CPU usage 

Re: offset migration from kafka to zookeeper

2015-02-12 Thread tao xiao
Thanks for the explanation. Is there a way that I can wipe out the offset
stored in kafka so that the checker can continue to work again?

On Fri, Feb 13, 2015 at 1:31 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I think this is the offset checker bug.
 The offset checker will
 1. first check if the offset exists in offset topic on broker or not.
 2. If it is on broker then it will just return that offset.
 3. Otherwise it goes to zookeeper.

 So the problem you saw was actually following this logic.
 After the dual commit, the offset topic already had the offsets for this
 consumer and topic.
 Then you switched to zookeeper commit.
 Because the offset topic has the offsets already, the offset checker will
 use that and skip checking zookeeper. So the offset will not change
 anymore, because you are no longer committing to the offset topic on the
 broker, while the offset checker always uses that offset.

 On 2/12/15, 7:30 PM, tao xiao xiaotao...@gmail.com wrote:

 I used the one shipped with 0.8.2. It is pretty straightforward to
 reproduce the issue.

 Here are the steps to reproduce:
 1. I have a consumer using the high level consumer API with initial
 settings offsets.storage=kafka and dual.commit.enabled=false.
 2. After consuming messages for a while, shut down the consumer and change
 the setting to dual.commit.enabled=true.
 3. Bounce the consumer and run for a while. The lag looks good.
 4. Change the setting to offsets.storage=zookeeper and bounce the
 consumer. Starting from now the lag remains unchanged.
 
Re: understanding partition key

2015-02-12 Thread Gary Ogden
So it's not possible to have 1 topic with 1 partition and many consumers of
that topic?

My intention is to have a topic with many consumers, but each consumer
needs to be able to have access to all the messages in that topic.

On 11 February 2015 at 20:42, Zijing Guo alter...@yahoo.com.invalid wrote:

 Partition key is at the producer level: if you have multiple partitions
 for a single topic, then you can pass in a key for the KeyedMessage
 object, and based on the configured partitioner.class it will return a
 partition number for the producer, and the producer will find the leader
 for that partition. I don't know how kafka could handle the time series
 case; it depends on how many partitions there are for that topic. If you
 only have 1 partition, then you don't need to worry about order at all,
 since each consumer group can only allow 1 consumer instance to consume
 that data. If you have multiple partitions (say 3 for example), then you
 can fire up 3 consumer instances under the same consumer group, and each
 will only consume 1 partition's data. If order in each partition matters,
 then you need to do some work on the producer side.
 Hope this helps,
 Edwin
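
 (A sketch of the keyed-producer flow described above, assuming the 0.8.x
 producer API; the broker address, topic, and key are placeholders.)

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    // Messages with the same key always hash to the same partition, so
    // one consumer sees them in order.
    public class KeyedProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092"); // placeholder
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // partitioner.class defaults to a key-hashing partitioner.
            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("events", "customer-42", "login"));
            producer.close();
        }
    }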

  On Wednesday, February 11, 2015 3:14 PM, Gary Ogden gog...@gmail.com
 wrote:


  I'm trying to understand how the partition key works and whether I need to
 specify a partition key for my topics or not.  What happens if I don't
 specify a PK and I have more than one consumer that wants all messages in a
 topic for a certain period of time? Will those consumers get all the
 messages, but they just may not be ordered correctly?

 The current scenario is that we will have events going into a topic based
 on customer and the data will remain in the topic for 24 hours. We will
 then have multiple consumers reading messages from that topic. They will
 want to be able to get them out over a time range (could be last hour, last
 8 hours etc).

 So if I specify the same PK for each subscriber, then each consumer will
 get all messages in the correct order?  If I don't specify the PK or use a
 random one, will each consumer still get all the messages but they just
 won't be ordered correctly?






Re: understanding partition key

2015-02-12 Thread David McNelis
Gary,

That is certainly a valid use case.  What Zijing was saying is that, within
a single consumer group (one consuming application), each partition is
consumed by at most one consumer.

I think that what it boils down to is how you want your information grouped
inside your timeframes.  For example, if you want to have everything for a
specific user, then you could use that as your partition key, ensuring that
any data for that user is processed by the same consumer (in however many
consumer applications you opt to run).

The second point that I think Zijing was getting at was whether or not your
proposed use case makes sense for Kafka.  If your goal is to do
time-interval batch processing (versus N-record batches), then why use
Kafka for it?  Why not use something more adept at batch processing?  For
example, if you're using HBase you can use Pig jobs that would read only
the records created between specific timestamps.

David
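
(A sketch of those group semantics, assuming the 0.8.x high-level consumer:
each distinct group.id receives its own copy of the topic, while within one
group each partition goes to exactly one stream. Connection, group, and
topic names are placeholders.)

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    // Run one copy of this with group.id "alarm-processor" and another
    // with a different group.id; both will see every message in the topic.
    public class GroupConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder
            props.put("group.id", "alarm-processor");         // placeholder
            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    connector.createMessageStreams(Collections.singletonMap("events", 1));
            for (MessageAndMetadata<byte[], byte[]> m : streams.get("events").get(0)) {
                // ordering is guaranteed only within m.partition()
                System.out.println(m.partition() + ":" + m.offset());
            }
        }
    }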

On Thu, Feb 12, 2015 at 7:44 AM, Gary Ogden gog...@gmail.com wrote:

 So it's not possible to have 1 topic with 1 partition and many consumers of
 that topic?

 My intention is to have a topic with many consumers, but each consumer
 needs to be able to have access to all the messages in that topic.




Re: understanding partition key

2015-02-12 Thread David McNelis
I'm going to go a bit in reverse for your questions. We built a restful API
to push data to so that we could submit things from multiple sources that
aren't necessarily things that our team would maintain, as well as validate
that data before we send it off to a topic.

As for consumers... we expect that a consumer will receive everything in a
particular partition, but definitely not everything in a topic.  For
example, for DNS information, a consumer expects to see everything for a
domain (our partition key there), but not for all domains... it would be
too much data to handle with a low enough latency.

For us, our latencies vary by what we're processing at a given moment or
other large jobs that might be running outside of the stream processing.
Generally speaking we keep latency within our limits by adjusting the
number of partitions for a topic, and subsequently the number of
consumers...which works until you hit a wall with your data store, in which
case tweaking must happen there too.

Your flow of data will have the biggest impact on latency (along with batch
size). If you see 100 events a minute and have a batch size of 1k, you'll
have 10 minutes of latency... if you process your data as each record comes
in, then as long as your consumer can keep up with that load, you'll have
very low latency.
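
(A quick check of that arithmetic, under the stated assumptions:)

    // Worst-case fill time for a batch: batch size / arrival rate.
    public class BatchLatency {
        public static void main(String[] args) {
            double eventsPerMinute = 100.0;
            int batchSize = 1000;
            System.out.println(batchSize / eventsPerMinute + " minutes"); // prints "10.0 minutes"
        }
    }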

Kafka's latency has never been an issue for us, and we've always found
there's something that we're doing that is affecting the overall throughput
of the systems, be it needing to play with the number of partitions,
adjusting batch size, resizing the hadoop cluster to meet increased need
there.

On Thu, Feb 12, 2015 at 9:54 AM, Gary Ogden gog...@gmail.com wrote:

 Thanks again David. So what kind of latencies are you experiencing with
 this? If I wanted to act upon certain events in this and send out alarms
 (email, sms etc), what kind of delays are you seeing by the time you're
 able to process them?

 It seems if you were to create an alarm topic, and dump alerts on there to
 be processed, and have a consumer then process those alerts (save to
 Cassandra and send out the notification), you could see some sort of delay.

 But none of your consumers are expecting that they will be getting all the
 data for that topic, are they? That's the hitch for me. I think I could
 rework the design so that no consumer would assume it's getting all the
 messages in the topic.

 When you say "central producer stack", this is something you built outside
 of kafka, I assume.

 On 12 February 2015 at 09:40, David McNelis dmcne...@emergingthreats.net
 wrote:

  In our setup we deal with a similar situation (lots of time-series data
  that we have to aggregate in a number of different ways).
 
  Our approach is to push all of our data to a central producer stack, that
  stack then submits data to different topics, depending on a set of
  predetermined rules.
 
  For argument's sake, let's say we see 100 events / second... Once the
  data is pushed to topics we have several consumer applications.  The
  first pushes the lowest level raw data into our cluster (in our case
  HBase, but could just as easily be C*). For this first consumer we use a
  small batch size, say 100 records.
 
  Our second consumer does some aggregation and summarization and needs to
  reference additional resources.  This batch size is considerably larger,
  say 1000 records in size.
 
  Our third consumer does a much larger aggregation where we're reducing it
  down to a much smaller dataset, by orders of magnitude, and we'll run
 that
  batch size at 10k records.
 
  Our aggregations are oftentimes time based, like rolling events /
 counters
  to the hour or day level... so while we may occasionally re-process some
 of
  the same information throughout the day, it lets us maintain a system
 where
  we have near-real-time access to most of the data we're ingesting.
 
  This certainly is something we've had to tweak in terms of the numbers of
  consumers / partitions and batch sizes to get to work optimally, but
 we're
  pretty happy with it at the moment.
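
  (A sketch of the tiered-batch pattern described above, with a
  hypothetical sink standing in for HBase/C*; batch size 100 mirrors the
  raw-data tier.)

    import java.util.ArrayList;
    import java.util.List;
    import kafka.consumer.KafkaStream;
    import kafka.message.MessageAndMetadata;

    // Drain a consumer stream into fixed-size batches before handing them
    // to the store; larger tiers would just pass a bigger batchSize.
    public class BatchingConsumer {
        interface Sink { void write(List<byte[]> records); } // hypothetical sink

        static void consume(KafkaStream<byte[], byte[]> stream, Sink sink, int batchSize) {
            List<byte[]> batch = new ArrayList<byte[]>(batchSize);
            for (MessageAndMetadata<byte[], byte[]> m : stream) {
                batch.add(m.message());
                if (batch.size() >= batchSize) { // flush a full batch
                    sink.write(batch);
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) sink.write(batch); // flush the tail on shutdown
        }
    }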
 
  On Thu, Feb 12, 2015 at 8:22 AM, Gary Ogden gog...@gmail.com wrote:
 
   Thanks David. Whether Kafka is the right choice is exactly what I'm
   trying to determine. Everything I want to do with these events is time
   based. Store them in the topic for 24 hours. Read from the topics and
   get data for a time period (last hour, last 8 hours etc). This reading
   from the topics could happen numerous times for the same dataset. At the
   end of the 24 hours, we would then want to store the events in long term
   storage in case we need them in the future.
  
   I'm thinking Kafka isn't the right fit for this use case though.
   However, we have cassandra already installed and running. Maybe we use
   kafka as the in-between... So dump the events onto kafka, have consumers
   that process the events and dump them into Cassandra, and then we read
   the time series data we need