Repository: hbase Updated Branches: refs/heads/branch-1.3 53f1e2480 -> ca78cd500
HBASE-18910 Backport HBASE-17292 "Add observer notification before bulk loaded hfile is moved to region directory" to 1.3 Signed-off-by: tedyu <yuzhih...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ca78cd50 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ca78cd50 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ca78cd50 Branch: refs/heads/branch-1.3 Commit: ca78cd500563e06149425f5931b67a8b6c12a849 Parents: 53f1e24 Author: Guangxu Cheng <guangxuch...@gmail.com> Authored: Sat Sep 30 11:59:33 2017 +0800 Committer: tedyu <yuzhih...@gmail.com> Committed: Sat Sep 30 06:48:44 2017 -0700 ---------------------------------------------------------------------- .../hbase/coprocessor/BaseRegionObserver.java | 10 +++ .../hbase/coprocessor/RegionObserver.java | 23 ++++++ .../hadoop/hbase/regionserver/HRegion.java | 83 ++++++++++++++------ .../hbase/regionserver/HRegionFileSystem.java | 24 ++++-- .../hadoop/hbase/regionserver/HStore.java | 16 +++- .../hbase/regionserver/RSRpcServices.java | 13 +-- .../regionserver/RegionCoprocessorHost.java | 21 +++++ .../apache/hadoop/hbase/regionserver/Store.java | 5 +- 8 files changed, 159 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index 1bf7449..1c31169 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -483,6 +483,16 @@ public class BaseRegionObserver implements RegionObserver { } @Override + public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx, + final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException { + } + + @Override + public void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx, + final byte[] family, Path srcPath, Path dstPath) throws IOException { + } + + @Override public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx, List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException { return hasLoaded; http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 8c5c15a..0bea614 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -1187,6 +1187,29 @@ public interface RegionObserver extends Coprocessor { List<Pair<byte[], String>> familyPaths) throws IOException; /** + * Called before moving bulk loaded hfile to region directory. + * + * @param ctx + * @param family column family + * @param pairs List of pairs of { HFile location in staging dir, HFile path in region dir } + * Each pair are for the same hfile. + * @throws IOException + */ + void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx, + final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException; + + /** + * Called after moving bulk loaded hfile to region directory. + * + * @param ctx + * @param family column family + * @param srcPath Path to file before the move + * @param dstPath Path to file after the move + */ + void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx, + final byte[] family, Path srcPath, Path dstPath) throws IOException; + + /** * Called after bulkLoadHFile. * * @param ctx http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 906ea58..f1f20ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5592,37 +5592,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + Map<byte[], List<Pair<Path, Path>>> familyWithFinalPath = + new TreeMap<>(Bytes.BYTES_COMPARATOR); for (Pair<byte[], String> p : familyPaths) { byte[] familyName = p.getFirst(); String path = p.getSecond(); Store store = getStore(familyName); + if (!familyWithFinalPath.containsKey(familyName)) { + familyWithFinalPath.put(familyName, new ArrayList<Pair<Path, Path>>()); + } + List<Pair<Path, Path>> lst = familyWithFinalPath.get(familyName); try { String finalPath = path; if (bulkLoadListener != null) { finalPath = bulkLoadListener.prepareBulkLoad(familyName, path); } - Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId); - - // Note the size of the store file - try { - FileSystem fs = commitedStoreFile.getFileSystem(baseConf); - storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile) - .getLen()); - } catch (IOException e) { - LOG.warn("Failed to find the size of hfile " + commitedStoreFile); - storeFilesSizes.put(commitedStoreFile.getName(), 0L); - } - - if(storeFiles.containsKey(familyName)) { - storeFiles.get(familyName).add(commitedStoreFile); - } else { - List<Path> storeFileNames = new ArrayList<Path>(); - storeFileNames.add(commitedStoreFile); - storeFiles.put(familyName, storeFileNames); - } - if (bulkLoadListener != null) { - bulkLoadListener.doneBulkLoad(familyName, path); - } + Pair<Path, Path> pair = ((HStore)store).preBulkLoadHFile(finalPath, seqId); + lst.add(pair); } catch (IOException ioe) { // A failure here can cause an atomicity violation that we currently // cannot recover from since it is likely a failed HDFS operation. @@ -5642,6 +5628,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + if (this.getCoprocessorHost() != null) { + for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) { + this.getCoprocessorHost().preCommitStoreFile(entry.getKey(), entry.getValue()); + } + } + for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) { + byte[] familyName = entry.getKey(); + for (Pair<Path, Path> p : entry.getValue()) { + String path = p.getFirst().toString(); + Path commitedStoreFile = p.getSecond(); + Store store = getStore(familyName); + try { + store.bulkLoadHFile(familyName, path, commitedStoreFile); + // Note the size of the store file + try { + FileSystem fs = commitedStoreFile.getFileSystem(baseConf); + storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile) + .getLen()); + } catch (IOException e) { + LOG.warn("Failed to find the size of hfile " + commitedStoreFile); + storeFilesSizes.put(commitedStoreFile.getName(), 0L); + } + + if(storeFiles.containsKey(familyName)) { + storeFiles.get(familyName).add(commitedStoreFile); + } else { + List<Path> storeFileNames = new ArrayList<Path>(); + storeFileNames.add(commitedStoreFile); + storeFiles.put(familyName, storeFileNames); + } + if (bulkLoadListener != null) { + bulkLoadListener.doneBulkLoad(familyName, path); + } + } catch (IOException ioe) { + // A failure here can cause an atomicity violation that we currently + // cannot recover from since it is likely a failed HDFS operation. + + // TODO Need a better story for reverting partial failures due to HDFS. + LOG.error("There was a partial failure due to IO when attempting to" + + " load " + Bytes.toString(familyName) + " : " + p.getSecond(), ioe); + if (bulkLoadListener != null) { + try { + bulkLoadListener.failedBulkLoad(familyName, path); + } catch (Exception ex) { + LOG.error("Error while calling failedBulkLoad for family " + + Bytes.toString(familyName) + " with path " + path, ex); + } + } + throw ioe; + } + } + } + isSuccessful = true; } finally { if (wal != null && !storeFiles.isEmpty()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 619358c..e9face1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; /** @@ -361,11 +362,13 @@ public class HRegionFileSystem { * @throws IOException */ public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException { - return commitStoreFile(familyName, buildPath, -1, false); + Path dstPath = preCommitStoreFile(familyName, buildPath, -1, false); + return commitStoreFile(buildPath, dstPath); } /** - * Move the file from a build/temp location to the main family store directory. + * Generate the filename in the main family store directory for moving the file from a build/temp + * location. * @param familyName Family that will gain the file * @param buildPath {@link Path} to the file to commit. * @param seqNum Sequence Number to append to the file name (less then 0 if no sequence number) @@ -373,7 +376,7 @@ public class HRegionFileSystem { * @return The new {@link Path} of the committed file * @throws IOException */ - private Path commitStoreFile(final String familyName, final Path buildPath, + private Path preCommitStoreFile(final String familyName, final Path buildPath, final long seqNum, final boolean generateNewName) throws IOException { Path storeDir = getStoreDir(familyName); if(!fs.exists(storeDir) && !createDir(storeDir)) @@ -388,6 +391,17 @@ public class HRegionFileSystem { throw new FileNotFoundException(buildPath.toString()); } LOG.debug("Committing store file " + buildPath + " as " + dstPath); + return dstPath; + } + + /* + * Moves file from staging dir to region dir + * @param buildPath {@link Path} to the file to commit. + * @param dstPath {@link Path} to the file under region dir + * @return The {@link Path} of the committed file + * @throws IOException + */ + Path commitStoreFile(final Path buildPath, Path dstPath) throws IOException { // buildPath exists, therefore not doing an exists() check. if (!rename(buildPath, dstPath)) { throw new IOException("Failed rename of " + buildPath + " to " + dstPath); @@ -445,7 +459,7 @@ public class HRegionFileSystem { * @return The destination {@link Path} of the bulk loaded file * @throws IOException */ - Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum) + Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum) throws IOException { // Copy the file if it's on another filesystem FileSystem srcFs = srcPath.getFileSystem(conf); @@ -463,7 +477,7 @@ public class HRegionFileSystem { srcPath = tmpPath; } - return commitStoreFile(familyName, srcPath, seqNum, true); + return new Pair<>(srcPath, preCommitStoreFile(familyName, srcPath, seqNum, true)); } // =========================================================================== http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 957751c..409e309 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; @@ -796,10 +797,21 @@ public class HStore implements Store { } } + public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException { + Path srcPath = new Path(srcPathStr); + return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); + } + @Override - public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException { + public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException { Path srcPath = new Path(srcPathStr); - Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); + try { + fs.commitStoreFile(srcPath, dstPath); + } finally { + if (this.getCoprocessorHost() != null) { + this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath); + } + } LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as " + dstPath + " - updating store file list."); http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index cda48fc..1be168a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2074,11 +2074,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); } boolean loaded = false; - if (!bypass) { - loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null); - } - if (region.getCoprocessorHost() != null) { - loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); + try { + if (!bypass) { + loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null); + } + } finally { + if (region.getCoprocessorHost() != null) { + loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); + } } BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); builder.setLoaded(loaded); http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 1ed866a..d28bd8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -1505,6 +1505,27 @@ public class RegionCoprocessorHost postWALRestore(info, (WALKey)logKey, logEdit); } + public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs) + throws IOException { + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preCommitStoreFile(ctx, family, pairs); + } + }); + } + public void postCommitStoreFile(final byte[] family, final Path srcPath, final Path dstPath) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postCommitStoreFile(ctx, family, srcPath, dstPath); + } + }); + } + /** * @param familyPaths pairs of { CF, file path } submitted for bulk load * @return true if the default operation should be bypassed http://git-wip-us.apache.org/repos/asf/hbase/blob/ca78cd50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index e7a4de5..77fef1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -316,10 +316,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * This method should only be called from Region. It is assumed that the ranges of values in the * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this) * + * @param family the column family * @param srcPathStr - * @param sequenceId sequence Id associated with the HFile + * @param dstPath */ - Path bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException; + Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException; // General accessors into the state of the store // TODO abstract some of this out into a metrics class