Author: jbellis
Date: Tue Aug 3 15:47:29 2010
New Revision: 981938

URL: http://svn.apache.org/viewvc?rev=981938&view=rev
Log:
close commitlog reader before deleting it
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1348
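The defect is a leaked file handle: recover() only closed its BufferedRandomAccessFile on the happy path, so an exception during replay could leave the segment open when the log later tried to delete it (deleting a file that still has an open handle typically fails outright on Windows). The patch moves the close into a finally block. A minimal sketch of the pattern, using plain java.io types rather than Cassandra's BufferedRandomAccessFile; replaySegment() and the delete call are illustrative stand-ins, not the actual CommitLog API:

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class ReplayCloseSketch
    {
        // Sketch: open the segment, replay it, and guarantee the handle is
        // closed before the caller deletes the file.
        public static void replaySegment(File segment) throws IOException
        {
            RandomAccessFile reader = null;
            try
            {
                reader = new RandomAccessFile(segment, "r");
                // ... read and apply entries from reader ...
            }
            finally
            {
                if (reader != null)
                    reader.close(); // runs even if replay throws
            }
            // only now is it safe to delete: no open handle remains
            segment.delete();
        }
    }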
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=981938&r1=981937&r2=981938&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Tue Aug 3 15:47:29 2010
@@ -187,115 +187,122 @@ public class CommitLog
         for (File file : clogs)
         {
-            CommitLogHeader clHeader = null;
-            int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
-            BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
-
-            int replayPosition = 0;
-            String headerPath = CommitLogHeader.getHeaderPathFromSegmentPath(file.getAbsolutePath());
+            BufferedRandomAccessFile reader = null;
             try
             {
-                clHeader = CommitLogHeader.readCommitLogHeader(headerPath);
-                replayPosition = clHeader.getReplayPosition();
-            }
-            catch (IOException ioe)
-            {
-                logger.info(headerPath + " incomplete, missing or corrupt. Everything is ok, don't panic. CommitLog will be replayed from the beginning");
-                logger.debug("exception was", ioe);
-            }
-            if (replayPosition < 0)
-            {
-                logger.debug("skipping replay of fully-flushed {}", file);
-                continue;
-            }
-            reader.seek(replayPosition);
+                CommitLogHeader clHeader = null;
+                int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
+                reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);

-            if (logger.isDebugEnabled())
-                logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
-
-            /* read the logs populate RowMutation and apply */
-            while (!reader.isEOF())
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("Reading mutation at " + reader.getFilePointer());
-
-                long claimedCRC32;
-
-                Checksum checksum = new CRC32();
-                int serializedSize;
+                int replayPosition = 0;
+                String headerPath = CommitLogHeader.getHeaderPathFromSegmentPath(file.getAbsolutePath());
                 try
                 {
-                    // any of the reads may hit EOF
-                    serializedSize = reader.readInt();
-                    long claimedSizeChecksum = reader.readLong();
-                    checksum.update(serializedSize);
-                    if (checksum.getValue() != claimedSizeChecksum || serializedSize <= 0)
-                        break; // entry wasn't synced correctly/fully. that's ok.
-
-                    if (serializedSize > bytes.length)
-                        bytes = new byte[(int) (1.2 * serializedSize)];
-                    reader.readFully(bytes, 0, serializedSize);
-                    claimedCRC32 = reader.readLong();
+                    clHeader = CommitLogHeader.readCommitLogHeader(headerPath);
+                    replayPosition = clHeader.getReplayPosition();
                 }
-                catch(EOFException eof)
+                catch (IOException ioe)
                 {
-                    break; // last CL entry didn't get completely written. that's ok.
+                    logger.info(headerPath + " incomplete, missing or corrupt. Everything is ok, don't panic. CommitLog will be replayed from the beginning");
+                    logger.debug("exception was", ioe);
                 }
-
-                checksum.update(bytes, 0, serializedSize);
-                if (claimedCRC32 != checksum.getValue())
+                if (replayPosition < 0)
                 {
-                    // this entry must not have been fsynced. probably the rest is bad too,
-                    // but just in case there is no harm in trying them (since we still read on an entry boundary)
+                    logger.debug("skipping replay of fully-flushed {}", file);
                     continue;
                 }
+                reader.seek(replayPosition);

-                /* deserialize the commit log entry */
-                ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes, 0, serializedSize);
-                final RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
                 if (logger.isDebugEnabled())
-                    logger.debug(String.format("replaying mutation for %s.%s: %s",
-                                               rm.getTable(),
-                                               rm.key(),
-                                               "{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
-                final Table table = Table.open(rm.getTable());
-                tablesRecovered.add(table);
-                final Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
-                final long entryLocation = reader.getFilePointer();
-                final CommitLogHeader finalHeader = clHeader;
-                Runnable runnable = new WrappedRunnable()
+                    logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
+
+                /* read the logs populate RowMutation and apply */
+                while (!reader.isEOF())
                 {
-                    public void runMayThrow() throws IOException
+                    if (logger.isDebugEnabled())
+                        logger.debug("Reading mutation at " + reader.getFilePointer());
+
+                    long claimedCRC32;
+
+                    Checksum checksum = new CRC32();
+                    int serializedSize;
+                    try
                     {
-                        RowMutation newRm = new RowMutation(rm.getTable(), rm.key());
-
-                        // Rebuild the row mutation, omitting column families that a) have already been flushed,
-                        // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every
-                        // thing based on the cfid instead.
-                        for (ColumnFamily columnFamily : columnFamilies)
-                        {
-                            if (CFMetaData.getCF(columnFamily.id()) == null)
-                                // null means the cf has been dropped
-                                continue;
-
-                            if (finalHeader == null || (finalHeader.isDirty(columnFamily.id()) && entryLocation >= finalHeader.getPosition(columnFamily.id())))
-                                newRm.add(columnFamily);
-                        }
-                        if (!newRm.isEmpty())
+                        // any of the reads may hit EOF
+                        serializedSize = reader.readInt();
+                        long claimedSizeChecksum = reader.readLong();
+                        checksum.update(serializedSize);
+                        if (checksum.getValue() != claimedSizeChecksum || serializedSize <= 0)
+                            break; // entry wasn't synced correctly/fully. that's ok.
+
+                        if (serializedSize > bytes.length)
+                            bytes = new byte[(int) (1.2 * serializedSize)];
+                        reader.readFully(bytes, 0, serializedSize);
+                        claimedCRC32 = reader.readLong();
+                    }
+                    catch(EOFException eof)
+                    {
+                        break; // last CL entry didn't get completely written. that's ok.
+                    }
+
+                    checksum.update(bytes, 0, serializedSize);
+                    if (claimedCRC32 != checksum.getValue())
+                    {
+                        // this entry must not have been fsynced. probably the rest is bad too,
+                        // but just in case there is no harm in trying them (since we still read on an entry boundary)
+                        continue;
+                    }
+
+                    /* deserialize the commit log entry */
+                    ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes, 0, serializedSize);
+                    final RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+                    if (logger.isDebugEnabled())
+                        logger.debug(String.format("replaying mutation for %s.%s: %s",
+                                                   rm.getTable(),
+                                                   rm.key(),
+                                                   "{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
+                    final Table table = Table.open(rm.getTable());
+                    tablesRecovered.add(table);
+                    final Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
+                    final long entryLocation = reader.getFilePointer();
+                    final CommitLogHeader finalHeader = clHeader;
+                    Runnable runnable = new WrappedRunnable()
+                    {
+                        public void runMayThrow() throws IOException
                         {
-                            Table.open(newRm.getTable()).apply(newRm, null, false);
+                            RowMutation newRm = new RowMutation(rm.getTable(), rm.key());
+
+                            // Rebuild the row mutation, omitting column families that a) have already been flushed,
+                            // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every
+                            // thing based on the cfid instead.
+                            for (ColumnFamily columnFamily : columnFamilies)
+                            {
+                                if (CFMetaData.getCF(columnFamily.id()) == null)
+                                    // null means the cf has been dropped
+                                    continue;
+
+                                if (finalHeader == null || (finalHeader.isDirty(columnFamily.id()) && entryLocation >= finalHeader.getPosition(columnFamily.id())))
+                                    newRm.add(columnFamily);
+                            }
+                            if (!newRm.isEmpty())
+                            {
+                                Table.open(newRm.getTable()).apply(newRm, null, false);
+                            }
                         }
+                    };
+                    futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
+                    if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+                    {
+                        FBUtilities.waitOnFutures(futures);
+                        futures.clear();
                     }
-                };
-                futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
-                if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
-                {
-                    FBUtilities.waitOnFutures(futures);
-                    futures.clear();
+                }
                 }
             }
-            reader.close();
-            logger.info("Finished reading " + file);
+            finally
+            {
+                reader.close();
+                logger.info("Finished reading " + file);
+            }
         }

         // wait for all the writes to finish on the mutation stage
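Untouched by this patch but visible throughout the hunk: replayed mutations are applied asynchronously on the mutation stage, and the loop drains the accumulated futures whenever more than MAX_OUTSTANDING_REPLAY_COUNT are outstanding. A generic sketch of that back-pressure pattern, with standard java.util.concurrent types standing in for StageManager and FBUtilities.waitOnFutures (class name and cap value here are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    public class BoundedReplaySketch
    {
        // Illustrative cap; the real constant is CommitLog's MAX_OUTSTANDING_REPLAY_COUNT.
        private static final int MAX_OUTSTANDING = 1024;

        // Sketch: submit tasks asynchronously but drain the accumulated futures
        // once too many are outstanding, bounding memory use and applying
        // back-pressure to the producer.
        public static void submitAll(ExecutorService stage, Iterable<Runnable> tasks)
            throws InterruptedException, ExecutionException
        {
            List<Future<?>> futures = new ArrayList<Future<?>>();
            for (Runnable task : tasks)
            {
                futures.add(stage.submit(task));
                if (futures.size() > MAX_OUTSTANDING)
                {
                    for (Future<?> f : futures)
                        f.get(); // block until every outstanding task finishes
                    futures.clear();
                }
            }
            for (Future<?> f : futures)
                f.get(); // drain the tail
        }
    }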