Author: stack
Date: Wed Dec 21 04:00:47 2011
New Revision: 1221596
URL: http://svn.apache.org/viewvc?rev=1221596&view=rev
Log:
HBASE-5078 DistributedLogSplitter failing to split file because it has edits
for lots of regions
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1221596&r1=1221595&r2=1221596&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed Dec 21 04:00:47 2011
@@ -264,7 +264,7 @@ public class HLogSplitter {
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
- long totalBytesToSplit = countTotalBytes(logfiles);
+ countTotalBytes(logfiles);
splitSize = 0;
outputSink.startWriterThreads(entryBuffers);
@@ -369,10 +369,12 @@ public class HLogSplitter {
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
HLog.SPLIT_SKIP_ERRORS_DEFAULT);
int interval = conf.getInt("hbase.splitlog.report.interval.loglines",
1024);
- // How often to send a progress report (default 1/2 master timeout)
+ // How often to send a progress report (default 1/2 the zookeeper session
+ // timeout or, if that is not set, the split log DEFAULT_TIMEOUT)
int period = conf.getInt("hbase.splitlog.report.period",
- conf.getInt("hbase.splitlog.manager.timeout",
- ZKSplitLog.DEFAULT_TIMEOUT) / 2);
+ conf.getInt("hbase.splitlog.manager.timeout",
ZKSplitLog.DEFAULT_TIMEOUT) / 2);
+ int numOpenedFilesBeforeReporting =
+ conf.getInt("hbase.splitlog.report.openedfiles", 3);
Path logPath = logfile.getPath();
long logLength = logfile.getLen();
LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
@@ -396,7 +398,10 @@ public class HLogSplitter {
status.markComplete("Failed: reporter.progress asked us to terminate");
return false;
}
+ // Report progress every so many edits and/or files opened (opening a file
+ // takes a bit of time).
int editsCount = 0;
+ int numNewlyOpenedFiles = 0;
Entry entry;
try {
while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
@@ -408,9 +413,10 @@ public class HLogSplitter {
WriterAndPath wap = (WriterAndPath)o;
if (wap == null) {
wap = createWAP(region, entry, rootDir, tmpname, fs, conf);
+ numNewlyOpenedFiles++;
if (wap == null) {
- // ignore edits from this region. It doesn't ezist anymore.
- // It was probably already split.
+ // ignore edits from this region. It doesn't exist anymore.
+ // It was probably already split.
logWriters.put(region, BAD_WRITER);
continue;
} else {
@@ -419,13 +425,19 @@ public class HLogSplitter {
}
wap.w.append(entry);
editsCount++;
- if (editsCount % interval == 0) {
- status.setStatus("Split " + editsCount + " edits");
+ // If sufficient edits have passed OR we've opened a few files, check if
+ // we should report progress.
+ if (editsCount % interval == 0 ||
+ (numNewlyOpenedFiles > numOpenedFilesBeforeReporting)) {
+ // Zero out files counter each time we fall in here.
+ numNewlyOpenedFiles = 0;
+ String countsStr = "edits=" + editsCount + ", files=" +
logWriters.size();
+ status.setStatus("Split " + countsStr);
long t1 = EnvironmentEdgeManager.currentTimeMillis();
if ((t1 - last_report_at) > period) {
last_report_at = t;
if (reporter != null && reporter.progress() == false) {
- status.markComplete("Failed: reporter.progress asked us to
terminate");
+ status.markComplete("Failed: reporter.progress asked us to
terminate; " + countsStr);
progress_failed = true;
return false;
}
@@ -476,10 +488,10 @@ public class HLogSplitter {
}
}
}
- String msg = ("processed " + editsCount + " edits across " + n + "
regions" +
- " threw away edits for " + (logWriters.size() - n) + " regions" +
- " log file = " + logPath +
- " is corrupted = " + isCorrupted);
+ String msg = "Processed " + editsCount + " edits across " + n + "
regions" +
+ " threw away edits for " + (logWriters.size() - n) + " regions" +
+ "; log file=" + logPath +
+ ", corrupted=" + isCorrupted;
LOG.info(msg);
status.markComplete(msg);
}