RE: kafka 0.8.1.1 delay in replica data

2015-01-06 Thread chenlax
I set the config num.replica.fetchers=6, which solved this.

Thanks,
Lax
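
For reference, the fix above corresponds to this broker setting in server.properties; a sketch (6 is the value that worked in this case, tune to your broker's CPU and network):

```properties
# server.properties (broker side)
# Number of fetcher threads each broker uses to replicate messages from
# partition leaders. Raising it increases replication parallelism and can
# reduce replica lag at the cost of more threads and connections.
num.replica.fetchers=6
```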


 Date: Wed, 31 Dec 2014 15:51:57 -0800
 Subject: Re: kafka 0.8.1.1 delay in replica data
 From: j...@confluent.io
 To: users@kafka.apache.org
 
 Is the system time in-sync btw the two hosts?
 
 Thanks,
 
 Jun
 
 On Tue, Dec 23, 2014 at 5:23 AM, chenlax lax...@hotmail.com wrote:
 
  topic:
 
  Topic_test_data_1-10
 
  leader broker data info:
  -rw-rw-r-- 1 mqq mqq    237728 Dec 23 19:51 000211076304.index
  -rw-rw-r-- 1 mqq mqq 536875106 Dec 23 19:51 000211076304.log
  -rw-rw-r-- 1 mqq mqq    237632 Dec 23 19:45 000203797807.index
  -rw-rw-r-- 1 mqq mqq 536879703 Dec 23 19:45 000203797807.log
  -rw-rw-r-- 1 mqq mqq    237576 Dec 23 19:38 000196519197.index
  -rw-rw-r-- 1 mqq mqq 536885834 Dec 23 19:38 000196519197.log
  -rw-rw-r-- 1 mqq mqq    237848 Dec 23 19:32 000189241352.index
  -rw-rw-r-- 1 mqq mqq 536882522 Dec 23 19:32 000189241352.log
  -rw-rw-r-- 1 mqq mqq    238048 Dec 23 19:26 000181963182.index
  -rw-rw-r-- 1 mqq mqq 536874188 Dec 23 19:26 000181963182.log
  -rw-rw-r-- 1 mqq mqq    237472 Dec 23 19:19 000174684585.index
  -rw-rw-r-- 1 mqq mqq 536883850 Dec 23 19:19 000174684585.log
  -rw-rw-r-- 1 mqq mqq    237848 Dec 23 19:13 000167406454.index
  -rw-rw-r-- 1 mqq mqq 536877465 Dec 23 19:13 000167406454.log
 
  replica data info:
 
  -rw-rw-r-- 1 mqq mqq 173539804 Dec 23 20:37 000196519197.log
  -rw-rw-r-- 1 mqq mqq  10485760 Dec 23 20:37 000196519197.index
  -rw-rw-r-- 1 mqq mqq  4128 Dec 23 20:36 000189241352.index
  -rw-rw-r-- 1 mqq mqq 536882522 Dec 23 20:36 000189241352.log
  -rw-rw-r-- 1 mqq mqq  4128 Dec 23 20:30 000181963182.index
  -rw-rw-r-- 1 mqq mqq 536874188 Dec 23 20:30 000181963182.log
  -rw-rw-r-- 1 mqq mqq  4128 Dec 23 20:25 000174684585.index
  -rw-rw-r-- 1 mqq mqq 536883850 Dec 23 20:25 000174684585.log
  -rw-rw-r-- 1 mqq mqq  4128 Dec 23 20:20 000167406454.index
  -rw-rw-r-- 1 mqq mqq 536877465 Dec 23 20:20 000167406454.log
 
  The segment starting at offset 167406454 has mtime 19:13 on the leader
  broker, but 20:20 on the replica broker.
 
  Thanks,
  Lax
 
 
  From: lax...@hotmail.com
  To: users@kafka.apache.org
  Subject: kafka 0.8.1.1 delay in replica data
  Date: Tue, 23 Dec 2014 20:44:31 +0800
 
 
 
 
  I set a topic's retention.ms to 36, and then I found the replica data is
  delayed.
  What could cause this?

  Screenshots of the master broker and the replica broker were attached, but
  they are not reproduced here.
 
 
 
  Thanks,
  Lax
 
 
  


The message would be consumed if message un-committed

2015-01-06 Thread chenlax
As the Kafka docs say, "Only committed messages are ever given out to the
consumer."
If the followers do not copy a message in time but remain in the ISR, would
consumers still be able to consume that message from the leader broker?

Thanks,
Lax
  

kafka deleted old logs but not released

2015-01-06 Thread Yonghui Zhao
Hi,

We use kafka_2.10-0.8.1.1 on our servers. Today we got a disk space alert.

We found that many Kafka data files have been deleted but are still held
open by the Kafka process.

such as:

_yellowpageV2-0/68170670.log (deleted)
java   8446 root  724u  REG  253,2 536937911
26087362
/home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/68818668.log
(deleted)
java   8446 root  725u  REG  253,2 536910838
26087364
/home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/69457098.log
(deleted)
java   8446 root  726u  REG  253,2 536917902
26087368
/home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/70104914.log
(deleted)


Is something wrong here, or misconfigured?
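
A deleted-but-open segment keeps its disk space until the process holding it closes the file descriptor (typically when the broker is restarted). A sketch for quantifying the space still held, based on the lsof column layout shown above (paths below are shortened placeholders, not the real ones):

```shell
# sum_deleted: read `lsof` output on stdin and print the total bytes
# (column 7, SIZE/OFF) of files marked "(deleted)" but still held open.
sum_deleted() {
  awk '/\(deleted\)/ { sum += $7 } END { print sum + 0 }'
}

# Demo on two sample lines shaped like the lsof output above:
printf '%s\n' \
  'java 8446 root 724u REG 253,2 536937911 26087362 /data/topic-0/068.log (deleted)' \
  'java 8446 root 725u REG 253,2 536910838 26087364 /data/topic-0/069.log (deleted)' \
  | sum_deleted
# prints 1073848749, the space the kernel frees once the fds are closed

# On a live broker (assumes a single Kafka JVM and lsof installed):
#   lsof -p "$(pgrep -f kafka.Kafka)" | sum_deleted
```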


Offset management in multi-threaded high-level consumer

2015-01-06 Thread Rafi Shamim
Hello,

I would like to write a multi-threaded consumer using the high-level
consumer API in Kafka 0.8.1. I have found two ways that seem feasible
while keeping the guarantee that messages in a partition are processed
in order. I would appreciate any feedback this list has.

Option 1

- Create multiple threads, so each thread has its own ConsumerConnector.
- Manually commit offsets in each thread after every N messages.
- This was discussed a bit on this list previously. See [1].

### Questions
- Is there a problem with making multiple ConsumerConnectors per machine?
- What does it take for ZooKeeper to handle this much load? We have a
3-node ZooKeeper cluster with relatively small machines. (I expect the
topic will have about 40 messages per second. There will be 3 consumer
groups. That would be 120 commits per second at most, but I can reduce
the frequency of commits to make this lower.)

### Extra info
Kafka 0.9 will have an entirely different commit API, which will allow
one connection to commit offsets per partition, but I can’t wait that
long. See [2].


Option 2

- Create one ConsumerConnector, but ask for multiple streams in that
connection. Give each thread one stream.
- Since there is no way to commit offsets per stream right now, we
need to do autoCommit.
- This sacrifices the at-least-once processing guarantee, which would
be nice to have. See KAFKA-1612 [3].

### Extra info
- There was some discussion in KAFKA-996 about a markForCommit()
method so that autoCommit would preserve the at-least-once guarantee,
but it seems more likely that the consumer API will just be redesigned
to allow commits per partition instead. See [4].


So basically I'm wondering if option 1 is feasible. If not, I'll just
do option 2. Of course, let me know if I was mistaken about anything
or if there is another design which is better.

Thanks in advance.
Rafi

[1] 
http://mail-archives.apache.org/mod_mbox/kafka-users/201310.mbox/%3cff142f6b499ae34caed4d263f6ca32901d35a...@extxmb19.nam.nsroot.net%3E
[2] 
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI
[3] https://issues.apache.org/jira/browse/KAFKA-1612
[4] https://issues.apache.org/jira/browse/KAFKA-966
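
For Option 1's "commit every N messages" step, the bookkeeping can be kept separate from the Kafka client so it is easy to test; a sketch, where the Runnable stands in for ConsumerConnector.commitOffsets() (which in the 0.8.1 high-level consumer commits offsets for all partitions owned by that connector):

```java
// Sketch of the "commit after every N messages" counting from Option 1.
// PeriodicCommitter and its fields are illustrative names, not Kafka API.
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicCommitter {
    private final int interval;            // commit every `interval` messages
    private final Runnable commitAction;   // e.g. connector::commitOffsets
    private int sinceLastCommit = 0;
    final AtomicInteger commits = new AtomicInteger();

    PeriodicCommitter(int interval, Runnable commitAction) {
        this.interval = interval;
        this.commitAction = commitAction;
    }

    /** Call once per consumed message, from the single thread owning the stream. */
    void onMessage() {
        if (++sinceLastCommit >= interval) {
            commitAction.run();
            commits.incrementAndGet();
            sinceLastCommit = 0;
        }
    }

    public static void main(String[] args) {
        PeriodicCommitter pc = new PeriodicCommitter(100, () -> {});
        for (int i = 0; i < 350; i++) pc.onMessage();
        System.out.println(pc.commits.get());  // 350 messages -> 3 commits
    }
}
```

In a real consumer loop, commitAction would wrap the connector's commit call; with one ConsumerConnector per thread (Option 1), each commit covers only that thread's partitions, which is what preserves per-partition ordering of commits.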


no space left error

2015-01-06 Thread Sa Li
Hi, All

I am doing a performance test on our new Kafka production server. After
sending some messages (even fake messages generated with bin/kafka-run-class.sh
org.apache.kafka.clients.tools.ProducerPerformance), connection errors appear
and the brokers shut down. After that, I see errors such as:

conf-su: cannot create temp file for here-document: No space left on device

How can I fix this? I am concerned the same will happen when we start to
publish real messages to Kafka. Should I create a cron job to regularly clean
certain directories?

thanks

-- 

Alec Li
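
On the cron question: the error above is the shell failing to create a temp file (usually under /tmp), which Kafka settings will not fix. For the Kafka data directories themselves, though, no cron job is needed, and deleting segment files externally can confuse a running broker: Kafka prunes its own logs via retention settings. A sketch with illustrative values:

```properties
# server.properties -- let Kafka delete old log segments itself.
# Values are illustrative; size them to your disk.
log.retention.hours=72
# Per-partition size cap; whichever limit is hit first triggers deletion.
log.retention.bytes=107374182400
```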


Re: no space left error

2015-01-06 Thread Otis Gospodnetic
Hi,

Your disk is full.  You should probably have something that checks/monitors
disk space and alerts you when it's full.

Maybe you can point Kafka to a different, larger disk or partition.

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Tue, Jan 6, 2015 at 3:02 PM, Sa Li sal...@gmail.com wrote:

 Continue this issue, when I restart the server, like
 bin/kafka-server-start.sh config/server.properties

 it will fails to start the server, like

 [2015-01-06 20:00:55,441] FATAL Fatal error during KafkaServerStable
 startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
 java.lang.InternalError: a fault occurred in a recent unsafe memory access
 operation in compiled Java code
 at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57)
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
 at
 kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
 at
 kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
 at
 kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
 at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
 at kafka.log.LogSegment.recover(LogSegment.scala:165)
 at kafka.log.Log.recoverLog(Log.scala:179)
 at kafka.log.Log.loadSegments(Log.scala:155)
 at kafka.log.Log.init(Log.scala:64)
 at

 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:118)
 at

 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:113)
 at

 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
 at
 kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:113)
 at
 kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
 at

 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at kafka.log.LogManager.loadLogs(LogManager.scala:105)
 at kafka.log.LogManager.init(LogManager.scala:57)
 at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
 at
 kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
 at kafka.Kafka$.main(Kafka.scala:46)
 at kafka.Kafka.main(Kafka.scala)
 [2015-01-06 20:00:55,443] INFO [Kafka Server 100], shutting down
 (kafka.server.KafkaServer)
 [2015-01-06 20:00:55,444] INFO Terminate ZkClient event thread.
 (org.I0Itec.zkclient.ZkEventThread)
 [2015-01-06 20:00:55,446] INFO Session: 0x684a5ed9da3a1a0f closed
 (org.apache.zookeeper.ZooKeeper)
 [2015-01-06 20:00:55,446] INFO EventThread shut down
 (org.apache.zookeeper.ClientCnxn)
 [2015-01-06 20:00:55,447] INFO [Kafka Server 100], shut down completed
 (kafka.server.KafkaServer)
 [2015-01-06 20:00:55,447] INFO [Kafka Server 100], shutting down
 (kafka.server.KafkaServer)

 Any ideas

 On Tue, Jan 6, 2015 at 12:00 PM, Sa Li sal...@gmail.com wrote:

  the complete error message:
 
  -su: cannot create temp file for here-document: No space left on device
  OpenJDK 64-Bit Server VM warning: Insufficient space for shared memory
  file:
 /tmp/hsperfdata_root/19721
  Try using the -Djava.io.tmpdir= option to select an alternate temp
  location.
  [2015-01-06 19:50:49,244] FATAL  (kafka.Kafka$)
  java.io.FileNotFoundException: conf (No such file or directory)
  at java.io.FileInputStream.open(Native Method)
  at java.io.FileInputStream.init(FileInputStream.java:146)
  at java.io.FileInputStream.init(FileInputStream.java:101)
  at kafka.utils.Utils$.loadProps(Utils.scala:144)
  at kafka.Kafka$.main(Kafka.scala:34)
  at kafka.Kafka.main(Kafka.scala)
 
  On Tue, Jan 6, 2015 at 11:58 AM, Sa Li sal...@gmail.com wrote:
 
 
  Hi, All
 
  I am doing performance test on our new kafka production server, but
 after
  sending some messages (even faked message by using
 bin/kafka-run-class.sh
  org.apache.kafka.clients.tools.ProducerPerformance), it comes out the
 error
  of connection, and shut down the brokers, after that, I see such errors,
 
  conf-su: cannot create temp file for here-document: No space left on
  device
 
  How can I fix it, I am concerning that will happen when we start to
  publish real messages in kafka, and should I create some cron to
 regularly
  clean certain directories?
 
  thanks
 
  --
 
  Alec Li
 
 
 
 
  --
 
  Alec Li
 



 --

 Alec Li



Re: no space left error

2015-01-06 Thread Sa Li
Continuing this issue: when I restart the server with

bin/kafka-server-start.sh config/server.properties

it fails to start:

[2015-01-06 20:00:55,441] FATAL Fatal error during KafkaServerStable
startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
java.lang.InternalError: a fault occurred in a recent unsafe memory access
operation in compiled Java code
at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at
kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
at
kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
at
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at kafka.log.LogSegment.recover(LogSegment.scala:165)
at kafka.log.Log.recoverLog(Log.scala:179)
at kafka.log.Log.loadSegments(Log.scala:155)
at kafka.log.Log.init(Log.scala:64)
at
kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:118)
at
kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:113)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
at
kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:113)
at
kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at kafka.log.LogManager.loadLogs(LogManager.scala:105)
at kafka.log.LogManager.init(LogManager.scala:57)
at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
at
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
at kafka.Kafka$.main(Kafka.scala:46)
at kafka.Kafka.main(Kafka.scala)
[2015-01-06 20:00:55,443] INFO [Kafka Server 100], shutting down
(kafka.server.KafkaServer)
[2015-01-06 20:00:55,444] INFO Terminate ZkClient event thread.
(org.I0Itec.zkclient.ZkEventThread)
[2015-01-06 20:00:55,446] INFO Session: 0x684a5ed9da3a1a0f closed
(org.apache.zookeeper.ZooKeeper)
[2015-01-06 20:00:55,446] INFO EventThread shut down
(org.apache.zookeeper.ClientCnxn)
[2015-01-06 20:00:55,447] INFO [Kafka Server 100], shut down completed
(kafka.server.KafkaServer)
[2015-01-06 20:00:55,447] INFO [Kafka Server 100], shutting down
(kafka.server.KafkaServer)

Any ideas?

On Tue, Jan 6, 2015 at 12:00 PM, Sa Li sal...@gmail.com wrote:

 the complete error message:

 -su: cannot create temp file for here-document: No space left on device
 OpenJDK 64-Bit Server VM warning: Insufficient space for shared memory
 file:
/tmp/hsperfdata_root/19721
 Try using the -Djava.io.tmpdir= option to select an alternate temp
 location.
 [2015-01-06 19:50:49,244] FATAL  (kafka.Kafka$)
 java.io.FileNotFoundException: conf (No such file or directory)
 at java.io.FileInputStream.open(Native Method)
 at java.io.FileInputStream.init(FileInputStream.java:146)
 at java.io.FileInputStream.init(FileInputStream.java:101)
 at kafka.utils.Utils$.loadProps(Utils.scala:144)
 at kafka.Kafka$.main(Kafka.scala:34)
 at kafka.Kafka.main(Kafka.scala)

 On Tue, Jan 6, 2015 at 11:58 AM, Sa Li sal...@gmail.com wrote:


 Hi, All

 I am doing performance test on our new kafka production server, but after
 sending some messages (even faked message by using bin/kafka-run-class.sh
 org.apache.kafka.clients.tools.ProducerPerformance), it comes out the error
 of connection, and shut down the brokers, after that, I see such errors,

 conf-su: cannot create temp file for here-document: No space left on
 device

 How can I fix it, I am concerning that will happen when we start to
 publish real messages in kafka, and should I create some cron to regularly
 clean certain directories?

 thanks

 --

 Alec Li




 --

 Alec Li




-- 

Alec Li


Re: no space left error

2015-01-06 Thread David Birdsong
I'm keen to hear how to work one's way out of a filled partition, since
I've run into this many times after having tuned retention bytes or
retention time incorrectly. The proper path to resolving this isn't
obvious from my many harried searches through the documentation.

I often end up stopping the particular broker, picking an unlucky
topic/partition and deleting it, modifying any topics that consumed too much
space by lowering their retention bytes, and restarting.

On Tue, Jan 6, 2015 at 12:02 PM, Sa Li sal...@gmail.com wrote:

 Continue this issue, when I restart the server, like
 bin/kafka-server-start.sh config/server.properties

 it will fails to start the server, like

 [2015-01-06 20:00:55,441] FATAL Fatal error during KafkaServerStable
 startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
 java.lang.InternalError: a fault occurred in a recent unsafe memory access
 operation in compiled Java code
  [...]
 
  Any ideas
 
  On Tue, Jan 6, 2015 at 12:00 PM, Sa Li sal...@gmail.com wrote:
 
   [...]
 
   --
 
   Alec Li
 



Re: no space left error

2015-01-06 Thread Joe Stein
There are two parts to this:

1) How to prevent Kafka from filling up disks, which
https://issues.apache.org/jira/browse/KAFKA-1489 is trying to deal with. (I
set the ticket to unassigned just now, since I don't think anyone is working
on it and it was assigned by default; I could be wrong, so assign it back to
yourself if so.) I don't know the solution off the top of my head, but I
think it is something we should strive for in 0.8.3 (worst case 0.9.0); it
happens frequently enough that we need a solution.

2) Until then, what to do when it does happen.

For #2, I have been pulled into this situation a number of times, and honestly
the solution has been a bit different each time; I'm not sure any one tool
or guideline will work, because it is not always systematic. But we could
collect guidelines and experiences from different folks. If someone already
has this written up, that would be great; otherwise I can carve out some time
in the near future to do it (though honestly I would rather the effort go to
#1; it is a balance for sure).

For Sa's problem (where this thread started), the error

OpenJDK 64-Bit Server VM warning: Insufficient space for shared memory file:
   /tmp/hsperfdata_root/19721

doesn't look Kafka related, so the partition and retention discussion,
while important, is not applicable here.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/

On Tue, Jan 6, 2015 at 3:10 PM, David Birdsong david.birds...@gmail.com
wrote:

 I'm keen to hear about how to work one's way out of a filled partition
 since I've run into this many times after having tuned retention bytes or
 retention (time?) incorrectly. The proper path to resolving this isn't
 obvious based on my many harried searches through documentation.

 I often end up stopping the particular broker, picking an unlucky
 topic/partition, deleting, modifying the any topics that consumed too much
 space by lowering their retention bytes, and restarting.

 On Tue, Jan 6, 2015 at 12:02 PM, Sa Li sal...@gmail.com wrote:

  Continue this issue, when I restart the server, like
  bin/kafka-server-start.sh config/server.properties
 
  it will fails to start the server, like
 
  [2015-01-06 20:00:55,441] FATAL Fatal error during KafkaServerStable
  startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
  java.lang.InternalError: a fault occurred in a recent unsafe memory
 access
  operation in compiled Java code
   [...]

Re: Version numbers

2015-01-06 Thread Jun Rao
The 0.8.2 branch is our release branch. We won't update any existing
released version; instead, we will use a new version for every release. So
0.8.2-beta will be changed to 0.8.2-beta-2 and eventually just 0.8.2.

Thanks,

Jun

On Sun, Jan 4, 2015 at 5:12 PM, Shannon Lloyd shanl...@gmail.com wrote:

 Is there a reason why the 0.8.2 branch (which I assume is essentially a
 release branch for the pending 0.8.2 release) has the version number
 statically set to 0.8.2-beta rather than a -SNAPSHOT version (e.g.
 0.8.2-SNAPSHOT or similar)? 0.8.2-beta should technically refer to the
 released beta, yes? Or are you using -beta here to effectively mean
 -SNAPSHOT and reserve the right to update the 0.8.2-beta tgz on the
 website? (Do you in fact update the version on the website?) The main
 problems with this are that tools like Maven/Gradle consider versions like
 0.8.2-beta to be immutable, so it's a PITA to get them to update in your
 IDE etc. And it just feels wrong having non-SNAPSHOT versions getting
 clobbered in my local m2 and Gradle caches.

 Cheers,
 Shannon



Re: no space left error

2015-01-06 Thread Sa Li
Thanks for the reply; the disk is not full:

root@exemplary-birds:~# df -h
Filesystem  Size  Used Avail Use% Mounted on
/dev/sda2   133G  3.4G  123G   3% /
none4.0K 0  4.0K   0% /sys/fs/cgroup
udev 32G  4.0K   32G   1% /dev
tmpfs   6.3G  764K  6.3G   1% /run
none5.0M 0  5.0M   0% /run/lock
none 32G 0   32G   0% /run/shm
none100M 0  100M   0% /run/user
/dev/sdb114T   15G   14T   1% /srv

Nor is memory exhausted:

root@exemplary-birds:~# free
             total       used       free     shared    buffers     cached
Mem:      65963372    9698380   56264992        776     170668    7863812
-/+ buffers/cache:    1663900   64299472
Swap:       997372          0     997372

thanks
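
One more hedged check, since df -h shows free blocks: "No space left on device" (ENOSPC) is also raised when a filesystem runs out of inodes, which df -h does not show. Here the failing temp file was a shell here-document, so the filesystem holding /tmp is the one to inspect:

```shell
# Block usage vs inode usage: ENOSPC can come from either being exhausted.
df -h /tmp        # block usage (what was already checked above)
df -i /tmp        # inode usage; IUse% at 100% means inode exhaustion
```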


On Tue, Jan 6, 2015 at 12:10 PM, David Birdsong david.birds...@gmail.com
wrote:

 I'm keen to hear about how to work one's way out of a filled partition
 since I've run into this many times after having tuned retention bytes or
 retention (time?) incorrectly. The proper path to resolving this isn't
 obvious based on my many harried searches through documentation.

 I often end up stopping the particular broker, picking an unlucky
 topic/partition, deleting, modifying the any topics that consumed too much
 space by lowering their retention bytes, and restarting.

 On Tue, Jan 6, 2015 at 12:02 PM, Sa Li sal...@gmail.com wrote:

  Continue this issue, when I restart the server, like
  bin/kafka-server-start.sh config/server.properties
 
  it will fails to start the server, like
 
  [2015-01-06 20:00:55,441] FATAL Fatal error during KafkaServerStable
  startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
  java.lang.InternalError: a fault occurred in a recent unsafe memory
 access
  operation in compiled Java code
   [...]

some connection errors happen in performance test

2015-01-06 Thread Sa Li
Hi, All

I am running a performance test on Kafka with this command:

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
test-rep-three 500 100 -1 acks=1 bootstrap.servers=
10.100.10.101:9092 buffer.memory=67108864 batch.size=8196

We are sending 50 billion messages to the brokers; it mostly works, but
errors like the following pop up periodically:

[2015-01-06 19:38:32,127] WARN Error in I/O with exemplary-birds.master/
127.0.1.1 (org.apache.kafka.common.network.Selector)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.kafka.common.network.Selector.poll(Selector.java:232)
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
at java.lang.Thread.run(Thread.java:745)
1950 records sent, 224.4 records/sec (0.02 MB/sec), 611.4 ms avg latency,
9259.0 max latency.
2899650 records sent, 579930.0 records/sec (55.31 MB/sec), 2399.5 ms avg
latency, 9505.0 max latency.
3170219 records sent, 634043.8 records/sec (60.47 MB/sec), 568.7 ms avg
latency, 1201.0 max latency.

And I feel the errors are happening more and more often. Our Kafka cluster
is a three-node cluster.

thanks


-- 

Alec Li


config for consumer and producer

2015-01-06 Thread Sa Li
Hi, All

I am testing and making changes to server.properties, and I wonder whether I
also need to change the values in the consumer and producer properties
specifically. Here is consumer.properties:

zookeeper.connect=10.100.98.100:2181,10.100.98.101:2181,10.100.98.102:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=100
#consumer group id
group.id=test-consumer-group
#consumer timeout
#consumer.timeout.ms=5000

I use defaults for most parameters. group.id is documented as "A string
that uniquely identifies the group of consumer processes to which this
consumer belongs. By setting the same group id, multiple processes indicate
that they are all part of the same consumer group."
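That definition can be illustrated without a broker. Consumers sharing one group.id split a topic's partitions among themselves (roughly range-style, as the 0.8 high-level consumer does). The sketch below is not Kafka's actual code; consumer names and partition counts are made up:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {
    // Divide `partitions` topic partitions among the members of one
    // consumer group, range-style: earlier members absorb the remainder.
    public static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        int per = partitions / consumers.size();
        int extra = partitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = per + (i < extra ? 1 : 0);
            List<Integer> mine = new ArrayList<>();
            for (int j = 0; j < count; j++) mine.add(next++);
            owned.put(consumers.get(i), mine);
        }
        return owned;
    }

    public static void main(String[] args) {
        // two processes with the same group.id split 8 partitions 4/4
        System.out.println(assign(Arrays.asList("c1", "c2"), 8));
        // a third member joining the group forces a rebalance: 3/3/2
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 8));
    }
}
```

So one group.id is enough for a set of cooperating consumer processes; you would define a second group only if an independent application needs its own full copy of the stream.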

Do I need to define many consumer groups here?
As for the producer: we are not using the Java client; a C# client sends
messages to Kafka, so producer.properties shouldn't matter (except when I am
doing producer tests locally), right?

producer.type=sync
compression.codec=none

Thanks
-- 

Alec Li


Is it possible to enforce a unique constraint through Kafka?

2015-01-06 Thread Joseph Pachod
Hi

Having read a lot about Kafka and its use at LinkedIn, I'm still unsure
whether Kafka can be used, with some mindset change for sure, as a
general-purpose data store.

For example, would someone use Kafka to enforce a unique constraint?

A simple use case, in the case of LinkedIn, is the uniqueness of users'
logins.

What would be your recommended implementation for such a need?

Thanks in advance

Best,
Joseph


MirrorMaker among different version Kafkas

2015-01-06 Thread Wei Yan
Hi, guys,

I want to confirm whether MirrorMaker supports different versions of Kafka.
For example, if DC1 uses Kafka 0.8 and DC2 uses Kafka 0.7, can DC1 mirror
DC2's messages? It looks like this cannot work in my local test.

If not, are there any candidate ways for replicating messages among data
centers running different Kafka versions?
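For clusters on the same protocol version, MirrorMaker is invoked roughly as below (the config file names and whitelist are placeholders). Between 0.7 and 0.8 there is no wire-format compatibility, so the usual candidate is a small bridge process that embeds a 0.7 consumer and a 0.8 producer in the same JVM.

```
# sketch: the consumer config points at the source cluster's ZooKeeper,
# the producer config at the target cluster
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config source-cluster.consumer.properties \
  --producer.config target-cluster.producer.properties \
  --num.streams 4 --whitelist=".*"
```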

thanks,

Wei


java.io.IOException: Connection reset by peer

2015-01-06 Thread Sa Li
Hi, All

I am running a C# producer to send messages to Kafka (a 3-node cluster), but
get errors such as:

[2015-01-06 16:09:51,143] ERROR Closing socket for /10.100.70.128 because
of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:380)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
[2015-01-06 16:09:51,144] ERROR Closing socket for /10.100.70.128 because
of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:380)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
[2015-01-06 16:09:56,138] INFO Closing socket connection to /10.100.70.28.
(kafka.network.Processor)
[2015-01-06 16:10:07,685] INFO Closing socket connection to /10.100.70.28.
(kafka.network.Processor)
[2015-01-06 16:10:31,423] INFO Closing socket connection to /10.100.70.28.
(kafka.network.Processor)
[2015-01-06 16:11:08,077] INFO Closing socket connection to /10.100.70.28.
(kafka.network.Processor)
[2015-01-06 16:11:43,990] INFO Closing socket connection to /10.100.70.28.
(kafka.network.Processor)
[2015-01-06 16:12:24,168] INFO Closing socket connection to /10.100.70.128.
(kafka.network.Processor)

But I do see the messages on the brokers. Any ideas?

thanks


-- 

Alec Li


Re: messages lost

2015-01-06 Thread Sa Li
Hi, experts

Again, we are still losing data: say we send 5000 records but find only 4500
records on the brokers. We did set required.acks=-1 to make sure all brokers
ack, but that only causes long latency; it does not cure the data loss.


thanks


On Mon, Jan 5, 2015 at 9:55 AM, Xiaoyu Wang xw...@rocketfuel.com wrote:

 @Sa,

 the required.acks is producer side configuration. Set to -1 means requiring
 ack from all brokers.

 On Fri, Jan 2, 2015 at 1:51 PM, Sa Li sal...@gmail.com wrote:

  Thanks a lot, Tim, this is the config of brokers
 
  --
  broker.id=1
  port=9092
  host.name=10.100.70.128
  num.network.threads=4
  num.io.threads=8
  socket.send.buffer.bytes=1048576
  socket.receive.buffer.bytes=1048576
  socket.request.max.bytes=104857600
  auto.leader.rebalance.enable=true
  auto.create.topics.enable=true
  default.replication.factor=3
 
  log.dirs=/tmp/kafka-logs-1
  num.partitions=8
 
  log.flush.interval.messages=1
  log.flush.interval.ms=1000
  log.retention.hours=168
  log.segment.bytes=536870912
  log.cleanup.interval.mins=1
 
  zookeeper.connect=10.100.70.128:2181,10.100.70.28:2181,10.100.70.29:2181
  zookeeper.connection.timeout.ms=100
 
  ---
 
 
  We actually play around request.required.acks in producer config, -1
 cause
  long latency, 1 is the parameter to cause messages lost. But I am not
 sure,
  if this is the reason to lose the records.
 
 
  thanks
 
  AL
 
 
 
 
 
 
 
  On Fri, Jan 2, 2015 at 9:59 AM, Timothy Chen tnac...@gmail.com wrote:
 
   What's your configured required.acks? And also are you waiting for all
   your messages to be acknowledged as well?
  
   The new producer returns futures back, but you still need to wait for
   the futures to complete.
  
   Tim
  
   On Fri, Jan 2, 2015 at 9:54 AM, Sa Li sal...@gmail.com wrote:
Hi, all
   
We are sending the message from a producer, we send 10 records,
 but
   we
see only 99573 records for that topics, we confirm this by consume
 this
topic and check the log size in kafka web console.
   
Any ideas for the message lost, what is the reason to cause this?
   
thanks
   
--
   
Alec Li
  
 
 
 
  --
 
  Alec Li
 




-- 

Alec Li


Re: messages lost

2015-01-06 Thread Joe Stein
You should never store your log files in /tmp; please change that.

Ack = -1 is what you should be using if you want to guarantee messages are
saved. You should not be seeing high latencies (unless a few milliseconds
is high for you).

Are you using the sync or async producer? What version of Kafka? How are you
counting the data from the topic? How are you confirming that each message
you sent was successfully acked? How are you counting from the topic, and
have you verified the counts summed from each partition?

Can you share some sample code that reproduces this issue?

You can try counting the messages in each partition using
https://github.com/edenhill/kafkacat piped to wc -l; it makes for a nice,
simple sanity check of where the problem might be.
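Concretely, a per-partition count might look like the following sketch (the broker address, topic name, and partition count are placeholders taken from earlier in this thread):

```
# count messages in each of 8 partitions; -e makes kafkacat exit
# at the end of the partition instead of waiting for new data
for p in 0 1 2 3 4 5 6 7; do
  echo -n "partition $p: "
  kafkacat -C -b 10.100.70.128:9092 -t test-rep-three -p $p -e -q | wc -l
done
```

Summing those counts and comparing against the number of sends that were actually acked narrows the loss down to the produce side or the consume side.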

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/

On Tue, Jan 6, 2015 at 7:21 PM, Sa Li sal...@gmail.com wrote:

 Hi, experts

 Again, we still having the issues of losing data, see we see data 5000
 records, but only find 4500 records on brokers, we did set required.acks -1
 to make sure all brokers ack, but that only cause the long latency, but not
 cure the data lost.


 thanks


 On Mon, Jan 5, 2015 at 9:55 AM, Xiaoyu Wang xw...@rocketfuel.com wrote:

  @Sa,
 
  the required.acks is producer side configuration. Set to -1 means
 requiring
  ack from all brokers.
 
  On Fri, Jan 2, 2015 at 1:51 PM, Sa Li sal...@gmail.com wrote:
 
   Thanks a lot, Tim, this is the config of brokers
  
   --
   broker.id=1
   port=9092
   host.name=10.100.70.128
   num.network.threads=4
   num.io.threads=8
   socket.send.buffer.bytes=1048576
   socket.receive.buffer.bytes=1048576
   socket.request.max.bytes=104857600
   auto.leader.rebalance.enable=true
   auto.create.topics.enable=true
   default.replication.factor=3
  
   log.dirs=/tmp/kafka-logs-1
   num.partitions=8
  
   log.flush.interval.messages=1
   log.flush.interval.ms=1000
   log.retention.hours=168
   log.segment.bytes=536870912
   log.cleanup.interval.mins=1
  
   zookeeper.connect=10.100.70.128:2181,10.100.70.28:2181,
 10.100.70.29:2181
   zookeeper.connection.timeout.ms=100
  
   ---
  
  
   We actually play around request.required.acks in producer config, -1
  cause
   long latency, 1 is the parameter to cause messages lost. But I am not
  sure,
   if this is the reason to lose the records.
  
  
   thanks
  
   AL
  
  
  
  
  
  
  
   On Fri, Jan 2, 2015 at 9:59 AM, Timothy Chen tnac...@gmail.com
 wrote:
  
What's your configured required.acks? And also are you waiting for
 all
your messages to be acknowledged as well?
   
The new producer returns futures back, but you still need to wait for
the futures to complete.
   
Tim
   
On Fri, Jan 2, 2015 at 9:54 AM, Sa Li sal...@gmail.com wrote:
 Hi, all

 We are sending the message from a producer, we send 10 records,
  but
we
 see only 99573 records for that topics, we confirm this by consume
  this
 topic and check the log size in kafka web console.

 Any ideas for the message lost, what is the reason to cause this?

 thanks

 --

 Alec Li
   
  
  
  
   --
  
   Alec Li
  
 



 --

 Alec Li



Re: no space left error

2015-01-06 Thread Sa Li
BTW, I found that the logs under /kafka/logs, such as controller.log and
state-change.log, are also getting bigger and bigger. Should I launch a cron
job to clean them up regularly, or is there a built-in way to delete them?

thanks

AL

On Tue, Jan 6, 2015 at 2:01 PM, Sa Li sal...@gmail.com wrote:

 Hi, All

 We fix the problem, I like to share the what the problem is in case
 someone come across the similar issues. We add the data drive for each node
 /dev/sdb1 , but specify the wrong path in server.properties, which means
 the data was written into the wrong drive /dev/sda2, quickly eat up all the
 space in sda2, now we change the path. The sdb1 has 15Tb, which allows us
 to store data for a while and will be deleted in 1/2 weeks as config
 mentioned.

 But I am kinda curious about David's comments,  ... after having tuned
 retention bytes or retention (time?) incorrectly. ..  How do you guys set
 log.retention.bytes?  I set log.retention.hours=336 (2 weeks), and should
 I set log.retention.bytes as default -1 or some other amount?

 thanks

 AL

 On Tue, Jan 6, 2015 at 12:43 PM, Sa Li sal...@gmail.com wrote:

 Thanks the reply, the disk is not full:

 root@exemplary-birds:~# df -h
 Filesystem  Size  Used Avail Use% Mounted on
 /dev/sda2   133G  3.4G  123G   3% /
 none4.0K 0  4.0K   0% /sys/fs/cgroup
 udev 32G  4.0K   32G   1% /dev
 tmpfs   6.3G  764K  6.3G   1% /run
 none5.0M 0  5.0M   0% /run/lock
 none 32G 0   32G   0% /run/shm
 none100M 0  100M   0% /run/user
 /dev/sdb114T   15G   14T   1% /srv

 Neither the memory

 root@exemplary-birds:~# free
              total       used       free     shared    buffers     cached
 Mem:      65963372    9698380   56264992        776     170668    7863812
 -/+ buffers/cache:    1663900   64299472
 Swap:       997372          0     997372

 thanks


 On Tue, Jan 6, 2015 at 12:10 PM, David Birdsong david.birds...@gmail.com
  wrote:

 I'm keen to hear about how to work one's way out of a filled partition
 since I've run into this many times after having tuned retention bytes or
 retention (time?) incorrectly. The proper path to resolving this isn't
 obvious based on my many harried searches through documentation.

 I often end up stopping the particular broker, picking an unlucky
 topic/partition, deleting it, modifying any topics that consumed too much
 space by lowering their retention bytes, and restarting.

 On Tue, Jan 6, 2015 at 12:02 PM, Sa Li sal...@gmail.com wrote:

  Continuing this issue: when I restart the server with
  bin/kafka-server-start.sh config/server.properties
 
  it fails to start:
 
  [2015-01-06 20:00:55,441] FATAL Fatal error during KafkaServerStable
  startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
  java.lang.InternalError: a fault occurred in a recent unsafe memory
 access
  operation in compiled Java code
   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
  at
  kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
  at
  kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
  at
 
 kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
  at
 kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
  at kafka.log.LogSegment.recover(LogSegment.scala:165)
  at kafka.log.Log.recoverLog(Log.scala:179)
  at kafka.log.Log.loadSegments(Log.scala:155)
   at kafka.log.Log.<init>(Log.scala:64)
  at
 
 
 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:118)
  at
 
 
 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:113)
  at
 
 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at
  scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
  at
  kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:113)
  at
  kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
  at
 
 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at
  scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
  at kafka.log.LogManager.loadLogs(LogManager.scala:105)
   at kafka.log.LogManager.<init>(LogManager.scala:57)
  at
 kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
  at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
  at
 
 kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
  at kafka.Kafka$.main(Kafka.scala:46)
  at kafka.Kafka.main(Kafka.scala)
  [2015-01-06 20:00:55,443] INFO [Kafka Server 100], shutting down
  (kafka.server.KafkaServer)
  [2015-01-06 20:00:55,444] INFO Terminate ZkClient event thread.
  

Re: no space left error

2015-01-06 Thread Sa Li
Hi, All

We fixed the problem, and I'd like to share what it was in case someone
comes across a similar issue. We added a data drive (/dev/sdb1) to each node
but specified the wrong path in server.properties, which meant the data was
written to the wrong drive (/dev/sda2) and quickly ate up all the space on
sda2; now we have changed the path. sdb1 has 15 TB, which allows us to store
data for a while, and data will be deleted after 1-2 weeks as configured.

But I am kinda curious about David's comment, "... after having tuned
retention bytes or retention (time?) incorrectly ...". How do you guys set
log.retention.bytes? I set log.retention.hours=336 (2 weeks); should I leave
log.retention.bytes at its default of -1 or set some other amount?
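For what it's worth, a sketch of a retention stanza under those assumptions (the byte value is illustrative, not a recommendation). Note that log.retention.bytes is enforced per partition, not per broker, and whichever of the time and size limits is hit first triggers deletion; the default of -1 means no size limit, which is fine as long as the time limit alone cannot outgrow the 15 TB drive.

```
# time limit: 2 weeks
log.retention.hours=336
# optional size limit, enforced per *partition*: ~100 GB here,
# so e.g. a topic with 8 partitions could still hold ~800 GB
log.retention.bytes=107374182400
```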

thanks

AL

On Tue, Jan 6, 2015 at 12:43 PM, Sa Li sal...@gmail.com wrote:

 Thanks the reply, the disk is not full:

 root@exemplary-birds:~# df -h
 Filesystem  Size  Used Avail Use% Mounted on
 /dev/sda2   133G  3.4G  123G   3% /
 none4.0K 0  4.0K   0% /sys/fs/cgroup
 udev 32G  4.0K   32G   1% /dev
 tmpfs   6.3G  764K  6.3G   1% /run
 none5.0M 0  5.0M   0% /run/lock
 none 32G 0   32G   0% /run/shm
 none100M 0  100M   0% /run/user
 /dev/sdb114T   15G   14T   1% /srv

 Neither the memory

 root@exemplary-birds:~# free
              total       used       free     shared    buffers     cached
 Mem:      65963372    9698380   56264992        776     170668    7863812
 -/+ buffers/cache:    1663900   64299472
 Swap:       997372          0     997372

 thanks


 On Tue, Jan 6, 2015 at 12:10 PM, David Birdsong david.birds...@gmail.com
 wrote:

 I'm keen to hear about how to work one's way out of a filled partition
 since I've run into this many times after having tuned retention bytes or
 retention (time?) incorrectly. The proper path to resolving this isn't
 obvious based on my many harried searches through documentation.

 I often end up stopping the particular broker, picking an unlucky
 topic/partition, deleting it, modifying any topics that consumed too much
 space by lowering their retention bytes, and restarting.

 On Tue, Jan 6, 2015 at 12:02 PM, Sa Li sal...@gmail.com wrote:

  Continuing this issue: when I restart the server with
  bin/kafka-server-start.sh config/server.properties
 
  it fails to start:
 
  [2015-01-06 20:00:55,441] FATAL Fatal error during KafkaServerStable
  startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
  java.lang.InternalError: a fault occurred in a recent unsafe memory
 access
  operation in compiled Java code
  at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
  at
  kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
  at
  kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
  at
  kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
  at
 kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
  at kafka.log.LogSegment.recover(LogSegment.scala:165)
  at kafka.log.Log.recoverLog(Log.scala:179)
  at kafka.log.Log.loadSegments(Log.scala:155)
  at kafka.log.Log.<init>(Log.scala:64)
  at
 
 
 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:118)
  at
 
 
 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:113)
  at
 
 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at
  scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
  at
  kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:113)
  at
  kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
  at
 
 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at
  scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
  at kafka.log.LogManager.loadLogs(LogManager.scala:105)
  at kafka.log.LogManager.<init>(LogManager.scala:57)
  at
 kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
  at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
  at
  kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
  at kafka.Kafka$.main(Kafka.scala:46)
  at kafka.Kafka.main(Kafka.scala)
  [2015-01-06 20:00:55,443] INFO [Kafka Server 100], shutting down
  (kafka.server.KafkaServer)
  [2015-01-06 20:00:55,444] INFO Terminate ZkClient event thread.
  (org.I0Itec.zkclient.ZkEventThread)
  [2015-01-06 20:00:55,446] INFO Session: 0x684a5ed9da3a1a0f closed
  (org.apache.zookeeper.ZooKeeper)
  [2015-01-06 20:00:55,446] INFO EventThread shut down
  (org.apache.zookeeper.ClientCnxn)
  [2015-01-06 20:00:55,447] INFO [Kafka Server 100], shut down completed
  

Re: no space left error

2015-01-06 Thread Sa Li
the complete error message:

-su: cannot create temp file for here-document: No space left on device
OpenJDK 64-Bit Server VM warning: Insufficient space for shared memory file:
   /tmp/hsperfdata_root/19721
Try using the -Djava.io.tmpdir= option to select an alternate temp location.
[2015-01-06 19:50:49,244] FATAL  (kafka.Kafka$)
java.io.FileNotFoundException: conf (No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:146)
at java.io.FileInputStream.<init>(FileInputStream.java:101)
at kafka.utils.Utils$.loadProps(Utils.scala:144)
at kafka.Kafka$.main(Kafka.scala:34)
at kafka.Kafka.main(Kafka.scala)

On Tue, Jan 6, 2015 at 11:58 AM, Sa Li sal...@gmail.com wrote:


 Hi, All

 I am doing a performance test on our new Kafka production server, but after
 sending some messages (even faked messages using bin/kafka-run-class.sh
 org.apache.kafka.clients.tools.ProducerPerformance), connection errors come
 out and the brokers shut down. After that, I see errors such as:

 -su: cannot create temp file for here-document: No space left on device

 How can I fix it? I am concerned this will happen when we start to publish
 real messages to Kafka. Should I create a cron job to regularly clean
 certain directories?

 thanks

 --

 Alec Li




-- 

Alec Li


Topic in bad state after controller to broker messaging fails

2015-01-06 Thread Henri Pihkala
Hi,

I’m hitting a strange problem using 0.8.2-beta and just a single kafka broker 
on CentOS 6.5. 

A percentage of my topic create attempts randomly fail, leaving the new
topic in a state in which it cannot be used due to “partition doesn’t
exist” errors, as seen in server.log below.

In controller.log, it looks like the controller fails to send either the
become-leader “LeaderAndIsr” request or the “UpdateMetadata” request to the
broker (which in fact is the same Kafka instance), due to a socket read
failing (for an unknown reason).

My questions:

(1) Is the bad topic state a result of the message not making it from the 
controller to the broker?

(2) Any idea why the socket read might randomly fail? It can’t be a network 
issue since we’re running a single instance.

(3) Shouldn’t the controller try to resend the message?



controller.log

[2015-01-06 21:31:10,304] INFO [Controller 0]: New topic creation callback for 
[09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.KafkaController)

[2015-01-06 21:31:10,304] INFO [Controller 0]: New partition creation callback 
for [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.KafkaController)

[2015-01-06 21:31:10,304] INFO [Partition state machine on Controller 0]: 
Invoking state change to NewPartition for partitions 
[09b1ebac-7036-49fc-aa61-7852808ca241,0] 
(kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,308] INFO [Replica state machine on controller 0]: 
Invoking state change to NewReplica for replicas 
[Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0] 
(kafka.controller.ReplicaStateMachine)

[2015-01-06 21:31:10,308] INFO [Partition state machine on Controller 0]: 
Invoking state change to OnlinePartition for partitions 
[09b1ebac-7036-49fc-aa61-7852808ca241,0] 
(kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,308] DEBUG [Partition state machine on Controller 0]: Live 
assigned replicas for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] are: 
[List(0)] (kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,309] DEBUG [Partition state machine on Controller 0]: 
Initializing leader and isr for partition 
[09b1ebac-7036-49fc-aa61-7852808ca241,0] to 
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) 
(kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,501] INFO [Replica state machine on controller 0]: 
Invoking state change to OnlineReplica for replicas 
[Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0] 
(kafka.controller.ReplicaStateMachine)

[2015-01-06 21:31:10,502] WARN [Controller-0-to-broker-0-send-thread], 
Controller 0 fails to send a request to broker id:0,host:dev.unifina,port:9092 
(kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely 
been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at 
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

[2015-01-06 21:31:10,505] ERROR [Controller-0-to-broker-0-send-thread], 
Controller 0 epoch 2 failed to send request 
Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:16;ClientId:id_0-host_dev.unifina-port_9092;AliveBrokers:id:0,host:dev.unifina,port:9092;PartitionState:[09b1ebac-7036-49fc-aa61-7852808ca241,0]
 - 
(LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0)
 to broker id:0,host:dev.unifina,port:9092. Reconnecting to broker. 
(kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
at 
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)




state-change.log

[2015-01-06 21:31:10,306] TRACE Controller 0 epoch 2 changed partition 
[09b1ebac-7036-49fc-aa61-7852808ca241,0] state from NonExistentPartition to 
NewPartition with assigned replicas 0 (state.change.logger)

[2015-01-06 21:31:10,308] TRACE Controller 0 epoch 2 changed state of replica 0 
for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NonExistentReplica 
to NewReplica (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed partition 
[09b1ebac-7036-49fc-aa61-7852808ca241,0] from NewPartition to OnlinePartition 
with leader 0 (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 

RE: Is it possible to enforce a unique constraint through Kafka?

2015-01-06 Thread Todd Hughes
Log compaction, though, allows it to work as a data store quite well for some
use cases. It's exactly why I started looking hard at Kafka lately.

“The general idea is quite simple. Rather than maintaining only recent
log entries in the log and throwing away old log segments, we maintain
the most recent entry for each unique key. This ensures that the log
contains a complete dataset and can be used for reloading key-based
state.”
https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
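The idea can be sketched without a broker by modeling a compacted topic as latest-value-per-key: replay it into a map, then treat an existing key as a uniqueness violation. All names below are made up for illustration, and a real implementation still has to serialize concurrent claims (e.g. a single writer per key range), since two producers could otherwise claim the same login at once:

```java
import java.util.HashMap;
import java.util.Map;

public class LoginRegistrySketch {
    // State rebuilt by replaying a compacted topic: login -> user id.
    private final Map<String, String> latest = new HashMap<>();

    // Replay one record from the log; compaction keeps only the
    // most recent record per key, which is what the map models.
    public void replay(String login, String userId) {
        latest.put(login, userId);
    }

    // Attempt to claim a login; returns false if it is already taken.
    public boolean claim(String login, String userId) {
        if (latest.containsKey(login)) return false;
        latest.put(login, userId); // in reality: produce to the compacted topic
        return true;
    }

    public static void main(String[] args) {
        LoginRegistrySketch r = new LoginRegistrySketch();
        r.replay("alice", "u1");
        System.out.println(r.claim("alice", "u2")); // false: already taken
        System.out.println(r.claim("bob", "u2"));   // true
    }
}
```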

 Date: Tue, 6 Jan 2015 16:34:06 -0800
 Subject: Re: Is it possible to enforce a unique constraint through Kafka?
 From: wiz...@gmail.com
 To: users@kafka.apache.org
 
 Kafka is more of a message queue than a data store. You can use it to store
 history of the queue (certainly a powerful use case for disaster recovery),
 but it's still not really a data store.
 
 From the Kafka website (kafka.apache.org):
 Apache Kafka is a publish-subscribe messaging [queue] rethought as a
 distributed commit log.
 
 -Mark
 
 On Tue, Jan 6, 2015 at 3:14 PM, Joseph Pachod joseph.pac...@gmail.com
 wrote:
 
  Hi
 
  Having read a lot about Kafka and its use at LinkedIn, I'm still unsure
  whether Kafka can be used, with some mindset change for sure, as a
  general-purpose data store.
 
  For example, would someone use Kafka to enforce a unique constraint?
 
  A simple use case, in the case of LinkedIn, is the uniqueness of users'
  logins.
 
  What would be your recommended implementation for such a need?
 
  Thanks in advance
 
  Best,
  Joseph
 
  

[Kafka-users] Differences MessageIn between two kafka clusters replicated by mirror maker

2015-01-06 Thread tao li
Hi,

We are using MirrorMaker to replicate data between two Kafka clusters in
different datacenters, connected by fiber: 1G bandwidth, ~35ms latency.

We found that the MessageIn metric in the JMX beans of the brokers in the
destination cluster is greater than that of the brokers in the source
cluster.

Our configuration is like this:
The destination and source clusters both run Kafka 0.7.0.
The source cluster has 6 brokers with 32 partitions each; the target cluster
has 12 brokers with 32 partitions each, with 6 MirrorMakers doing the
replication, replicating only one topic.

mirrormaker configuration like this:

--num.streams 128
--num.producers 16

=

consumer.properties:

shallowiterator.enable=true

socket.buffersize=31457280

fetch.size=1048576

=

producer.properties:

queue.enqueueTimeout.ms=-1

# the number of messages batched at the producer

batch.size=4000

#default 5000

connect.timeout.ms=1

#default 3

socket.timeout.ms=30

==

test result:

MessageIn: source cluster = 252,560,169, target cluster = 253,128,669

Another problem: sometimes one or more brokers in the source cluster are not
consumed at all, while the MirrorMakers replicate the other brokers fine.

Does anyone have an idea how to resolve these issues?


Re: latency - how to reduce?

2015-01-06 Thread Jayesh Thakrar
Have you tried using the built-in stress test scripts?
bin/kafka-producer-perf-test.sh
bin/kafka-consumer-perf-test.sh

Here's how I stress tested them -
nohup ${KAFKA_HOME}/bin/kafka-producer-perf-test.sh --broker-list
${KAFKA_SERVERS} --topic ${TOPIC_NAME} --new-producer --threads 16 --messages
1 1>kafka-producer-perf-test.sh.log 2>&1 &

nohup ${KAFKA_HOME}/bin/kafka-consumer-perf-test.sh --zookeeper
${ZOOKEEPER_QUORUM} --topic ${TOPIC_NAME} --threads 16
1>kafka-consumer-perf-test.sh.log 2>&1 &

And I used screen scraping of the JMX UI screens to push metrics into TSDB to
collect throughput numbers. The rates are per second, so I could push the
Kafka cluster to 140k+ messages/sec on a 4-node cluster with very little
utilization (~30%).


  From: Shlomi Hazan shl...@viber.com
 To: users@kafka.apache.org 
 Sent: Tuesday, January 6, 2015 1:06 AM
 Subject: Re: latency - how to reduce?
   
Will do. What did you have in mind? Just write a big file to disk and
measure the time it took to write? Maybe also read it back? Using specific
APIs?
Apart from the local Windows machine case, are you aware of any issues with
Amazon EC2 instances that may be causing the same latency in production?
Thanks,
Shlomi
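For the local case, one way to measure exactly that is a small write-and-fsync loop. This is only a sketch: the 4 KB block size and per-write fsync are arbitrary choices, and Kafka normally batches flushes, so this gives a worst-case floor for disk latency rather than a simulation of the broker:

```java
import java.io.FileOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class FsyncLatencySketch {
    // Write `rounds` small blocks, fsyncing after each one (roughly what
    // flushing the log per message would do); returns {avgMs, worstMs}.
    public static double[] measure(int rounds) throws Exception {
        Path p = Files.createTempFile("kafka-io-test", ".bin");
        byte[] block = new byte[4096];
        long worst = 0, total = 0;
        try (FileOutputStream out = new FileOutputStream(p.toFile())) {
            for (int i = 0; i < rounds; i++) {
                long t0 = System.nanoTime();
                out.write(block);
                out.getFD().sync(); // force to the device, like a log flush
                long dt = System.nanoTime() - t0;
                total += dt;
                worst = Math.max(worst, dt);
            }
        } finally {
            Files.delete(p);
        }
        return new double[] { total / 1e6 / rounds, worst / 1e6 };
    }

    public static void main(String[] args) throws Exception {
        double[] r = measure(200);
        System.out.printf("avg %.3f ms, worst %.3f ms%n", r[0], r[1]);
    }
}
```

If the worst-case fsync time here is tens of milliseconds, the disk (or a virtualized EBS volume on EC2) is a plausible source of the latency; if it is sub-millisecond, look at batching and network settings instead.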



On Tue, Jan 6, 2015 at 4:04 AM, Jun Rao j...@confluent.io wrote:

 Not setting log.flush.interval.messages is good since the default gives
 the best latency. Could you do some basic I/O testing on the local FS in
 your windows machine to make sure the I/O latency is ok?

 Thanks,

 Jun

 On Thu, Jan 1, 2015 at 1:40 AM, Shlomi Hazan shl...@viber.com wrote:

  Happy new year!
  I did not set log.flush.interval.messages.
  I also could not find a default value in the docs.
  Could you explain about that?
  Thanks,
  Shlomi
 
  On Thu, Jan 1, 2015 at 2:20 AM, Jun Rao j...@confluent.io wrote:
 
   What's your setting of log.flush.interval.messages on the broker?
  
   Thanks,
  
   Jun
  
   On Mon, Dec 29, 2014 at 3:26 AM, Shlomi Hazan shl...@viber.com
 wrote:
  
Hi,
I am using 0.8.1.1, and I have hundreds of msec latency at best and
  even
seconds at worst.
I have this latency both on production, (with peak load of 30K
 msg/sec,
replication = 2 across 5 brokers, acks = 1),
and on the local windows machine using just one process for each of
producer, zookeeper, kafka, consumer.
Also tried batch.num.messages=1 and producer.type=sync on the local
   machine
but saw no improvement.
How can I push latency down to several millis, at least when running
   local?
Thanks,
Shlomi
   
  
 



  

New 0.8.2 client compatibility with 0.8.1.1 during failure cases

2015-01-06 Thread Paul Pearcy
Hello,
  I have some of my own test cases very similar to the ones
in ProducerFailureHandlingTest.

I moved to the new producer against 0.8.1.1, and all of my test cases around
disconnects fail. Moving to 0.8.2-beta on the server side, they succeed.

Here is an example test:
- Healthy producer/server setup
- Stop the server
- Send a message
- Call get on the future; it never returns, regardless of whether the server
is started again or remains stopped

So I believe the 0.8.2 producer is compatible with 0.8.1.1 in the happy
case, but fails to return futures in cases where disconnects have occurred.
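Until that is fixed, one defensive pattern is to never call the unbounded get(): bound the wait so a lost connection surfaces as a timeout instead of a hang. A sketch with plain java.util.concurrent standing in for the producer's returned Future:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedGetSketch {
    // Wait for an ack with a deadline; returns true only if acked in time.
    public static boolean awaitAck(CompletableFuture<String> ack, long millis) {
        try {
            ack.get(millis, TimeUnit.MILLISECONDS); // bounded wait
            return true;
        } catch (TimeoutException e) {
            return false; // no ack within the deadline: treat the send as failed
        } catch (Exception e) {
            return false; // an exceptional ack also counts as a failed send
        }
    }

    public static void main(String[] args) {
        // Stand-in for producer.send(record): a future that never completes,
        // mimicking the dropped-connection case described above.
        CompletableFuture<String> neverAcked = new CompletableFuture<>();
        System.out.println(awaitAck(neverAcked, 100)); // false after ~100 ms
    }
}
```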

Is it possible to run the test cases in ProducerFailureHandlingTest.scala
against 0.8.1.1?

Thanks,
Paul


Re: messages lost

2015-01-06 Thread Mayuresh Gharat
Try calling .get() on the future returned by the new producer. It should
guarantee that the message has made it to Kafka.
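The point generalizes to any future-based API: a fire-and-forget caller never observes the failure; only get() (result() in Python) does. A toy stand-in (the send function below is not the Kafka client):

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)

# Stand-in for producer.send(record): returns a future immediately,
# whether or not the send will ultimately succeed.
def send(record, fail=False):
    def _deliver():
        if fail:
            raise RuntimeError("broker unreachable")
        return "offset:42"
    return pool.submit(_deliver)

ok = send("msg-1")
print(ok.result())           # blocks until "acked"

bad = send("msg-2", fail=True)
try:
    bad.result()             # the failure only surfaces here
except RuntimeError as e:
    print("send failed:", e)
```

A producer that drops its futures on the floor can silently lose every message the broker never acked.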

Thanks,

Mayuresh

On Tue, Jan 6, 2015 at 4:21 PM, Sa Li sal...@gmail.com wrote:

 Hi, experts

 Again, we are still having the issue of losing data: we send 5000
 records but only find 4500 records on the brokers. We did set required.acks=-1
 to make sure all brokers ack, but that only causes long latency and does not
 cure the data loss.


 thanks


 On Mon, Jan 5, 2015 at 9:55 AM, Xiaoyu Wang xw...@rocketfuel.com wrote:

  @Sa,
 
  the required.acks is a producer-side configuration. Setting it to -1 means
  requiring acks from all brokers.
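A toy simulation (not Kafka code) of why acks=1 can lose acknowledged messages while acks=-1 does not, assuming the leader fails before its followers fetch:

```python
# Toy model of why request.required.acks=1 can lose messages:
# the leader acks as soon as it has the write, before followers copy it.

def produce(replica_logs, leader, msg, required_acks):
    replica_logs[leader].append(msg)
    if required_acks == 1:
        return True                      # acked: leader copy only
    # required_acks == -1: ack only once every replica has the message
    for r in replica_logs:
        if msg not in r:
            r.append(msg)                # simulate follower fetch
    return True

logs = [[], [], []]                      # 3 replicas, replica 0 is leader
produce(logs, 0, "m1", required_acks=1)  # acked, but followers are behind
new_leader = 1                           # leader 0 dies; replica 1 takes over
print("m1 survives:", "m1" in logs[new_leader])   # lost

logs2 = [[], [], []]
produce(logs2, 0, "m2", required_acks=-1)
print("m2 survives:", "m2" in logs2[1])           # survives
```

With acks=1 the producer gets a success response for a message that exists only on the dead leader, which matches the symptom described above.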
 
  On Fri, Jan 2, 2015 at 1:51 PM, Sa Li sal...@gmail.com wrote:
 
   Thanks a lot, Tim, this is the config of brokers
  
   --
   broker.id=1
   port=9092
   host.name=10.100.70.128
   num.network.threads=4
   num.io.threads=8
   socket.send.buffer.bytes=1048576
   socket.receive.buffer.bytes=1048576
   socket.request.max.bytes=104857600
   auto.leader.rebalance.enable=true
   auto.create.topics.enable=true
   default.replication.factor=3
  
   log.dirs=/tmp/kafka-logs-1
   num.partitions=8
  
   log.flush.interval.messages=1
   log.flush.interval.ms=1000
   log.retention.hours=168
   log.segment.bytes=536870912
   log.cleanup.interval.mins=1
  
   zookeeper.connect=10.100.70.128:2181,10.100.70.28:2181,
 10.100.70.29:2181
   zookeeper.connection.timeout.ms=100
  
   ---
  
  
   We actually played around with request.required.acks in the producer config:
   -1 causes long latency, and 1 is the setting that causes messages to be lost.
   But I am not sure if this is the reason we lose records.
  
  
   thanks
  
   AL
  
  
  
  
  
  
  
   On Fri, Jan 2, 2015 at 9:59 AM, Timothy Chen tnac...@gmail.com
 wrote:
  
What's your configured required.acks? And also are you waiting for
 all
your messages to be acknowledged as well?
   
The new producer returns futures back, but you still need to wait for
the futures to complete.
   
Tim
   
On Fri, Jan 2, 2015 at 9:54 AM, Sa Li sal...@gmail.com wrote:
 Hi, all

 We are sending messages from a producer: we send 10 records, but we
 see only 99573 records for that topic. We confirmed this by consuming the
 topic and checking the log size in the Kafka web console.

 Any ideas for the message lost, what is the reason to cause this?

 thanks

 --

 Alec Li
   
  
  
  
   --
  
   Alec Li
  
 



 --

 Alec Li




-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: mirrormaker tool in 0.82beta

2015-01-06 Thread Jun Rao
Did you set offsets.storage to kafka in the consumer of mirror maker?
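For reference, enabling Kafka-based offset storage on the mirror maker's consumer side looks roughly like the fragment below (property names per the 0.8.2 consumer docs; the connect string and group id are placeholders):

```properties
# mirror maker consumer.properties sketch (0.8.2)
zookeeper.connect=zk1:2181
group.id=mirrormaker.from.site_02.to.site_ktv
offsets.storage=kafka
# commit to both ZooKeeper and Kafka while migrating offset storage
dual.commit.enabled=true
```

Without offsets.storage=kafka, the offset manager machinery the checker is querying is not in play.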

Thanks,

Jun

On Mon, Jan 5, 2015 at 3:49 PM, svante karlsson s...@csi.se wrote:

 I'm using 0.82beta and I'm trying to push data with the mirrormaker tool
 from several remote sites to two datacenters. I'm testing this from a node
 containing zk, broker and mirrormaker and the data is pushed to a normal
 cluster. 3 zk and 4 brokers with replication.

 While the configuration seems straight forward things are a bit shaky.

 First, if something goes wrong on either the consumer or producer side, it
 stops transmitting data without dying, which makes it harder to run under a
 supervisor (upstart in my case)

 I start pushing data to site 02 and mirrormaker pushes it to my datacenter
 - so far all good...

 But, when I run. (on my remote site)
 kafka-consumer-offset-checker.sh
 --group mirrormaker.from.site_02.to.site_ktv --zookeeper 192.168.0.106

 Could not fetch offset for [saka.test.ext_datastream,7] due to
 kafka.common.NotCoordinatorForConsumerException.
 Could not fetch offset for [saka.test.ext_datastream,4] due to
 kafka.common.NotCoordinatorForConsumerException.
 Could not fetch offset for [saka.test.ext_datastream,2] due to
 kafka.common.NotCoordinatorForConsumerException.
 Could not fetch offset for [saka.test.ext_datastream,5] due to
 kafka.common.NotCoordinatorForConsumerException.
 Could not fetch offset for [saka.test.ext_datastream,6] due to
 kafka.common.NotCoordinatorForConsumerException.
 Could not fetch offset for [saka.test.ext_datastream,3] due to
 kafka.common.NotCoordinatorForConsumerException.
 Could not fetch offset for [saka.test.ext_datastream,1] due to
 kafka.common.NotCoordinatorForConsumerException.
 Could not fetch offset for [saka.test.ext_datastream,0] due to
 kafka.common.NotCoordinatorForConsumerException.

 Group   Topic  Pid Offset  logSize
 Lag Owner
 mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   0
 unknown 9310560 unknown
 mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
 mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   1
 unknown 9313497 unknown
 mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
 mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   2
 unknown 9323623 unknown
 mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
 mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   3
 unknown 9334005 unknown
 mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
 mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   4
 unknown 9324176 unknown
 mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
 mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   5
 unknown 9336504 unknown
 mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
 mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   6
 unknown 9316139 unknown
 mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
 mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   7
 unknown 9318770 unknown
 mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0


 Since I can push data between my sites, something is working, but what can be
 the cause of NotCoordinatorForConsumerException on a single-node cluster?


 Finally the __consumer_offsets topic should probably be masked out from
 mirroring without using whitelists or blacklists
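The mirror maker's --blacklist option accepts a regex, so internal topics can already be excluded with a pattern along these lines (illustrated in Python; the tool itself uses Java regex syntax, which agrees for this pattern):

```python
import re

# Sketch: exclude Kafka's internal topics, which are prefixed with "__"
# (e.g. __consumer_offsets), from the set of mirrored topics.
blacklist = re.compile(r"^__.*")

topics = ["saka.test.ext_datastream", "__consumer_offsets", "orders"]
mirrored = [t for t in topics if not blacklist.match(t)]
print(mirrored)   # internal topic filtered out
```

Masking internal topics by default, as suggested, would just build this filter into the tool.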


 thanks
 /svante



Re: The message would be committed if message un-committed

2015-01-06 Thread Jun Rao
The consumer always fetches data from the leader broker.

Thanks,

Jun

On Tue, Jan 6, 2015 at 2:50 AM, chenlax lax...@hotmail.com wrote:

 as the kafka doc says, "Only committed messages are ever given out to the
 consumer."
 If a follower has not yet copied a message but is still in the ISR, will
 consumers be able to consume that message from the leader broker?

 Thanks,
 Lax



Re: kafka deleted old logs but not released

2015-01-06 Thread Jun Rao
Do you mean that the Kafka broker still holds a file handler on a deleted
file? Do you see those files being deleted in the Kafka log4j log?
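On Linux an unlinked file keeps occupying disk space until the last open descriptor on it is closed, which is exactly what lsof's "(deleted)" entries indicate. A small sketch of the effect:

```python
import os
import tempfile

# A deleted file's space is not released while a process holds it open --
# the pattern lsof reports as "(deleted)".
fd, path = tempfile.mkstemp()
os.write(fd, b"segment data")
os.unlink(path)                  # "delete" the file, as log cleanup does

assert not os.path.exists(path)  # gone from the directory tree...
print("still holds %d bytes" % os.fstat(fd).st_size)  # ...but space is held
os.close(fd)                     # only now can the kernel reclaim the space
```

If the broker logs show the segments being deleted but lsof still lists them, the broker (or something sharing its file descriptors) has not closed the handles.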

Thanks,

Jun

On Tue, Jan 6, 2015 at 4:46 AM, Yonghui Zhao zhaoyong...@gmail.com wrote:

 Hi,

 We use kafka_2.10-0.8.1.1 in our server. Today we found disk space alert.

 We find many kafka data files are deleted, but still opened by kafka.

 such as:

 _yellowpageV2-0/68170670.log (deleted)
 java 8446 root 724u REG 253,2 536937911 26087362 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/68818668.log (deleted)
 java 8446 root 725u REG 253,2 536910838 26087364 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/69457098.log (deleted)
 java 8446 root 726u REG 253,2 536917902 26087368 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/70104914.log (deleted)


 Is there anything wrong, or is something misconfigured?



Re: [Kafka-users] Differences MessageIn between two kafka clusters replicated by mirror maker

2015-01-06 Thread Jun Rao
Not sure exactly what's happening there. In any case, 0.7 is really old.
You probably want to upgrade to the latest 0.8 release.

Thanks,

Jun

On Tue, Jan 6, 2015 at 12:16 AM, tao li tust.05102...@gmail.com wrote:

 Hi,

 We are using mirrormaker to replicate data between two kafka clusters in
 different datacenters, connected by fiber: 1G bandwidth, ~35ms
 latency.

 We found that the MessageIn JMX metric on the brokers in the destination
 cluster is greater than on the brokers in the source cluster.

 Our configuration like this:
 destination & source clusters both run kafka 0.7.0:
 the source cluster has 6 brokers with 32 partitions each, the target cluster
 has 12 brokers with 32 partitions each, and 6 mirrormakers do the
 replication, replicating only one topic.

 mirrormaker configuration like this:

 --num.streams 128
 --num.producers 16

 =

 consumer.properties:

 shallowiterator.enable=true

 socket.buffersize=31457280

 fetch.size=1048576

 =

 producer.properties:

 queue.enqueueTimeout.ms=-1

 # the number of messages batched at the producer

 batch.size=4000

 #default 5000

 connect.timeout.ms=1

 #default 3

 socket.timeout.ms=30

 ==

 test result:

 MessageIn: source cluster = 252,560,169, target cluster = 253,128,669

 Another problem: sometimes one or more brokers in the source cluster are not
 consumed at all, while the mirrormakers replicate the other brokers fine.

 Does anyone have an idea how to resolve these?



Re: Zookeeper Connection When Using High Level Sample Consumer Code from Wiki

2015-01-06 Thread Jayesh Thakrar
Ok, I could make the example work.
The problem was that on the wiki page there was no value for
zk.connectiontimeout.ms. I inserted a value for that property (value = 10)
and the example worked.
Also, I realized my mail client messed up the earlier message.
Here's the URL for the wiki page I was referring to:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

  From: Jayesh Thakrar j_thak...@yahoo.com.INVALID
 To: users@kafka.apache.org users@kafka.apache.org 
 Sent: Tuesday, January 6, 2015 11:09 AM
 Subject: Zookeeper Connection When Using High Level Sample Consumer Code from 
Wiki
   
When I try running the Java consumer example at
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example I get
the following zookeeper connection error.
I have verified zookeeper connectivity using a variety of means (using the
Zookeeper built-in client, sending 4-letter commands to the zk port, etc.), so
I know the zookeeper info is correct.
Here's the error I get. Any pointers will be greatly appreciated.
Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 400
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:880)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
    at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:170)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:126)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69)
    at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
    at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
    at kafka_sample.Consumer.<init>(Consumer.java:21)
    at kafka_sample.Consumer.main(Consumer.java:71)

  

Re: The message would be consumed if message un-committed

2015-01-06 Thread Mayuresh Gharat
Only committed messages are made available to the consumer, and the consumer
always reads from the leader. A message is considered committed (i.e., the
high watermark is advanced) only when all the replicas in the ISR have
received it.
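The rule can be sketched as a toy model (not Kafka code): the high watermark is the minimum log end offset across the ISR, and only offsets below it are visible to consumers:

```python
# Toy model of the high watermark (HW): a message is "committed" once
# every replica in the ISR has it, and consumers see only offsets < HW.
def high_watermark(isr_log_end_offsets):
    return min(isr_log_end_offsets)

leader_leo = 10
follower_leos = [10, 7]                  # one in-sync follower lags at 7
hw = high_watermark([leader_leo] + follower_leos)
print("HW =", hw)

consumable = [o for o in range(leader_leo) if o < hw]
print("consumer can read offsets:", consumable)  # offsets 0..6 only
```

So even though the leader holds offsets up to 9, a consumer fetching from it is only given offsets below the lagging follower's position.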

Thanks,

Mayuresh

On Tue, Jan 6, 2015 at 3:41 AM, chenlax lax...@hotmail.com wrote:

  as the kafka doc says, "Only committed messages are ever given out to the
 consumer."
 If a follower has not yet copied a message but is still in the ISR, will
 consumers be able to consume that message from the leader broker?

 Thanks,
 Lax





-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Release Date for Kafka 0.8.2

2015-01-06 Thread Neha Narkhede
You can track the blockers here
https://issues.apache.org/jira/browse/KAFKA-1841?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%200.8.2%20AND%20resolution%20%3D%20Unresolved%20AND%20priority%20%3D%20Blocker%20ORDER%20BY%20key%20DESC.
We are waiting on follow up patches for 2 JIRAs which are under review. We
should be able to release 0.8.2 in a few days.



On Mon, Jan 5, 2015 at 10:26 PM, Otis Gospodnetic 
otis.gospodne...@gmail.com wrote:

 Hi Srividhya,

 See

 http://search-hadoop.com/m/4TaT4B9tys1/subj=Re+Kafka+0+8+2+release+before+Santa+Claus

 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr & Elasticsearch Support * http://sematext.com/


 On Mon, Jan 5, 2015 at 11:55 AM, Srividhya Shanmugam 
 srividhyashanmu...@fico.com wrote:

  Kafka Team,
 
  We are currently using the 0.8.2 beta version with a patch for
 KAFKA-1738.
  Do you have any updates on when 0.8.2 final version will be released?
 
  Thanks,
  Srividhya
 
 
  This email and any files transmitted with it are confidential,
 proprietary
  and intended solely for the individual or entity to whom they are
  addressed. If you have received this email in error please delete it
  immediately.
 




-- 
Thanks,
Neha


Re: New 0.8.2 client compatibility with 0.8.1.1 during failure cases

2015-01-06 Thread Ewen Cheslack-Postava
Paul,

That behavior is currently expected, see
https://issues.apache.org/jira/browse/KAFKA-1788. There are currently no
client-side timeouts in the new producer, so the message just sits there
forever waiting for the server to come back so it can try to send it.
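Until KAFKA-1788 lands, an application-level timeout on the returned future is the usual workaround. Sketched with a Python future as a stand-in (the Java equivalent would be future.get(30, TimeUnit.SECONDS)):

```python
import concurrent.futures
import time

pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
# Stands in for a send whose future never completes while the broker is down.
slow_send = pool.submit(time.sleep, 1.0)

timed_out = False
try:
    slow_send.result(timeout=0.05)   # bound the wait instead of blocking forever
except concurrent.futures.TimeoutError:
    timed_out = True
print("gave up waiting for the ack:", timed_out)
pool.shutdown()
```

Note this only unblocks the caller; the underlying send is still queued until the client gains real timeouts.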

If you already have tests for a variety of failure modes it might be
helpful to contribute them to the WIP patch in that JIRA. Currently the
patch only adds one unit test for RecordAccumulator, a few tests using the
full server would be an improvement.

-Ewen

On Tue, Jan 6, 2015 at 7:54 AM, Paul Pearcy ppea...@gmail.com wrote:

 Hello,
   I have some of my own test cases very similar to the ones
 in ProducerFailureHandlingTest.

 I moved to the new producer against 0.8.1.1, and all of my test cases around
 disconnects fail. Moving the server side to 0.8.2-beta, they succeed.

 Here is an example test:
 - Healthy producer/server setup
 - Stop the server
 - Send a message
 - Call get on the future and it never returns. Doesn't matter if the server
 is started again or remains stopped

 So I believe that, while the 0.8.2 producer is compatible with 0.8.1.1 in the
 happy case, it fails to return futures in cases where disconnects have
 occurred.

 Is it possible to run the test cases in ProducerFailureHandlingTest.scala
 against 0.8.1.1?

 Thanks,
 Paul




-- 
Thanks,
Ewen