Author: jbellis
Date: Tue Sep  7 15:26:51 2010
New Revision: 993393

URL: http://svn.apache.org/viewvc?rev=993393&view=rev
Log:
make sure to close CommitLog files after replay.  backport of CASSANDRA-1348

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=993393&r1=993392&r2=993393&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Tue Sep  7 15:26:51 2010
@@ -10,6 +10,7 @@
  * remove failed bootstrap attempt from pending ranges when gossip times
    it out after 1h (CASSANDRA-1463)
  * eager-create tcp connections to other cluster members (CASSANDRA-1465)
+ * make sure to close CommitLog files after replay (CASSANDRA-1348)
 
 
 0.6.5

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=993393&r1=993392&r2=993393&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Tue Sep  7 15:26:51 2010
@@ -185,95 +185,101 @@ public class CommitLog
             int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
             BufferedRandomAccessFile reader = new 
BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
 
-            final CommitLogHeader clHeader;
             try
             {
-                clHeader = CommitLogHeader.readCommitLogHeader(reader);
-            }
-            catch (EOFException eofe)
-            {
-                logger.info("Attempted to recover an incomplete 
CommitLogHeader.  Everything is ok, don't panic.");
-                continue;
-            }
-
-            /* seek to the lowest position where any CF has non-flushed data */
-            int lowPos = CommitLogHeader.getLowestPosition(clHeader);
-            if (lowPos == 0)
-                continue;
-
-            reader.seek(lowPos);
-            if (logger.isDebugEnabled())
-                logger.debug("Replaying " + file + " starting at " + lowPos);
-
-            /* read the logs populate RowMutation and apply */
-            while (!reader.isEOF())
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("Reading mutation at " + 
reader.getFilePointer());
-
-                long claimedCRC32;
-                byte[] bytes;
+                final CommitLogHeader clHeader;
                 try
                 {
-                    bytes = new byte[(int) reader.readLong()]; // readlong can 
throw EOFException too
-                    reader.readFully(bytes);
-                    claimedCRC32 = reader.readLong();
+                    clHeader = CommitLogHeader.readCommitLogHeader(reader);
                 }
-                catch (EOFException e)
+                catch (EOFException eofe)
                 {
-                    // last CL entry didn't get completely written.  that's ok.
-                    break;
+                    logger.info("Attempted to recover an incomplete 
CommitLogHeader.  Everything is ok, don't panic.");
+                    continue;
                 }
 
-                ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
-                Checksum checksum = new CRC32();
-                checksum.update(bytes, 0, bytes.length);
-                if (claimedCRC32 != checksum.getValue())
-                {
-                    // this part of the log must not have been fsynced.  
probably the rest is bad too,
-                    // but just in case there is no harm in trying them.
+                /* seek to the lowest position where any CF has non-flushed 
data */
+                int lowPos = CommitLogHeader.getLowestPosition(clHeader);
+                if (lowPos == 0)
                     continue;
-                }
 
-                /* deserialize the commit log entry */
-                final RowMutation rm = 
RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+                reader.seek(lowPos);
                 if (logger.isDebugEnabled())
-                    logger.debug(String.format("replaying mutation for %s.%s: 
%s",
-                                                rm.getTable(),
-                                                rm.key(),
-                                                "{" + 
StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
-                final Table table = Table.open(rm.getTable());
-                tablesRecovered.add(table);
-                final Collection<ColumnFamily> columnFamilies = new 
ArrayList<ColumnFamily>(rm.getColumnFamilies());
-                final long entryLocation = reader.getFilePointer();
-                Runnable runnable = new WrappedRunnable()
+                    logger.debug("Replaying " + file + " starting at " + 
lowPos);
+
+                /* read the logs populate RowMutation and apply */
+                while (!reader.isEOF())
                 {
-                    public void runMayThrow() throws IOException
+                    if (logger.isDebugEnabled())
+                        logger.debug("Reading mutation at " + 
reader.getFilePointer());
+
+                    long claimedCRC32;
+                    byte[] bytes;
+                    try
+                    {
+                        bytes = new byte[(int) reader.readLong()]; // readlong 
can throw EOFException too
+                        reader.readFully(bytes);
+                        claimedCRC32 = reader.readLong();
+                    }
+                    catch (EOFException e)
                     {
-                        /* remove column families that have already been 
flushed before applying the rest */
-                        for (ColumnFamily columnFamily : columnFamilies)
+                        // last CL entry didn't get completely written.  
that's ok.
+                        break;
+                    }
+
+                    ByteArrayInputStream bufIn = new 
ByteArrayInputStream(bytes);
+                    Checksum checksum = new CRC32();
+                    checksum.update(bytes, 0, bytes.length);
+                    if (claimedCRC32 != checksum.getValue())
+                    {
+                        // this part of the log must not have been fsynced.  
probably the rest is bad too,
+                        // but just in case there is no harm in trying them.
+                        continue;
+                    }
+
+                    /* deserialize the commit log entry */
+                    final RowMutation rm = 
RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+                    if (logger.isDebugEnabled())
+                        logger.debug(String.format("replaying mutation for 
%s.%s: %s",
+                                                    rm.getTable(),
+                                                    rm.key(),
+                                                    "{" + 
StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
+                    final Table table = Table.open(rm.getTable());
+                    tablesRecovered.add(table);
+                    final Collection<ColumnFamily> columnFamilies = new 
ArrayList<ColumnFamily>(rm.getColumnFamilies());
+                    final long entryLocation = reader.getFilePointer();
+                    Runnable runnable = new WrappedRunnable()
+                    {
+                        public void runMayThrow() throws IOException
                         {
-                            int id = 
table.getColumnFamilyId(columnFamily.name());
-                            if (!clHeader.isDirty(id) || entryLocation < 
clHeader.getPosition(id))
+                            /* remove column families that have already been 
flushed before applying the rest */
+                            for (ColumnFamily columnFamily : columnFamilies)
                             {
-                                rm.removeColumnFamily(columnFamily);
+                                int id = 
table.getColumnFamilyId(columnFamily.name());
+                                if (!clHeader.isDirty(id) || entryLocation < 
clHeader.getPosition(id))
+                                {
+                                    rm.removeColumnFamily(columnFamily);
+                                }
+                            }
+                            if (!rm.isEmpty())
+                            {
+                                Table.open(rm.getTable()).apply(rm, null, 
false);
                             }
                         }
-                        if (!rm.isEmpty())
-                        {
-                            Table.open(rm.getTable()).apply(rm, null, false);
-                        }
+                    };
+                    
futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
+                    if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+                    {
+                        FBUtilities.waitOnFutures(futures);
+                        futures.clear();
                     }
-                };
-                
futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
-                if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
-                {
-                    FBUtilities.waitOnFutures(futures);
-                    futures.clear();
                 }
             }
-            reader.close();
-            logger.info("Finished reading " + file);
+            finally
+            {
+                reader.close();
+                logger.info("Finished reading " + file);
+            }
         }
 
         // wait for all the writes to finish on the mutation stage


Reply via email to