Re: Kafka restart takes a long time
Thanks, everyone. I'll try to clean up the disk space and try again.
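Freeing space gets the broker restarted, but unless retention is capped the data logs can simply grow back and fill the disk again. A minimal sketch of broker settings that bound on-disk data; the values here are purely illustrative and would need to be sized to the actual disk and traffic:

    # server.properties -- illustrative retention caps, not tuned values
    log.retention.hours=72            # delete log segments older than 3 days
    log.retention.bytes=10737418240   # also cap each partition's log at ~10 GB

Note that log.retention.bytes applies per partition, so with over 1000 partitions per broker the effective ceiling is roughly (number of partitions) x log.retention.bytes; deletion is triggered by whichever of the two limits is exceeded first.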
Re: Kafka restart takes a long time
Rajiv,

So, any time a broker's disk fills up, it will shut itself down immediately (it will do this in response to any I/O error on writing to disk). Unfortunately, this means that the node will not be able to do any housecleaning before shutdown, which is an 'unclean' shutdown. This means that when it restarts, it needs to reset the data to the last known checkpoint. If the partition is replicated, and it can restore it from another broker, it will try to do that (but it doesn't sound like it can do that in your case, since all the other nodes are down too).

There is a fix coming in 0.8.2 that will allow a broker to restore multiple partitions in parallel (the current behavior in 0.8.1.1 and prior is to restore partitions one by one). See: https://issues.apache.org/jira/browse/KAFKA-1414. This fix should speed things up greatly when you have a large number of partitions.

If a disk is full, the broker will refuse to even start up (or will fail immediately on the first write attempt and shut itself down). So, generally, in this event, you need to clear some disk space before trying to restart the server.

The bottom line is that you don't want any of your brokers to run out of disk space (thus you need good monitoring/alerting for advance warning on this). Kafka doesn't attempt to detect whether it's about to run out of space and die, so you have to manage that and guard against it outside of Kafka.

Jason
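For reference, the KAFKA-1414 change mentioned above is exposed in 0.8.2 as a broker setting that controls how many log-recovery threads run per data directory (on 0.8.1.1 and earlier the setting does not exist and recovery is single-threaded). A sketch of the relevant server.properties lines, with an example thread count rather than a recommendation:

    # server.properties (Kafka 0.8.2+) -- recover logs in parallel after an unclean shutdown
    num.recovery.threads.per.data.dir=4   # default is 1, i.e. partitions are recovered one by one
    # log.dirs=/data/kafka-logs           # example path; parallelism applies per directory listed here

With a single data directory and over 1000 partitions, raising this setting is the main lever for shortening the recovery phase described in this thread, once the cluster is on a release that includes the fix.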
Re: Kafka restart takes a long time
It might be the logs: check your Kafka logs dir (the server logs). Kafka can produce a lot of logs in a short time, so check whether that's what's in play here.

-Harsha
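Both suggestions in this thread (Jason's point about guarding against full disks outside of Kafka, and Harsha's point about the server logs filling the disk quickly) come down to watching free space under the broker's directories before writes start failing. A minimal, self-contained sketch of such an external check in Java; the directory, threshold, and alert action are placeholders, not anything prescribed by Kafka:

    // DiskSpaceGuard.java -- toy external free-space check for a broker's data
    // or log directory. Wire the alert into whatever monitoring system is in use.
    import java.io.File;

    public class DiskSpaceGuard {
        public static void main(String[] args) throws InterruptedException {
            // Hypothetical directory to watch (e.g. the broker's log.dirs path).
            File dir = new File(args.length > 0 ? args[0] : "/data/kafka-logs");
            final double minFreeFraction = 0.15;  // alert below 15% free space

            while (true) {
                long total = dir.getTotalSpace();    // size of the volume, in bytes
                long usable = dir.getUsableSpace();  // bytes this process can still write

                if (total > 0 && (double) usable / total < minFreeFraction) {
                    // Placeholder alert; replace with a real pager/metrics call.
                    System.err.printf("ALERT: only %.1f%% free under %s%n",
                            100.0 * usable / total, dir.getAbsolutePath());
                }
                Thread.sleep(60_000);  // re-check once a minute
            }
        }
    }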
Re: Kafka restart takes a long time
Actually, I see a bunch of errors. One of the brokers is out of space and this might be causing everything to spin out of control.

Some logs:

On *broker 1* (the one that has run out of space):

2014-11-22T21:20:42.790Z FATAL [ReplicaFetcherThread-1-13 ] [kafka.server.ReplicaFetcherThread ]: [ReplicaFetcherThread-1-13], Disk error while replicating data.

kafka.common.KafkaStorageException: I/O exception in append to log 'mytopic-633'
        at kafka.log.Log.append(Log.scala:283)
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:52)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Caused by: java.io.IOException: No space left on device
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)
        at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
        at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:132)
        at kafka.log.FileMessageSet.append(FileMessageSet.scala:210)
        at kafka.log.LogSegment.append(LogSegment.scala:80)
        at kafka.log.Log.append(Log.scala:269)
        ... 13 more

2014-11-22T21:20:42.791Z ERROR [ReplicaFetcherThread-2-13 ] [kafka.server.ReplicaFetcherThread ]: [ReplicaFetcherThread-2-13], Error getting offset for partition [myTopic,0] to broker 13

java.io.IOException: No space left on device
        at java.io.FileOutputStream.writeBytes(Native Method)
        at java.io.FileOutputStream.write(FileOutputStream.java:345)
        at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
        at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
        at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
        at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
        at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:129)
        at java.io.BufferedWriter.write(BufferedWriter.java:230)
        at java.io.Writer.write(Writer.java:157)
        at kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:50)
        at kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:49)
        at scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)
        at scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
        at scala.collection.immutable.MapLike$$anon$2.foreach(MapLike.scala:106)
        at kafka.server.OffsetCheckpoint.liftedTree1$1(OffsetCheckpoint.scala:49)
        at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:39)
        at kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:234)
        at kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:231)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at kafka.log.LogManager.checkpointRecoveryPointOffsets(LogManager.scala:231)
        at kafka.log.LogManager.truncateTo(LogManager.scala:204)
        at kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(ReplicaFetcherThread.scala:84)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:144)
Re: Kafka restart takes a long time
Rajiv, which version of Kafka are you using, and do you see any errors when the server goes down after sending a few messages?

-Harsha
Re: Kafka restart takes a long time
The brokers also seem unavailable while this is going on. Each of these log messages takes 2-3 seconds, so at about 1200 partitions it adds up to quite a bit of time (1200 partitions at roughly 2.5 seconds each works out to around 50 minutes). Ultimately it does recover, but sadly it goes down again soon after I start sending it messages.
Kafka restart takes a long time
A 3-node Kafka broker cluster went down yesterday (all nodes) and I just noticed it this morning. When I restarted it this morning, I see a lengthy list of messages like this:

Loading log 'mytopic-partitionNum'
Recovering unflushed segment 'some number' in log mytopic-partitionNum.
Completed load of log mytopic-partitionNum with log end offset someOffset

It's been going on for more than 30 minutes since I restarted the broker. I have quite a few partitions (over 1000), but I still wouldn't expect it to take such a long time.

Any ideas on how I should investigate the problem?

Thanks!