Author: jbellis
Date: Tue Sep 7 15:26:51 2010
New Revision: 993393
URL: http://svn.apache.org/viewvc?rev=993393&view=rev
Log:
make sure to close CommitLog files after replay. backport of CASSANDRA-1348
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=993393&r1=993392&r2=993393&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Tue Sep 7 15:26:51 2010
@@ -10,6 +10,7 @@
* remove failed bootstrap attempt from pending ranges when gossip times
it out after 1h (CASSANDRA-1463)
* eager-create tcp connections to other cluster members (CASSANDRA-1465)
+ * make sure to close CommitLog files after replay (CASSANDRA-1348)
0.6.5
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=993393&r1=993392&r2=993393&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Tue Sep 7 15:26:51 2010
@@ -185,95 +185,101 @@ public class CommitLog
int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
BufferedRandomAccessFile reader = new
BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
- final CommitLogHeader clHeader;
try
{
- clHeader = CommitLogHeader.readCommitLogHeader(reader);
- }
- catch (EOFException eofe)
- {
- logger.info("Attempted to recover an incomplete
CommitLogHeader. Everything is ok, don't panic.");
- continue;
- }
-
- /* seek to the lowest position where any CF has non-flushed data */
- int lowPos = CommitLogHeader.getLowestPosition(clHeader);
- if (lowPos == 0)
- continue;
-
- reader.seek(lowPos);
- if (logger.isDebugEnabled())
- logger.debug("Replaying " + file + " starting at " + lowPos);
-
- /* read the logs populate RowMutation and apply */
- while (!reader.isEOF())
- {
- if (logger.isDebugEnabled())
- logger.debug("Reading mutation at " +
reader.getFilePointer());
-
- long claimedCRC32;
- byte[] bytes;
+ final CommitLogHeader clHeader;
try
{
- bytes = new byte[(int) reader.readLong()]; // readlong can
throw EOFException too
- reader.readFully(bytes);
- claimedCRC32 = reader.readLong();
+ clHeader = CommitLogHeader.readCommitLogHeader(reader);
}
- catch (EOFException e)
+ catch (EOFException eofe)
{
- // last CL entry didn't get completely written. that's ok.
- break;
+ logger.info("Attempted to recover an incomplete
CommitLogHeader. Everything is ok, don't panic.");
+ continue;
}
- ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
- Checksum checksum = new CRC32();
- checksum.update(bytes, 0, bytes.length);
- if (claimedCRC32 != checksum.getValue())
- {
- // this part of the log must not have been fsynced.
probably the rest is bad too,
- // but just in case there is no harm in trying them.
+ /* seek to the lowest position where any CF has non-flushed
data */
+ int lowPos = CommitLogHeader.getLowestPosition(clHeader);
+ if (lowPos == 0)
continue;
- }
- /* deserialize the commit log entry */
- final RowMutation rm =
RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+ reader.seek(lowPos);
if (logger.isDebugEnabled())
- logger.debug(String.format("replaying mutation for %s.%s:
%s",
- rm.getTable(),
- rm.key(),
- "{" +
StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
- final Table table = Table.open(rm.getTable());
- tablesRecovered.add(table);
- final Collection<ColumnFamily> columnFamilies = new
ArrayList<ColumnFamily>(rm.getColumnFamilies());
- final long entryLocation = reader.getFilePointer();
- Runnable runnable = new WrappedRunnable()
+ logger.debug("Replaying " + file + " starting at " +
lowPos);
+
+ /* read the logs populate RowMutation and apply */
+ while (!reader.isEOF())
{
- public void runMayThrow() throws IOException
+ if (logger.isDebugEnabled())
+ logger.debug("Reading mutation at " +
reader.getFilePointer());
+
+ long claimedCRC32;
+ byte[] bytes;
+ try
+ {
+ bytes = new byte[(int) reader.readLong()]; // readlong
can throw EOFException too
+ reader.readFully(bytes);
+ claimedCRC32 = reader.readLong();
+ }
+ catch (EOFException e)
{
- /* remove column families that have already been
flushed before applying the rest */
- for (ColumnFamily columnFamily : columnFamilies)
+ // last CL entry didn't get completely written.
that's ok.
+ break;
+ }
+
+ ByteArrayInputStream bufIn = new
ByteArrayInputStream(bytes);
+ Checksum checksum = new CRC32();
+ checksum.update(bytes, 0, bytes.length);
+ if (claimedCRC32 != checksum.getValue())
+ {
+ // this part of the log must not have been fsynced.
probably the rest is bad too,
+ // but just in case there is no harm in trying them.
+ continue;
+ }
+
+ /* deserialize the commit log entry */
+ final RowMutation rm =
RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("replaying mutation for
%s.%s: %s",
+ rm.getTable(),
+ rm.key(),
+ "{" +
StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
+ final Table table = Table.open(rm.getTable());
+ tablesRecovered.add(table);
+ final Collection<ColumnFamily> columnFamilies = new
ArrayList<ColumnFamily>(rm.getColumnFamilies());
+ final long entryLocation = reader.getFilePointer();
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
{
- int id =
table.getColumnFamilyId(columnFamily.name());
- if (!clHeader.isDirty(id) || entryLocation <
clHeader.getPosition(id))
+ /* remove column families that have already been
flushed before applying the rest */
+ for (ColumnFamily columnFamily : columnFamilies)
{
- rm.removeColumnFamily(columnFamily);
+ int id =
table.getColumnFamilyId(columnFamily.name());
+ if (!clHeader.isDirty(id) || entryLocation <
clHeader.getPosition(id))
+ {
+ rm.removeColumnFamily(columnFamily);
+ }
+ }
+ if (!rm.isEmpty())
+ {
+ Table.open(rm.getTable()).apply(rm, null,
false);
}
}
- if (!rm.isEmpty())
- {
- Table.open(rm.getTable()).apply(rm, null, false);
- }
+ };
+
futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
+ if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+ {
+ FBUtilities.waitOnFutures(futures);
+ futures.clear();
}
- };
-
futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
- if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
- {
- FBUtilities.waitOnFutures(futures);
- futures.clear();
}
}
- reader.close();
- logger.info("Finished reading " + file);
+ finally
+ {
+ reader.close();
+ logger.info("Finished reading " + file);
+ }
}
// wait for all the writes to finish on the mutation stage