This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new fa4008ee9 Ignore file owners comparison on restore when config is set 
(#1684)
fa4008ee9 is described below

commit fa4008ee9c3f6b9499dfdceca1427b76a665420c
Author: Shekhar Sharma <[email protected]>
AuthorDate: Wed Aug 23 15:45:23 2023 -0700

    Ignore file owners comparison on restore when config is set (#1684)
    
    * Ignore file owners comparison on restore when config is set
    
    * Minor updates
    
    * Style fix in tests
    
    ---------
    
    Co-authored-by: Shekhar Sharma <[email protected]>
---
 .../org/apache/samza/config/BlobStoreConfig.java   |  8 +++
 .../storage/blobstore/BlobStoreBackupManager.java  |  2 +-
 .../storage/blobstore/BlobStoreRestoreManager.java | 21 +++---
 .../samza/storage/blobstore/util/DirDiffUtil.java  | 29 +++++---
 .../blobstore/TestBlobStoreBackupManager.java      |  4 +-
 .../blobstore/TestBlobStoreRestoreManager.java     | 20 +++---
 .../storage/blobstore/util/TestBlobStoreUtil.java  | 80 +++++++++++++++++++---
 .../blobstore/util/TestDirDiffUtilAreSameFile.java | 18 ++++-
 8 files changed, 138 insertions(+), 44 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java
index c9a7ee1b0..9ed2d51d6 100644
--- a/samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java
@@ -47,6 +47,10 @@ public class BlobStoreConfig extends MapConfig {
   public static final String RETRY_POLICY_JITTER_FACTOR =  RETRY_POLICY_PREFIX 
+ "jitter.factor";
   // random retry delay between -0.1*retry-delay to 0.1*retry-delay
   public static final double DEFAULT_RETRY_POLICY_JITTER_FACTOR = 0.1;
+  // Set whether to compare file owners after restoring blobs from remote 
store. Disabling is useful when the job is started on a new
+  // machine with new gid/uid or if gid/uid changes due to host migration
+  public static final String COMPARE_FILE_OWNERS_ON_RESTORE = PREFIX + 
"compare.file.owners.on.restore";
+  public static final boolean DEFAULT_COMPARE_FILE_OWNERS_ON_RESTORE = true;
 
   public BlobStoreConfig(Config config) {
     super(config);
@@ -70,4 +74,8 @@ public class BlobStoreConfig extends MapConfig {
             getInt(RETRY_POLICY_BACKOFF_DELAY_FACTOR, 
DEFAULT_RETRY_POLICY_BACKOFF_DELAY_FACTOR), ChronoUnit.MILLIS);
     return retryPolicyConfig;
   }
+
+  public boolean shouldCompareFileOwnersOnRestore() {
+    return getBoolean(COMPARE_FILE_OWNERS_ON_RESTORE, 
DEFAULT_COMPARE_FILE_OWNERS_ON_RESTORE);
+  }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
index 5f87de95b..997c5e6ca 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
@@ -207,7 +207,7 @@ public class BlobStoreBackupManager implements 
TaskBackupManager {
 
         long dirDiffStartTime = System.nanoTime();
         // get the diff between previous and current store directories
-        DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, 
DirDiffUtil.areSameFile(false));
+        DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, 
DirDiffUtil.areSameFile(false, true));
         metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() - 
dirDiffStartTime);
 
         DirDiff.Stats stats = DirDiff.getStats(dirDiff);
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
index c370c62df..a62e2812c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
@@ -164,7 +164,8 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
   @Override
   public CompletableFuture<Void> restore() {
     return restoreStores(jobName, jobId, taskModel.getTaskName(), 
storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir,
-        storageConfig, metrics, storageManagerUtil, blobStoreUtil, 
dirDiffUtil, executor, false);
+        storageConfig, metrics, storageManagerUtil, blobStoreUtil, 
dirDiffUtil, executor, false,
+        blobStoreConfig.shouldCompareFileOwnersOnRestore());
   }
 
   /**
@@ -174,7 +175,8 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
    */
   public CompletableFuture<Void> restore(boolean restoreDeleted) {
     return restoreStores(jobName, jobId, taskModel.getTaskName(), 
storesToRestore, prevStoreSnapshotIndexes,
-        loggedBaseDir, storageConfig, metrics, storageManagerUtil, 
blobStoreUtil, dirDiffUtil, executor, restoreDeleted);
+        loggedBaseDir, storageConfig, metrics, storageManagerUtil, 
blobStoreUtil, dirDiffUtil, executor, restoreDeleted,
+        blobStoreConfig.shouldCompareFileOwnersOnRestore());
   }
 
   @Override
@@ -227,7 +229,7 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
       Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
       File loggedBaseDir, StorageConfig storageConfig, 
BlobStoreRestoreManagerMetrics metrics,
       StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, 
DirDiffUtil dirDiffUtil,
-      ExecutorService executor, boolean getDeleted) {
+      ExecutorService executor, boolean getDeleted, boolean compareFileOwners) 
{
     long restoreStartTime = System.nanoTime();
     List<CompletionStage<Void>> restoreFutures = new ArrayList<>();
     LOG.debug("Starting restore for task: {} stores: {}", taskName, 
storesToRestore);
@@ -288,7 +290,7 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
 
         metrics.storePreRestoreNs.get(storeName).set(System.nanoTime() - 
storeRestoreStartTime);
         enqueueRestore(jobName, jobId, taskName.toString(), storeName, 
storeDir, dirIndex, storeRestoreStartTime,
-            restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor, 
getDeleted);
+            restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor, 
getDeleted, compareFileOwners);
       } else {
         LOG.debug("Renaming store checkpoint directory: {} to store directory: 
{} since its contents are identical " +
             "to the remote snapshot.", storeCheckpointDir, storeDir);
@@ -320,7 +322,7 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
         LOG.debug("Restoring task: {} store: {} from remote snapshot since the 
store is configured to be " +
             "restored on each restart.", taskName, storeName);
         restoreStore = true;
-      } else if (dirDiffUtil.areSameDir(FILES_TO_IGNORE, 
false).test(storeCheckpointDir.toFile(), dirIndex)) {
+      } else if (dirDiffUtil.areSameDir(FILES_TO_IGNORE, false, 
true).test(storeCheckpointDir.toFile(), dirIndex)) {
         restoreStore = false; // no restore required for this store.
       } else {
         // we don't optimize for the case when the local host doesn't contain 
the most recent store checkpoint
@@ -349,7 +351,8 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
   @VisibleForTesting
   static void enqueueRestore(String jobName, String jobId, String taskName, 
String storeName, File storeDir, DirIndex dirIndex,
       long storeRestoreStartTime, List<CompletionStage<Void>> restoreFutures, 
BlobStoreUtil blobStoreUtil,
-      DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, 
ExecutorService executor, boolean getDeleted) {
+      DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, 
ExecutorService executor, boolean getDeleted,
+      boolean compareFileOwners) {
 
     Metadata requestMetadata = new Metadata(storeDir.getAbsolutePath(), 
Optional.empty(), jobName, jobId, taskName, storeName);
     CompletableFuture<Void> restoreFuture =
@@ -357,8 +360,10 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
           metrics.storeRestoreNs.get(storeName).set(System.nanoTime() - 
storeRestoreStartTime);
 
           long postRestoreStartTime = System.nanoTime();
-          LOG.trace("Comparing restored store directory: {} and remote 
directory to verify restore.", storeDir);
-          if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true).test(storeDir, 
dirIndex)) {
+          LOG.trace(
+              "Comparing restored store directory: {} and remote directory to 
verify restore with compareFileOwners set to: {}",
+              storeDir, compareFileOwners);
+          if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true, 
compareFileOwners).test(storeDir, dirIndex)) {
             metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - 
postRestoreStartTime);
             throw new SamzaException(
                 String.format("Restored store directory: %s contents " + "are 
not the same as the remote snapshot.",
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
index 6c2a70694..b89f42a31 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
@@ -64,22 +64,23 @@ public class DirDiffUtil {
   /**
    * Checks if a local directory and a remote directory are identical. Local 
and remote directories are identical iff:
    * 1. The local directory has exactly the same set of files as the remote 
directory, and the files are themselves
-   * identical, as determined by {@link #areSameFile(boolean)}, except for 
those allowed to differ according to
+   * identical, as determined by {@link #areSameFile(boolean, boolean)}, 
except for those allowed to differ according to
    * {@code filesToIgnore}.
    * 2. The local directory has exactly the same set of sub-directories as the 
remote directory.
    *
    * @param filesToIgnore a set of file names to ignore during the directory 
comparisons
    *                      (does not exclude directory names)
    * @param compareLargeFileChecksums whether to compare checksums for large 
files (&gt; 1 MB).
+   * @param compareFileOwners whether to compare file owners
    * @return boolean indicating whether the local and remote directory are 
identical.
    */
   // TODO HIGH shesharm add unit tests
-  public BiPredicate<File, DirIndex> areSameDir(Set<String> filesToIgnore, 
boolean compareLargeFileChecksums) {
+  public BiPredicate<File, DirIndex> areSameDir(Set<String> filesToIgnore, 
boolean compareLargeFileChecksums, boolean compareFileOwners) {
     return (localDir, remoteDir) -> {
       String remoteDirName = 
remoteDir.getDirName().equals(DirIndex.ROOT_DIR_NAME) ? "root" : 
remoteDir.getDirName();
       LOG.debug("Creating diff between local dir: {} and remote dir: {} for 
comparison.",
           localDir.getAbsolutePath(), remoteDirName);
-      DirDiff dirDiff = DirDiffUtil.getDirDiff(localDir, remoteDir, 
DirDiffUtil.areSameFile(compareLargeFileChecksums));
+      DirDiff dirDiff = DirDiffUtil.getDirDiff(localDir, remoteDir, 
DirDiffUtil.areSameFile(compareLargeFileChecksums, compareFileOwners));
 
       boolean areSameDir = true;
       List<String> filesRemoved = dirDiff.getFilesRemoved().stream()
@@ -129,7 +130,7 @@ public class DirDiffUtil {
         String localSubDirName = subDirRetained.getDirName();
         File localSubDirFile = Paths.get(localDir.getAbsolutePath(), 
localSubDirName).toFile();
         DirIndex remoteSubDir = remoteSubDirs.get(localSubDirName);
-        boolean areSameSubDir = areSameDir(filesToIgnore, 
false).test(localSubDirFile, remoteSubDir);
+        boolean areSameSubDir = areSameDir(filesToIgnore, false, 
compareFileOwners).test(localSubDirFile, remoteSubDir);
         if (!areSameSubDir) {
           LOG.debug("Local sub-dir: {} and remote sub-dir: {} are not same.",
               localSubDirFile.getAbsolutePath(), remoteSubDir.getDirName());
@@ -148,9 +149,11 @@ public class DirDiffUtil {
    * the same file. Files with same attributes as well as content are same 
file. SST files are a special case. They are
    * immutable, so we only compare their attributes but not the content.
    * @param compareLargeFileChecksums whether to compare checksums for large 
files (&gt; 1 MB).
+   * @param compareFileOwners whether to compare owners of the files. Useful 
when migrating an existing job to new machine(s)
+   *                          that may have different owner ids.
    * @return BiPredicate to test similarity of local and remote files
    */
-  public static BiPredicate<File, FileIndex> areSameFile(boolean 
compareLargeFileChecksums) {
+  public static BiPredicate<File, FileIndex> areSameFile(boolean 
compareLargeFileChecksums, boolean compareFileOwners) {
 
     // Cache owner/group names to reduce calls to 
sun.nio.fs.UnixFileAttributes.group
     Cache<String, String> groupCache = 
CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
@@ -168,11 +171,17 @@ public class DirDiffUtil {
 
           // Don't compare file timestamps. The ctime of a local file just 
restored will be different than the
           // remote file, and will cause the file to be uploaded again during 
the first commit after restore.
-          areSameFiles = localFileAttrs.size() == remoteFileMetadata.getSize() 
&&
-              
groupCache.get(String.valueOf(Files.getAttribute(localFile.toPath(), 
"unix:gid")),
-                () -> 
localFileAttrs.group().getName()).equals(remoteFileMetadata.getGroup()) &&
-              
ownerCache.get(String.valueOf(Files.getAttribute(localFile.toPath(), 
"unix:uid")),
-                () -> 
localFileAttrs.owner().getName()).equals(remoteFileMetadata.getOwner());
+          areSameFiles = localFileAttrs.size() == remoteFileMetadata.getSize();
+          // In case a job is moved to a new cluster/machine, the owners 
(gid/uid) may be different than the one present
+          // in the remote snapshot. This flag indicates if we should compare 
it at all.
+          if (compareFileOwners) {
+            LOG.trace("Comparing owners of remote and local copy of file {}", 
localFile.getAbsolutePath());
+            areSameFiles =
+                areSameFiles && 
groupCache.get(String.valueOf(Files.getAttribute(localFile.toPath(), 
"unix:gid")),
+                  () -> 
localFileAttrs.group().getName()).equals(remoteFileMetadata.getGroup()) && 
ownerCache.get(
+                       String.valueOf(Files.getAttribute(localFile.toPath(), 
"unix:uid")),
+                    () -> 
localFileAttrs.owner().getName()).equals(remoteFileMetadata.getOwner());
+          }
 
         } catch (IOException | ExecutionException e) {
           LOG.error("Error reading attributes for file: {}", 
localFile.getAbsolutePath());
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
index 4f35521dd..b48166546 100644
--- 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
@@ -261,7 +261,7 @@ public class TestBlobStoreBackupManager {
           File localCheckpointDir = new File(localRemoteSnapshotPair.getLeft() 
+ "-" + checkpointId.serialize());
           DirIndex dirIndex = new DirIndex(localCheckpointDir.getName(), 
Collections.emptyList(), Collections.emptyList(),
               Collections.emptyList(), Collections.emptyList());
-          return DirDiffUtil.getDirDiff(localCheckpointDir, dirIndex, 
DirDiffUtil.areSameFile(false));
+          return DirDiffUtil.getDirDiff(localCheckpointDir, dirIndex, 
DirDiffUtil.areSameFile(false, true));
         }).collect(Collectors.toCollection(() -> new 
TreeSet<>(Comparator.comparing(DirDiff::getDirName))));
 
     // assert - asset all DirDiff are put to blob store
@@ -369,7 +369,7 @@ public class TestBlobStoreBackupManager {
         .stream()
         .map(localRemoteSnapshotPair ->
             DirDiffUtil.getDirDiff(new File(localRemoteSnapshotPair.getLeft() 
+ "-" + checkpointId.serialize()),
-            localRemoteSnapshotPair.getRight().getDirIndex(), 
DirDiffUtil.areSameFile(false)))
+            localRemoteSnapshotPair.getRight().getDirIndex(), 
DirDiffUtil.areSameFile(false, true)))
         .collect(Collectors.toCollection(() -> new 
TreeSet<>(Comparator.comparing(DirDiff::getDirName))));
 
     // assert - asset all DirDiff are put to blob store
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
index 67d62759e..6caa69b6e 100644
--- 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
@@ -141,7 +141,7 @@ public class TestBlobStoreRestoreManager {
     StorageConfig storageConfig = mock(StorageConfig.class);
     
when(storageConfig.cleanLoggedStoreDirsOnStart(anyString())).thenReturn(false);
     DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);
-    when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, 
arg2) -> false);
+    when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), 
anyBoolean())).thenReturn((arg1, arg2) -> false);
 
     boolean shouldRestore = BlobStoreRestoreManager.shouldRestore(
         taskName, storeName, dirIndex, storeCheckpointDir, storageConfig, 
dirDiffUtil);
@@ -158,12 +158,12 @@ public class TestBlobStoreRestoreManager {
     StorageConfig storageConfig = mock(StorageConfig.class);
     
when(storageConfig.cleanLoggedStoreDirsOnStart(anyString())).thenReturn(false);
     DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);
-    when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, 
arg2) -> true); // are same dir
+    when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), 
anyBoolean())).thenReturn((arg1, arg2) -> true); // are same dir
 
     boolean shouldRestore = BlobStoreRestoreManager.shouldRestore(
         taskName, storeName, dirIndex, storeCheckpointDir, storageConfig, 
dirDiffUtil);
 
-    verify(dirDiffUtil, times(1)).areSameDir(anySet(), anyBoolean());
+    verify(dirDiffUtil, times(1)).areSameDir(anySet(), anyBoolean(), 
anyBoolean());
     assertFalse(shouldRestore); // should not restore, should retain 
checkpoint dir instead
   }
 
@@ -200,11 +200,11 @@ public class TestBlobStoreRestoreManager {
     // return immediately without restoring.
     when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), 
any(Metadata.class), anyBoolean()))
         .thenReturn(CompletableFuture.completedFuture(null));
-    when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, 
arg2) -> true);
+    when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), 
anyBoolean())).thenReturn((arg1, arg2) -> true);
 
     BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, 
storesToRestore, prevStoreSnapshotIndexes,
         loggedBaseDir.toFile(), storageConfig, metrics,
-        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
+        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true);
 
     // verify that the store directory restore was called and skipped (i.e. 
shouldRestore == true)
     verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()), 
eq(dirIndex), any(Metadata.class), anyBoolean());
@@ -249,14 +249,14 @@ public class TestBlobStoreRestoreManager {
     BlobStoreUtil blobStoreUtil = mock(BlobStoreUtil.class);
     DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);
 
-    when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, 
arg2) -> true);
+    when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), 
anyBoolean())).thenReturn((arg1, arg2) -> true);
     // return immediately without restoring.
     when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), 
any(Metadata.class), anyBoolean()))
         .thenReturn(CompletableFuture.completedFuture(null));
 
     BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, 
storesToRestore, prevStoreSnapshotIndexes,
         loggedBaseDir.toFile(), storageConfig, metrics,
-        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
+        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true);
 
     // verify that the store directory restore was called and skipped (i.e. 
shouldRestore == true)
     verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()), 
eq(dirIndex), any(Metadata.class), anyBoolean());
@@ -305,14 +305,14 @@ public class TestBlobStoreRestoreManager {
     DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);
 
     // ensures shouldRestore is not called
-    when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, 
arg2) -> true);
+    when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), 
anyBoolean())).thenReturn((arg1, arg2) -> true);
     // return immediately without restoring.
     when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), 
any(Metadata.class), anyBoolean()))
         .thenReturn(CompletableFuture.completedFuture(null));
 
     BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, 
storesToRestore, prevStoreSnapshotIndexes,
         loggedBaseDir.toFile(), storageConfig, metrics,
-        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
+        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true);
 
     // verify that the store directory restore was not called (should have 
restored from checkpoint dir)
     verify(blobStoreUtil, times(0)).restoreDir(eq(storeDir.toFile()), 
eq(dirIndex), any(Metadata.class), anyBoolean());
@@ -349,7 +349,7 @@ public class TestBlobStoreRestoreManager {
 
     BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, 
storesToRestore, prevStoreSnapshotIndexes,
         loggedBaseDir.toFile(), storageConfig, metrics,
-        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
+        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true);
 
     // verify that we checked the previously checkpointed SCMs.
     verify(prevStoreSnapshotIndexes, times(1)).containsKey(eq("newStoreName"));
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
index 5b79315b6..a44f86e64 100644
--- 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
@@ -540,12 +540,12 @@ public class TestBlobStoreUtil {
     // checksum should be ignored for sst file. Set any dummy value
     FileIndex sstFileIndex = new FileIndex(sstFile.getFileName().toString(), 
Collections.emptyList(), sstFileMetadata, 0L);
 
-    assertTrue(DirDiffUtil.areSameFile(false).test(sstFile.toFile(), 
sstFileIndex));
+    assertTrue(DirDiffUtil.areSameFile(false, true).test(sstFile.toFile(), 
sstFileIndex));
 
     // 2. test with sst file with different timestamps
     // Update last modified time
     Files.setLastModifiedTime(sstFile, 
FileTime.fromMillis(System.currentTimeMillis() + 1000L));
-    assertTrue(DirDiffUtil.areSameFile(false).test(sstFile.toFile(), 
sstFileIndex));
+    assertTrue(DirDiffUtil.areSameFile(false, true).test(sstFile.toFile(), 
sstFileIndex));
 
     // 3. test with non-sst files with same metadata and content
     Path tmpFile = Files.createTempFile("samza-testAreSameFiles-", ".tmp");
@@ -559,18 +559,18 @@ public class TestBlobStoreUtil {
     FileIndex tmpFileIndex = new FileIndex(tmpFile.getFileName().toString(), 
Collections.emptyList(), tmpFileMetadata,
         FileUtils.checksumCRC32(tmpFile.toFile()));
 
-    assertTrue(DirDiffUtil.areSameFile(false).test(tmpFile.toFile(), 
tmpFileIndex));
+    assertTrue(DirDiffUtil.areSameFile(false, true).test(tmpFile.toFile(), 
tmpFileIndex));
 
     // 4. test with non-sst files with different attributes
     // change lastModifiedTime of local file
     FileTime prevLastModified = tmpFileAttribs.lastModifiedTime();
     Files.setLastModifiedTime(tmpFile, 
FileTime.fromMillis(System.currentTimeMillis() + 1000L));
-    assertTrue(DirDiffUtil.areSameFile(false).test(tmpFile.toFile(), 
tmpFileIndex));
+    assertTrue(DirDiffUtil.areSameFile(false, true).test(tmpFile.toFile(), 
tmpFileIndex));
 
     // change content/checksum of local file
     Files.setLastModifiedTime(tmpFile, prevLastModified); // reset attributes 
to match with remote file
     fileUtil.writeToTextFile(tmpFile.toFile(), RandomStringUtils.random(1000), 
false); //new content
-    assertFalse(DirDiffUtil.areSameFile(false).test(tmpFile.toFile(), 
tmpFileIndex));
+    assertFalse(DirDiffUtil.areSameFile(false, true).test(tmpFile.toFile(), 
tmpFileIndex));
   }
 
   @Test
@@ -626,7 +626,7 @@ public class TestBlobStoreUtil {
     blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, 
metadata, false).join();
 
     assertTrue(
-        new DirDiffUtil().areSameDir(Collections.emptySet(), 
false).test(restoreDirBasePath.toFile(), mockDirIndex));
+        new DirDiffUtil().areSameDir(Collections.emptySet(), false, 
true).test(restoreDirBasePath.toFile(), mockDirIndex));
   }
 
   @Test
@@ -684,7 +684,7 @@ public class TestBlobStoreUtil {
     blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, 
metadata, false).join();
 
     assertTrue(
-        new DirDiffUtil().areSameDir(Collections.emptySet(), 
false).test(restoreDirBasePath.toFile(), mockDirIndex));
+        new DirDiffUtil().areSameDir(Collections.emptySet(), false, 
true).test(restoreDirBasePath.toFile(), mockDirIndex));
   }
 
   @Test
@@ -742,6 +742,66 @@ public class TestBlobStoreUtil {
     }
   }
 
+
+  @Test
+  public void testRestoreIgnoresDifferentFileOwnersOnConfig() throws 
IOException {
+    Path restoreDirBasePath = 
Files.createTempDirectory(BlobStoreTestUtil.TEMP_DIR_PREFIX);
+
+    // remote file == 26 blobs, blob ids from a to z, blob contents from a to 
z, offsets 0 to 25.
+    DirIndex mockDirIndex = mock(DirIndex.class);
+    when(mockDirIndex.getDirName()).thenReturn(DirIndex.ROOT_DIR_NAME);
+    FileIndex mockFileIndex = mock(FileIndex.class);
+    when(mockFileIndex.getFileName()).thenReturn("1.sst");
+
+    // setup mock file attributes. create a temp file to get current 
user/group/permissions so that they
+    // match with restored files.
+    File tmpFile = Paths.get(restoreDirBasePath.toString(), "tempfile-" + new 
Random().nextInt()).toFile();
+    tmpFile.createNewFile();
+    PosixFileAttributes attrs = Files.readAttributes(tmpFile.toPath(), 
PosixFileAttributes.class);
+    // create remote file with different owner than local file
+    FileMetadata fileMetadata = new FileMetadata(1234L, 1243L, 26, // ctime 
mtime does not matter. size == 26
+        attrs.owner().getName() + "_different", attrs.group().getName(), 
PosixFilePermissions.toString(attrs.permissions()));
+    when(mockFileIndex.getFileMetadata()).thenReturn(fileMetadata);
+    Files.delete(tmpFile.toPath()); // delete so that it doesn't show up in 
restored dir contents.
+
+    List<FileBlob> mockFileBlobs = new ArrayList<>();
+    StringBuilder fileContents = new StringBuilder();
+    for (int i = 0; i < 26; i++) {
+      FileBlob mockFileBlob = mock(FileBlob.class);
+      char c = (char) ('a' + i);
+      fileContents.append(c); // blob contents == blobId
+      when(mockFileBlob.getBlobId()).thenReturn(String.valueOf(c));
+      when(mockFileBlob.getOffset()).thenReturn(i);
+      mockFileBlobs.add(mockFileBlob);
+    }
+    when(mockFileIndex.getBlobs()).thenReturn(mockFileBlobs);
+    CRC32 checksum = new CRC32();
+    checksum.update(fileContents.toString().getBytes());
+    when(mockFileIndex.getChecksum()).thenReturn(checksum.getValue());
+    
when(mockDirIndex.getFilesPresent()).thenReturn(ImmutableList.of(mockFileIndex));
+
+    BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class);
+    when(mockBlobStoreManager.get(anyString(), any(OutputStream.class), 
any(Metadata.class), any(Boolean.class))).thenAnswer(
+      (Answer<CompletionStage<Void>>) invocationOnMock -> {
+        String blobId = invocationOnMock.getArgumentAt(0, String.class);
+        OutputStream outputStream = invocationOnMock.getArgumentAt(1, 
OutputStream.class);
+        // blob contents = blob id
+        outputStream.write(blobId.getBytes());
+
+        // force flush so that the checksum calculation later uses the full 
file contents.
+        ((FileOutputStream) outputStream).getFD().sync();
+        return CompletableFuture.completedFuture(null);
+      });
+
+    BlobStoreConfig config = mock(BlobStoreConfig.class);
+    when(config.shouldCompareFileOwnersOnRestore()).thenReturn(false);
+    BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, 
EXECUTOR, blobStoreConfig, null, null);
+    blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, 
metadata, false).join();
+
+    assertTrue(
+        new DirDiffUtil().areSameDir(Collections.emptySet(), false, 
config.shouldCompareFileOwnersOnRestore()).test(restoreDirBasePath.toFile(), 
mockDirIndex));
+  }
+
   @Test
   @Ignore // TODO remove
   public void testRestoreDirRecreatesEmptyFilesAndDirs() throws IOException {
@@ -758,7 +818,7 @@ public class TestBlobStoreUtil {
         outputStream.write(blobId.getBytes());
         return CompletableFuture.completedFuture(null);
       });
-    boolean result = new DirDiffUtil().areSameDir(new TreeSet<>(), 
false).test(localSnapshot.toFile(), dirIndex);
+    boolean result = new DirDiffUtil().areSameDir(new TreeSet<>(), false, 
true).test(localSnapshot.toFile(), dirIndex);
     assertFalse(result);
     //ToDo complete
   }
@@ -788,7 +848,7 @@ public class TestBlobStoreUtil {
     BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, 
EXECUTOR, blobStoreConfig, null, null);
     blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), dirIndex, metadata, 
false).join();
 
-    assertTrue(new DirDiffUtil().areSameDir(Collections.emptySet(), 
false).test(restoreDirBasePath.toFile(), dirIndex));
+    assertTrue(new DirDiffUtil().areSameDir(Collections.emptySet(), false, 
true).test(restoreDirBasePath.toFile(), dirIndex));
   }
 
   /**
@@ -950,7 +1010,7 @@ public class TestBlobStoreUtil {
    * This test verifies that a retriable exception is retried more than 3 
times (default retry is limited to 3 attempts)
    */
   @Test
-  public void testPutFileRetriedMorethanThreeTimes() throws Exception {
+  public void testPutFileRetriedMoreThanThreeTimes() throws Exception {
     SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId, 
jobName, jobId, taskName, storeName);
     Path path = Files.createTempFile("samza-testPutFileChecksum-", ".tmp");
     FileUtil fileUtil = new FileUtil();
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java
 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java
index 98f75122a..1bd2ee393 100644
--- 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -60,7 +60,7 @@ public class TestDirDiffUtilAreSameFile {
 
   @Before
   public void testSetup() throws Exception {
-    areSameFile = DirDiffUtil.areSameFile(false);
+    areSameFile = DirDiffUtil.areSameFile(false, true);
     createFile(SMALL_FILE);
   }
 
@@ -129,6 +129,18 @@ public class TestDirDiffUtilAreSameFile {
     Assert.assertFalse(areSameFile.test(localFile, remoteFile));
   }
 
+  @Test
+  public void testAreSameFile_IgnoreDifferentOwner() {
+    areSameFile = DirDiffUtil.areSameFile(false, false);
+    remoteFileMetadata = new FileMetadata(0, 0,
+        localContentLength,
+        localFileAttrs.owner().getName() + "_different",
+        localFileAttrs.group().getName(),
+        PosixFilePermissions.toString(localFileAttrs.permissions()));
+    remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(), 
remoteFileMetadata, localChecksum);
+    Assert.assertTrue(areSameFile.test(localFile, remoteFile));
+  }
+
   @Test
   public void testAreSameFile_DifferentGroup() {
     remoteFileMetadata = new FileMetadata(0, 0,
@@ -197,7 +209,7 @@ public class TestDirDiffUtilAreSameFile {
     createFile(LARGE_FILE);
 
     for (int i = 0; i < 5; i++) {
-      BiPredicate<File, FileIndex> areSameFile = 
DirDiffUtil.areSameFile(false);
+      BiPredicate<File, FileIndex> areSameFile = 
DirDiffUtil.areSameFile(false, true);
       for (int j = 0; j < 20; j++) {
         localFile = mock(File.class);
         when(localFile.getName()).thenReturn("name");

Reply via email to