This is an automated email from the ASF dual-hosted git repository. aasha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 6aa812a HIVE-25330: Make FS calls in CopyUtils retryable (#2516)(Haymant Mangla, reviewed by Ayush Saxena) 6aa812a is described below commit 6aa812a88163a204f63cb84f5f581a533d2f54a0 Author: Haymant Mangla <79496857+hmangl...@users.noreply.github.com> AuthorDate: Thu Sep 23 20:28:32 2021 +0530 HIVE-25330: Make FS calls in CopyUtils retryable (#2516)(Haymant Mangla, reviewed by Ayush Saxena) * HIVE-25330: Make FS calls in CopyUtils retryable * NPE Corrected * Retries fail on parent exceptions. * Changed to HashSet --- .../apache/hadoop/hive/ql/exec/util/Retryable.java | 45 ++++-- .../hadoop/hive/ql/parse/repl/CopyUtils.java | 159 +++++++++++++-------- .../hadoop/hive/ql/parse/repl/TestCopyUtils.java | 100 ++++++++++++- 3 files changed, 227 insertions(+), 77 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java index a31b96b..01b486d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java @@ -23,7 +23,8 @@ import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.security.UserGroupInformation; import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; @@ -36,16 +37,18 @@ public class Retryable { private static final long MINIMUM_DELAY_IN_SEC = 60; private long totalDurationInSeconds; - private List<Class<? extends Exception>> retryOn; - private List<Class<? extends Exception>> failOn; + private Set<Class<? extends Exception>> retryOn; + private Set<Class<? extends Exception>> failOn; + private Set<Class<? extends Exception>> failOnParentExceptions; private long initialDelayInSeconds; private long maxRetryDelayInSeconds; private double backOff; private int maxJitterInSeconds; private Retryable() { - this.retryOn = new ArrayList<>(); - this.failOn = new ArrayList<>(); + this.retryOn = new HashSet<>(); + this.failOn = new HashSet<>(); + this.failOnParentExceptions = new HashSet<>(); this.initialDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.defaultStrVal, HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY), TimeUnit.SECONDS); this.maxRetryDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES.defaultStrVal, @@ -74,7 +77,8 @@ public class Retryable { return callable.call(); } } catch (Exception e) { - if (this.failOn.stream().noneMatch(k -> e.getClass().equals(k)) + if (this.failOnParentExceptions.stream().noneMatch(k -> k.isAssignableFrom(e.getClass())) + && this.failOn.stream().noneMatch(k -> e.getClass().equals(k)) && this.retryOn.stream().anyMatch(k -> e.getClass().isAssignableFrom(k))) { if (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) { // case where waiting would go beyond max duration. So throw exception and return @@ -149,8 +153,7 @@ public class Retryable { // making this thread safe as it appends to list public synchronized Builder withRetryOnException(final Class<? extends Exception> exceptionClass) { - if (exceptionClass != null && - runnable.retryOn.stream().noneMatch(k -> exceptionClass.equals(k))) { + if (exceptionClass != null) { runnable.retryOn.add(exceptionClass); } return this; @@ -158,17 +161,32 @@ public class Retryable { public synchronized Builder withRetryOnExceptionList(final List<Class<? extends Exception>> exceptionClassList) { for (final Class<? extends Exception> exceptionClass : exceptionClassList) { - if (exceptionClass != null && - runnable.retryOn.stream().noneMatch(k -> exceptionClass.equals(k))) { + if (exceptionClass != null) { runnable.retryOn.add(exceptionClass); } } return this; } + public synchronized Builder withFailOnParentException(final Class<? extends Exception> exceptionClass) { + if (exceptionClass != null) { + runnable.failOnParentExceptions.add(exceptionClass); + } + return this; + } + + public synchronized Builder withFailOnParentExceptionList(final List<Class<? + extends Exception>> exceptionClassList) { + for (final Class<? extends Exception> exceptionClass : exceptionClassList) { + if (exceptionClass != null) { + runnable.failOnParentExceptions.add(exceptionClass); + } + } + return this; + } + public synchronized Builder withFailOnException(final Class<? extends Exception> exceptionClass) { - if (exceptionClass != null && - runnable.failOn.stream().noneMatch(k -> exceptionClass.equals(k))) { + if (exceptionClass != null) { runnable.failOn.add(exceptionClass); } return this; @@ -177,8 +195,7 @@ public class Retryable { public synchronized Builder withFailOnExceptionList(final List<Class<? extends Exception>> exceptionClassList) { for (final Class<? extends Exception> exceptionClass : exceptionClassList) { - if (exceptionClass != null && - runnable.failOn.stream().noneMatch(k -> exceptionClass.equals(k))) { + if (exceptionClass != null) { runnable.failOn.add(exceptionClass); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index db68250..2f9c071 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -66,6 +67,16 @@ public class CopyUtils { private FileSystem destinationFs; private final int maxParallelCopyTask; + private List<Class<? extends Exception>> failOnParentExceptionList = Arrays.asList(org.apache.hadoop.fs.PathIOException.class, + org.apache.hadoop.fs.UnsupportedFileSystemException.class, + org.apache.hadoop.fs.InvalidPathException.class, + org.apache.hadoop.fs.InvalidRequestException.class, + org.apache.hadoop.fs.FileAlreadyExistsException.class, + org.apache.hadoop.fs.ChecksumException.class, + org.apache.hadoop.fs.ParentNotDirectoryException.class, + org.apache.hadoop.hdfs.protocol.QuotaExceededException.class, + FileNotFoundException.class); + public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinationFs) { this.hiveConf = hiveConf; maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES); @@ -76,6 +87,57 @@ public class CopyUtils { this.destinationFs = destinationFs; } + private <T> T retryableFxn(Callable<T> callable) throws IOException { + Retryable retryable = Retryable.builder() + .withHiveConf(hiveConf) + .withRetryOnException(IOException.class).withFailOnParentExceptionList(failOnParentExceptionList).build(); + try { + return retryable.executeCallable(() -> callable.call()); + } catch (Exception e) { + if (failOnParentExceptionList.stream().anyMatch(k -> k.isAssignableFrom(e.getClass()))) { + throw new IOException(e); + } + throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg(), e); + } + } + + @VisibleForTesting + String checkSumFor(Path srcFile, FileSystem fs) throws IOException { + return retryableFxn(() -> ReplChangeManager.checksumFor(srcFile, fs)); + } + + @VisibleForTesting + void copyFilesBetweenFS(FileSystem sourceFs, Path[] paths, FileSystem destinationFs, + Path finalDestination, boolean deleteSource, boolean overwrite) throws IOException { + retryableFxn(() -> FileUtil + .copy(sourceFs, paths, destinationFs, finalDestination, deleteSource, overwrite, hiveConf)); + } + + @VisibleForTesting + boolean exists(FileSystem fs, Path path) throws IOException { + return retryableFxn(() -> fs.exists(path)); + } + + @VisibleForTesting + boolean delete(FileSystem fs, Path path, boolean recursive) throws IOException { + return retryableFxn(() -> fs.delete(path, recursive)); + } + + @VisibleForTesting + boolean mkdirs(FileSystem fs, Path path) throws IOException { + return retryableFxn(() -> fs.mkdirs(path)); + } + + @VisibleForTesting + boolean rename(FileSystem fs, Path srcPath, Path dstPath) throws IOException { + return retryableFxn(() -> fs.rename(srcPath, dstPath)); + } + + @VisibleForTesting + ContentSummary getContentSummary(FileSystem fs, Path f) throws IOException { + return retryableFxn(() -> fs.getContentSummary(f)); + } + // Used by replication, copy files from source to destination. It is possible source file is // changed/removed during copy, so double check the checksum after copy, // if not match, copy again from cm @@ -131,9 +193,6 @@ public class CopyUtils { if (executorService != null) { executorService.shutdown(); } - if (proxyUser != null) { - FileSystem.closeAllForUGI(proxyUser); - } } } @@ -144,15 +203,13 @@ public class CopyUtils { @VisibleForTesting void doCopy(Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry, UserGroupInformation proxyUser, - boolean useRegularCopy, boolean overwrite) throws IOException, LoginException, - HiveFatalException { + boolean useRegularCopy, boolean overwrite) throws IOException, LoginException, HiveFatalException { Path destination = destMapEntry.getKey(); List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue(); // Get the file system again from cache. There is a chance that the file system stored in the map is closed. // For instance, doCopyRetry closes the file system in case of i/o exceptions. FileSystem sourceFsOfFileInfo = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf); - if (!destinationFs.exists(destination) - && !FileUtils.mkdir(destinationFs, destination, hiveConf)) { + if (!exists(destinationFs, destination) && !mkdirs(destinationFs, destination)) { LOG.error("Failed to create destination directory: " + destination); throw new IOException("Destination directory creation failed"); } @@ -190,11 +247,14 @@ public class CopyUtils { // If copy fails, fall through the retry logic LOG.info("file operation failed", e); - if (repeat >= (MAX_IO_RETRY - 1)) { - //no need to wait in the last iteration + //Don't retry in the following cases: + //1. This is last attempt of retry. + //2. Execution already hit the exception which should not be retried. + //3. Retry is already exhausted by FS operations. + if (repeat >= (MAX_IO_RETRY - 1) || failOnParentExceptionList.stream().anyMatch(k -> k.isAssignableFrom(e.getClass())) + || ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg().equals(e.getMessage())) { break; } - if (!(e instanceof FileNotFoundException)) { int sleepTime = FileUtils.getSleepTime(repeat); LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (repeat+1)); @@ -205,11 +265,6 @@ public class CopyUtils { } // looks like some network outrage, reset the file system object and retry. - if (proxyUser == null) { - FileSystem.closeAllForUGI(Utils.getUGI()); - } else { - FileSystem.closeAllForUGI(proxyUser); - } sourceFs = pathList.get(0).getFileSystem(hiveConf); destinationFs = destination.getFileSystem(hiveConf); } @@ -240,11 +295,11 @@ public class CopyUtils { } Path srcPath = srcFile.getEffectivePath(); //Path destPath = new Path(destination, srcPath.getName()); - if (destinationFs.exists(destination)) { + if (exists(destinationFs, destination)) { // If destination file is present and checksum of source mismatch, then retry copy. if (isSourceFileMismatch(sourceFs, srcFile)) { // Delete the incorrectly copied file and retry with CM path - destinationFs.delete(destination, true); + delete(destinationFs, destination, true); srcFile.setIsUseSourcePath(false); } else { // If the retry logic is reached after copy error, then include the copied file as well. @@ -270,7 +325,7 @@ public class CopyUtils { throw new HiveFatalException(ErrorMsg.REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH.getMsg()); } - if (!srcFile.isUseSourcePath() && !sourceFs.exists(srcFile.getCmPath())) { + if (!srcFile.isUseSourcePath() && !exists(sourceFs, srcFile.getCmPath())) { // CM path itself is missing, cannot recover from this error LOG.error("File Copy Failed. Both source and CM files are missing from source. " + "Missing Source File: " + srcFile.getSourcePath() + ", CM File: " + srcFile.getCmPath() + ". " @@ -297,7 +352,7 @@ public class CopyUtils { String destFileName = srcFile.getCmPath().getName(); Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath); Path destFile = new Path(destRoot, destFileName); - if (dstFs.exists(destFile)) { + if (exists(dstFs, destFile)) { String destFileWithSourceName = srcFile.getSourcePath().getName(); Path newDestFile = new Path(destRoot, destFileWithSourceName); @@ -305,13 +360,13 @@ public class CopyUtils { // directly to table path (bypassing staging directory) then there might be some stale files from previous // incomplete/failed load. No need of recycle as this is a case of stale file. try { - dstFs.delete(newDestFile, true); + delete(dstFs, newDestFile, true); LOG.debug(" file " + newDestFile + " is deleted before renaming"); } catch (FileNotFoundException e) { // no problem } - boolean result = dstFs.rename(destFile, newDestFile); + boolean result = rename(dstFs, destFile, newDestFile); if (!result) { throw new IllegalStateException( "could not rename " + destFile.getName() + " to " + newDestFile.getName()); @@ -328,12 +383,11 @@ public class CopyUtils { if (sourceChecksumString != null) { String verifySourceChecksumString; try { - verifySourceChecksumString - = ReplChangeManager.checksumFor(srcFile.getSourcePath(), sourceFs); + verifySourceChecksumString = checkSumFor(srcFile.getSourcePath(), sourceFs); } catch (IOException e) { LOG.info("Unable to calculate checksum for source file: " + srcFile.getSourcePath(), e); - if (!sourceFs.exists(srcFile.getSourcePath())) { + if (!exists(sourceFs, srcFile.getSourcePath())) { // if source file is missing, then return true, so that cm path will be used for copy. return true; } @@ -351,23 +405,13 @@ public class CopyUtils { if (copyAsUser == null) { return null; } - Retryable retryable = Retryable.builder() - .withHiveConf(hiveConf) - .withRetryOnException(IOException.class).build(); - try { - return retryable.executeCallable(() -> { - UserGroupInformation proxyUser = null; - UserGroupInformation ugi = Utils.getUGI(); - String currentUser = ugi.getShortUserName(); - if (!currentUser.equals(copyAsUser)) { - proxyUser = UserGroupInformation.createProxyUser( - copyAsUser, UserGroupInformation.getLoginUser()); - } - return proxyUser; - }); - } catch (Exception e) { - throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); - } + return retryableFxn(() -> { + String currentUser = Utils.getUGI().getShortUserName(); + if (!currentUser.equals(copyAsUser)) { + return UserGroupInformation.createProxyUser(copyAsUser, UserGroupInformation.getLoginUser()); + } + return null; + }); } // Copy without retry @@ -423,8 +467,7 @@ public class CopyUtils { if (overWrite) { deleteSubDirs(destinationFs, destination); } - FileUtil - .copy(sourceFs, paths, destinationFs, finalDestination, false, true, hiveConf); + copyFilesBetweenFS(sourceFs, paths, destinationFs, finalDestination, false, true); return true; }); } catch (InterruptedException e) { @@ -435,35 +478,29 @@ public class CopyUtils { if (overWrite) { deleteSubDirs(destinationFs, destination); } - FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf); + copyFilesBetweenFS(sourceFs, paths, destinationFs, destination, false, true); } } private void deleteSubDirs(FileSystem fs, Path path) throws IOException { //Delete the root path instead of doing a listing //This is more optimised - fs.delete(path, true); + delete(fs, path, true); //Recreate just the Root folder - fs.mkdirs(path); + mkdirs(fs, path); } public void doCopy(Path destination, List<Path> srcPaths) throws IOException, LoginException { Map<FileSystem, List<Path>> map = fsToPathMap(srcPaths); UserGroupInformation proxyUser = getProxyUser(); - try { - for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) { - final FileSystem sourceFs = entry.getKey(); - List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(), - path -> new ReplChangeManager.FileInfo(sourceFs, path, null)); - doCopyOnce(sourceFs, entry.getValue(), - destination, - regularCopy(sourceFs, fileList), proxyUser, false); - } - } finally { - if (proxyUser != null) { - FileSystem.closeAllForUGI(proxyUser); - } + for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) { + final FileSystem sourceFs = entry.getKey(); + List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(), + path -> new ReplChangeManager.FileInfo(sourceFs, path, null)); + doCopyOnce(sourceFs, entry.getValue(), + destination, + regularCopy(sourceFs, fileList), proxyUser, false); } } @@ -492,11 +529,11 @@ public class CopyUtils { for (ReplChangeManager.FileInfo fileInfo : fileList) { ContentSummary contentSummary = null; try { - contentSummary = sourceFs.getContentSummary(fileInfo.getEffectivePath()); + contentSummary = getContentSummary(sourceFs, fileInfo.getEffectivePath()); } catch (IOException e) { // In replication, if source file does not exist, try cmroot if (fileInfo.isUseSourcePath() && fileInfo.getCmPath() != null) { - contentSummary = sourceFs.getContentSummary(fileInfo.getCmPath()); + contentSummary = getContentSummary(sourceFs, fileInfo.getCmPath()); fileInfo.setIsUseSourcePath(false); } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java index 94993bb..4740802 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; @@ -43,6 +44,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyListOf; import static org.mockito.ArgumentMatchers.eq; @@ -57,7 +59,7 @@ import static org.powermock.api.mockito.PowerMockito.when; * Unit Test class for CopyUtils class. */ @RunWith(PowerMockRunner.class) -@PrepareForTest({ CopyUtils.class, FileUtils.class, Utils.class, UserGroupInformation.class}) +@PrepareForTest({ CopyUtils.class, FileUtils.class, Utils.class, UserGroupInformation.class, ReplChangeManager.class}) @PowerMockIgnore({ "javax.management.*" }) public class TestCopyUtils { /* @@ -111,6 +113,100 @@ public class TestCopyUtils { } @Test + public void testFSCallsFailOnParentExceptions() throws Exception { + mockStatic(UserGroupInformation.class); + mockStatic(ReplChangeManager.class); + when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class)); + HiveConf conf = mock(HiveConf.class); + conf.set(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.varname, "1s"); + FileSystem fs = mock(FileSystem.class); + Path source = mock(Path.class); + Path destination = mock(Path.class); + ContentSummary cs = mock(ContentSummary.class); + + Exception exception = new org.apache.hadoop.fs.PathPermissionException("Failed"); + when(ReplChangeManager.checksumFor(source, fs)).thenThrow(exception).thenReturn("dummy"); + when(fs.exists(same(source))).thenThrow(exception).thenReturn(true); + when(fs.delete(same(source), anyBoolean())).thenThrow(exception).thenReturn(true); + when(fs.mkdirs(same(source))).thenThrow(exception).thenReturn(true); + when(fs.rename(same(source), same(destination))).thenThrow(exception).thenReturn(true); + when(fs.getContentSummary(same(source))).thenThrow(exception).thenReturn(cs); + + CopyUtils copyUtils = new CopyUtils(UserGroupInformation.getCurrentUser().getUserName(), conf, fs); + CopyUtils copyUtilsSpy = Mockito.spy(copyUtils); + try { + copyUtilsSpy.exists(fs, source); + } catch (Exception e) { + assertEquals(exception.getClass(), e.getCause().getClass()); + } + Mockito.verify(fs, Mockito.times(1)).exists(source); + try { + copyUtils.delete(fs, source, true); + } catch (Exception e) { + assertEquals(exception.getClass(), e.getCause().getClass()); + } + Mockito.verify(fs, Mockito.times(1)).delete(source, true); + try { + copyUtils.mkdirs(fs, source); + } catch (Exception e) { + assertEquals(exception.getClass(), e.getCause().getClass()); + } + Mockito.verify(fs, Mockito.times(1)).mkdirs(source); + try { + copyUtils.rename(fs, source, destination); + } catch (Exception e) { + assertEquals(exception.getClass(), e.getCause().getClass()); + } + Mockito.verify(fs, Mockito.times(1)).rename(source, destination); + try { + copyUtilsSpy.getContentSummary(fs, source); + } catch (Exception e) { + assertEquals(exception.getClass(), e.getCause().getClass());; + } + Mockito.verify(fs, Mockito.times(1)).getContentSummary(source); + try { + copyUtilsSpy.checkSumFor(source, fs); + } catch (Exception e) { + assertEquals(exception.getClass(), e.getCause().getClass()); + } + Mockito.verify(copyUtilsSpy, Mockito.times(1)).checkSumFor(source, fs); + } + + @Test + public void testRetryableFSCalls() throws Exception { + mockStatic(UserGroupInformation.class); + mockStatic(ReplChangeManager.class); + when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class)); + HiveConf conf = mock(HiveConf.class); + conf.set(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.varname, "1s"); + FileSystem fs = mock(FileSystem.class); + Path source = mock(Path.class); + Path destination = mock(Path.class); + ContentSummary cs = mock(ContentSummary.class); + + when(ReplChangeManager.checksumFor(source, fs)).thenThrow(new IOException("Failed")).thenReturn("dummy"); + when(fs.exists(same(source))).thenThrow(new IOException("Failed")).thenReturn(true); + when(fs.delete(same(source), anyBoolean())).thenThrow(new IOException("Failed")).thenReturn(true); + when(fs.mkdirs(same(source))).thenThrow(new IOException("Failed")).thenReturn(true); + when(fs.rename(same(source), same(destination))).thenThrow(new IOException("Failed")).thenReturn(true); + when(fs.getContentSummary(same(source))).thenThrow(new IOException("Failed")).thenReturn(cs); + + CopyUtils copyUtils = new CopyUtils(UserGroupInformation.getCurrentUser().getUserName(), conf, fs); + CopyUtils copyUtilsSpy = Mockito.spy(copyUtils); + assertEquals (true, copyUtilsSpy.exists(fs, source)); + Mockito.verify(fs, Mockito.times(2)).exists(source); + assertEquals (true, copyUtils.delete(fs, source, true)); + Mockito.verify(fs, Mockito.times(2)).delete(source, true); + assertEquals (true, copyUtils.mkdirs(fs, source)); + Mockito.verify(fs, Mockito.times(2)).mkdirs(source); + assertEquals (true, copyUtils.rename(fs, source, destination)); + Mockito.verify(fs, Mockito.times(2)).rename(source, destination); + assertEquals (cs, copyUtilsSpy.getContentSummary(fs, source)); + Mockito.verify(fs, Mockito.times(2)).getContentSummary(source); + assertEquals ("dummy", copyUtilsSpy.checkSumFor(source, fs)); + } + + @Test public void testParallelCopySuccess() throws Exception { mockStatic(UserGroupInformation.class); when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class)); @@ -148,4 +244,4 @@ public class TestCopyUtils { Mockito.verify(mockExecutorService, Mockito.times(1)).invokeAll(callableCapture.capture()); } -} \ No newline at end of file +}