This is an automated email from the ASF dual-hosted git repository.
dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new fa4008ee9 Ignore file owners comparison on restore when config is set
(#1684)
fa4008ee9 is described below
commit fa4008ee9c3f6b9499dfdceca1427b76a665420c
Author: Shekhar Sharma <[email protected]>
AuthorDate: Wed Aug 23 15:45:23 2023 -0700
Ignore file owners comparison on restore when config is set (#1684)
* Ignore file owners comparison on restore when config is set
* Minor updates
* Style fix in tests
---------
Co-authored-by: Shekhar Sharma <[email protected]>
---
.../org/apache/samza/config/BlobStoreConfig.java | 8 +++
.../storage/blobstore/BlobStoreBackupManager.java | 2 +-
.../storage/blobstore/BlobStoreRestoreManager.java | 21 +++---
.../samza/storage/blobstore/util/DirDiffUtil.java | 29 +++++---
.../blobstore/TestBlobStoreBackupManager.java | 4 +-
.../blobstore/TestBlobStoreRestoreManager.java | 20 +++---
.../storage/blobstore/util/TestBlobStoreUtil.java | 80 +++++++++++++++++++---
.../blobstore/util/TestDirDiffUtilAreSameFile.java | 18 ++++-
8 files changed, 138 insertions(+), 44 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java
b/samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java
index c9a7ee1b0..9ed2d51d6 100644
--- a/samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java
@@ -47,6 +47,10 @@ public class BlobStoreConfig extends MapConfig {
public static final String RETRY_POLICY_JITTER_FACTOR = RETRY_POLICY_PREFIX
+ "jitter.factor";
// random retry delay between -0.1*retry-delay to 0.1*retry-delay
public static final double DEFAULT_RETRY_POLICY_JITTER_FACTOR = 0.1;
+ // Whether to compare file owners after restoring blobs from remote
store. Set to false when the job is started on a new
+ // machine with new gid/uid or if gid/uid changes due to host migration
+ public static final String COMPARE_FILE_OWNERS_ON_RESTORE = PREFIX +
"compare.file.owners.on.restore";
+ public static final boolean DEFAULT_COMPARE_FILE_OWNERS_ON_RESTORE = true;
public BlobStoreConfig(Config config) {
super(config);
@@ -70,4 +74,8 @@ public class BlobStoreConfig extends MapConfig {
getInt(RETRY_POLICY_BACKOFF_DELAY_FACTOR,
DEFAULT_RETRY_POLICY_BACKOFF_DELAY_FACTOR), ChronoUnit.MILLIS);
return retryPolicyConfig;
}
+
+ public boolean shouldCompareFileOwnersOnRestore() {
+ return getBoolean(COMPARE_FILE_OWNERS_ON_RESTORE,
DEFAULT_COMPARE_FILE_OWNERS_ON_RESTORE);
+ }
}
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
index 5f87de95b..997c5e6ca 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
@@ -207,7 +207,7 @@ public class BlobStoreBackupManager implements
TaskBackupManager {
long dirDiffStartTime = System.nanoTime();
// get the diff between previous and current store directories
- DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex,
DirDiffUtil.areSameFile(false));
+ DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex,
DirDiffUtil.areSameFile(false, true));
metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() -
dirDiffStartTime);
DirDiff.Stats stats = DirDiff.getStats(dirDiff);
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
index c370c62df..a62e2812c 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
@@ -164,7 +164,8 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
@Override
public CompletableFuture<Void> restore() {
return restoreStores(jobName, jobId, taskModel.getTaskName(),
storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir,
- storageConfig, metrics, storageManagerUtil, blobStoreUtil,
dirDiffUtil, executor, false);
+ storageConfig, metrics, storageManagerUtil, blobStoreUtil,
dirDiffUtil, executor, false,
+ blobStoreConfig.shouldCompareFileOwnersOnRestore());
}
/**
@@ -174,7 +175,8 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
*/
public CompletableFuture<Void> restore(boolean restoreDeleted) {
return restoreStores(jobName, jobId, taskModel.getTaskName(),
storesToRestore, prevStoreSnapshotIndexes,
- loggedBaseDir, storageConfig, metrics, storageManagerUtil,
blobStoreUtil, dirDiffUtil, executor, restoreDeleted);
+ loggedBaseDir, storageConfig, metrics, storageManagerUtil,
blobStoreUtil, dirDiffUtil, executor, restoreDeleted,
+ blobStoreConfig.shouldCompareFileOwnersOnRestore());
}
@Override
@@ -227,7 +229,7 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
File loggedBaseDir, StorageConfig storageConfig,
BlobStoreRestoreManagerMetrics metrics,
StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil,
DirDiffUtil dirDiffUtil,
- ExecutorService executor, boolean getDeleted) {
+ ExecutorService executor, boolean getDeleted, boolean compareFileOwners)
{
long restoreStartTime = System.nanoTime();
List<CompletionStage<Void>> restoreFutures = new ArrayList<>();
LOG.debug("Starting restore for task: {} stores: {}", taskName,
storesToRestore);
@@ -288,7 +290,7 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
metrics.storePreRestoreNs.get(storeName).set(System.nanoTime() -
storeRestoreStartTime);
enqueueRestore(jobName, jobId, taskName.toString(), storeName,
storeDir, dirIndex, storeRestoreStartTime,
- restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor,
getDeleted);
+ restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor,
getDeleted, compareFileOwners);
} else {
LOG.debug("Renaming store checkpoint directory: {} to store directory:
{} since its contents are identical " +
"to the remote snapshot.", storeCheckpointDir, storeDir);
@@ -320,7 +322,7 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
LOG.debug("Restoring task: {} store: {} from remote snapshot since the
store is configured to be " +
"restored on each restart.", taskName, storeName);
restoreStore = true;
- } else if (dirDiffUtil.areSameDir(FILES_TO_IGNORE,
false).test(storeCheckpointDir.toFile(), dirIndex)) {
+ } else if (dirDiffUtil.areSameDir(FILES_TO_IGNORE, false,
true).test(storeCheckpointDir.toFile(), dirIndex)) {
restoreStore = false; // no restore required for this store.
} else {
// we don't optimize for the case when the local host doesn't contain
the most recent store checkpoint
@@ -349,7 +351,8 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
@VisibleForTesting
static void enqueueRestore(String jobName, String jobId, String taskName,
String storeName, File storeDir, DirIndex dirIndex,
long storeRestoreStartTime, List<CompletionStage<Void>> restoreFutures,
BlobStoreUtil blobStoreUtil,
- DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics,
ExecutorService executor, boolean getDeleted) {
+ DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics,
ExecutorService executor, boolean getDeleted,
+ boolean compareFileOwners) {
Metadata requestMetadata = new Metadata(storeDir.getAbsolutePath(),
Optional.empty(), jobName, jobId, taskName, storeName);
CompletableFuture<Void> restoreFuture =
@@ -357,8 +360,10 @@ public class BlobStoreRestoreManager implements
TaskRestoreManager {
metrics.storeRestoreNs.get(storeName).set(System.nanoTime() -
storeRestoreStartTime);
long postRestoreStartTime = System.nanoTime();
- LOG.trace("Comparing restored store directory: {} and remote
directory to verify restore.", storeDir);
- if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true).test(storeDir,
dirIndex)) {
+ LOG.trace(
+ "Comparing restored store directory: {} and remote directory to
verify restore with compareFileOwners set to: {}",
+ storeDir, compareFileOwners);
+ if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true,
compareFileOwners).test(storeDir, dirIndex)) {
metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() -
postRestoreStartTime);
throw new SamzaException(
String.format("Restored store directory: %s contents " + "are
not the same as the remote snapshot.",
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
index 6c2a70694..b89f42a31 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
@@ -64,22 +64,23 @@ public class DirDiffUtil {
/**
* Checks if a local directory and a remote directory are identical. Local
and remote directories are identical iff:
* 1. The local directory has exactly the same set of files as the remote
directory, and the files are themselves
- * identical, as determined by {@link #areSameFile(boolean)}, except for
those allowed to differ according to
+ * identical, as determined by {@link #areSameFile(boolean, boolean)},
except for those allowed to differ according to
* {@code filesToIgnore}.
* 2. The local directory has exactly the same set of sub-directories as the
remote directory.
*
* @param filesToIgnore a set of file names to ignore during the directory
comparisons
* (does not exclude directory names)
* @param compareLargeFileChecksums whether to compare checksums for large
files (> 1 MB).
+ * @param compareFileOwners whether to compare file owners
* @return boolean indicating whether the local and remote directory are
identical.
*/
// TODO HIGH shesharm add unit tests
- public BiPredicate<File, DirIndex> areSameDir(Set<String> filesToIgnore,
boolean compareLargeFileChecksums) {
+ public BiPredicate<File, DirIndex> areSameDir(Set<String> filesToIgnore,
boolean compareLargeFileChecksums, boolean compareFileOwners) {
return (localDir, remoteDir) -> {
String remoteDirName =
remoteDir.getDirName().equals(DirIndex.ROOT_DIR_NAME) ? "root" :
remoteDir.getDirName();
LOG.debug("Creating diff between local dir: {} and remote dir: {} for
comparison.",
localDir.getAbsolutePath(), remoteDirName);
- DirDiff dirDiff = DirDiffUtil.getDirDiff(localDir, remoteDir,
DirDiffUtil.areSameFile(compareLargeFileChecksums));
+ DirDiff dirDiff = DirDiffUtil.getDirDiff(localDir, remoteDir,
DirDiffUtil.areSameFile(compareLargeFileChecksums, compareFileOwners));
boolean areSameDir = true;
List<String> filesRemoved = dirDiff.getFilesRemoved().stream()
@@ -129,7 +130,7 @@ public class DirDiffUtil {
String localSubDirName = subDirRetained.getDirName();
File localSubDirFile = Paths.get(localDir.getAbsolutePath(),
localSubDirName).toFile();
DirIndex remoteSubDir = remoteSubDirs.get(localSubDirName);
- boolean areSameSubDir = areSameDir(filesToIgnore,
false).test(localSubDirFile, remoteSubDir);
+ boolean areSameSubDir = areSameDir(filesToIgnore, false,
compareFileOwners).test(localSubDirFile, remoteSubDir);
if (!areSameSubDir) {
LOG.debug("Local sub-dir: {} and remote sub-dir: {} are not same.",
localSubDirFile.getAbsolutePath(), remoteSubDir.getDirName());
@@ -148,9 +149,11 @@ public class DirDiffUtil {
* the same file. Files with same attributes as well as content are same
file. A SST file in a special case. They are
* immutable, so we only compare their attributes but not the content.
* @param compareLargeFileChecksums whether to compare checksums for large
files (> 1 MB).
+ * @param compareFileOwners whether to compare owners of the files. Set to false
when migrating an existing job to new machine(s)
+ * that may have different owner ids.
* @return BiPredicate to test similarity of local and remote files
*/
- public static BiPredicate<File, FileIndex> areSameFile(boolean
compareLargeFileChecksums) {
+ public static BiPredicate<File, FileIndex> areSameFile(boolean
compareLargeFileChecksums, boolean compareFileOwners) {
// Cache owner/group names to reduce calls to
sun.nio.fs.UnixFileAttributes.group
Cache<String, String> groupCache =
CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
@@ -168,11 +171,17 @@ public class DirDiffUtil {
// Don't compare file timestamps. The ctime of a local file just
restored will be different than the
// remote file, and will cause the file to be uploaded again during
the first commit after restore.
- areSameFiles = localFileAttrs.size() == remoteFileMetadata.getSize()
&&
-
groupCache.get(String.valueOf(Files.getAttribute(localFile.toPath(),
"unix:gid")),
- () ->
localFileAttrs.group().getName()).equals(remoteFileMetadata.getGroup()) &&
-
ownerCache.get(String.valueOf(Files.getAttribute(localFile.toPath(),
"unix:uid")),
- () ->
localFileAttrs.owner().getName()).equals(remoteFileMetadata.getOwner());
+ areSameFiles = localFileAttrs.size() == remoteFileMetadata.getSize();
+ // In case a job is moved to a new cluster/machine, the owners
(gid/uid) may be different from the ones present
+ // in the remote snapshot. This flag indicates whether we should compare
them at all.
+ if (compareFileOwners) {
+ LOG.trace("Comparing owners of remote and local copy of file {}",
localFile.getAbsolutePath());
+ areSameFiles =
+ areSameFiles &&
groupCache.get(String.valueOf(Files.getAttribute(localFile.toPath(),
"unix:gid")),
+ () ->
localFileAttrs.group().getName()).equals(remoteFileMetadata.getGroup()) &&
ownerCache.get(
+ String.valueOf(Files.getAttribute(localFile.toPath(),
"unix:uid")),
+ () ->
localFileAttrs.owner().getName()).equals(remoteFileMetadata.getOwner());
+ }
} catch (IOException | ExecutionException e) {
LOG.error("Error reading attributes for file: {}",
localFile.getAbsolutePath());
diff --git
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
index 4f35521dd..b48166546 100644
---
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
+++
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
@@ -261,7 +261,7 @@ public class TestBlobStoreBackupManager {
File localCheckpointDir = new File(localRemoteSnapshotPair.getLeft()
+ "-" + checkpointId.serialize());
DirIndex dirIndex = new DirIndex(localCheckpointDir.getName(),
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
- return DirDiffUtil.getDirDiff(localCheckpointDir, dirIndex,
DirDiffUtil.areSameFile(false));
+ return DirDiffUtil.getDirDiff(localCheckpointDir, dirIndex,
DirDiffUtil.areSameFile(false, true));
}).collect(Collectors.toCollection(() -> new
TreeSet<>(Comparator.comparing(DirDiff::getDirName))));
// assert - asset all DirDiff are put to blob store
@@ -369,7 +369,7 @@ public class TestBlobStoreBackupManager {
.stream()
.map(localRemoteSnapshotPair ->
DirDiffUtil.getDirDiff(new File(localRemoteSnapshotPair.getLeft()
+ "-" + checkpointId.serialize()),
- localRemoteSnapshotPair.getRight().getDirIndex(),
DirDiffUtil.areSameFile(false)))
+ localRemoteSnapshotPair.getRight().getDirIndex(),
DirDiffUtil.areSameFile(false, true)))
.collect(Collectors.toCollection(() -> new
TreeSet<>(Comparator.comparing(DirDiff::getDirName))));
// assert - asset all DirDiff are put to blob store
diff --git
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
index 67d62759e..6caa69b6e 100644
---
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
+++
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
@@ -141,7 +141,7 @@ public class TestBlobStoreRestoreManager {
StorageConfig storageConfig = mock(StorageConfig.class);
when(storageConfig.cleanLoggedStoreDirsOnStart(anyString())).thenReturn(false);
DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);
- when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1,
arg2) -> false);
+ when(dirDiffUtil.areSameDir(anySet(), anyBoolean(),
anyBoolean())).thenReturn((arg1, arg2) -> false);
boolean shouldRestore = BlobStoreRestoreManager.shouldRestore(
taskName, storeName, dirIndex, storeCheckpointDir, storageConfig,
dirDiffUtil);
@@ -158,12 +158,12 @@ public class TestBlobStoreRestoreManager {
StorageConfig storageConfig = mock(StorageConfig.class);
when(storageConfig.cleanLoggedStoreDirsOnStart(anyString())).thenReturn(false);
DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);
- when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1,
arg2) -> true); // are same dir
+ when(dirDiffUtil.areSameDir(anySet(), anyBoolean(),
anyBoolean())).thenReturn((arg1, arg2) -> true); // are same dir
boolean shouldRestore = BlobStoreRestoreManager.shouldRestore(
taskName, storeName, dirIndex, storeCheckpointDir, storageConfig,
dirDiffUtil);
- verify(dirDiffUtil, times(1)).areSameDir(anySet(), anyBoolean());
+ verify(dirDiffUtil, times(1)).areSameDir(anySet(), anyBoolean(),
anyBoolean());
assertFalse(shouldRestore); // should not restore, should retain
checkpoint dir instead
}
@@ -200,11 +200,11 @@ public class TestBlobStoreRestoreManager {
// return immediately without restoring.
when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex),
any(Metadata.class), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
- when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1,
arg2) -> true);
+ when(dirDiffUtil.areSameDir(anySet(), anyBoolean(),
anyBoolean())).thenReturn((arg1, arg2) -> true);
BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName,
storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir.toFile(), storageConfig, metrics,
- storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
+ storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true);
// verify that the store directory restore was called and skipped (i.e.
shouldRestore == true)
verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()),
eq(dirIndex), any(Metadata.class), anyBoolean());
@@ -249,14 +249,14 @@ public class TestBlobStoreRestoreManager {
BlobStoreUtil blobStoreUtil = mock(BlobStoreUtil.class);
DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);
- when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1,
arg2) -> true);
+ when(dirDiffUtil.areSameDir(anySet(), anyBoolean(),
anyBoolean())).thenReturn((arg1, arg2) -> true);
// return immediately without restoring.
when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex),
any(Metadata.class), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName,
storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir.toFile(), storageConfig, metrics,
- storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
+ storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true);
// verify that the store directory restore was called and skipped (i.e.
shouldRestore == true)
verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()),
eq(dirIndex), any(Metadata.class), anyBoolean());
@@ -305,14 +305,14 @@ public class TestBlobStoreRestoreManager {
DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);
// ensures shouldRestore is not called
- when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1,
arg2) -> true);
+ when(dirDiffUtil.areSameDir(anySet(), anyBoolean(),
anyBoolean())).thenReturn((arg1, arg2) -> true);
// return immediately without restoring.
when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex),
any(Metadata.class), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName,
storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir.toFile(), storageConfig, metrics,
- storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
+ storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true);
// verify that the store directory restore was not called (should have
restored from checkpoint dir)
verify(blobStoreUtil, times(0)).restoreDir(eq(storeDir.toFile()),
eq(dirIndex), any(Metadata.class), anyBoolean());
@@ -349,7 +349,7 @@ public class TestBlobStoreRestoreManager {
BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName,
storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir.toFile(), storageConfig, metrics,
- storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
+ storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true);
// verify that we checked the previously checkpointed SCMs.
verify(prevStoreSnapshotIndexes, times(1)).containsKey(eq("newStoreName"));
diff --git
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
index 5b79315b6..a44f86e64 100644
---
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
+++
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
@@ -540,12 +540,12 @@ public class TestBlobStoreUtil {
// checksum should be ignored for sst file. Set any dummy value
FileIndex sstFileIndex = new FileIndex(sstFile.getFileName().toString(),
Collections.emptyList(), sstFileMetadata, 0L);
- assertTrue(DirDiffUtil.areSameFile(false).test(sstFile.toFile(),
sstFileIndex));
+ assertTrue(DirDiffUtil.areSameFile(false, true).test(sstFile.toFile(),
sstFileIndex));
// 2. test with sst file with different timestamps
// Update last modified time
Files.setLastModifiedTime(sstFile,
FileTime.fromMillis(System.currentTimeMillis() + 1000L));
- assertTrue(DirDiffUtil.areSameFile(false).test(sstFile.toFile(),
sstFileIndex));
+ assertTrue(DirDiffUtil.areSameFile(false, true).test(sstFile.toFile(),
sstFileIndex));
// 3. test with non-sst files with same metadata and content
Path tmpFile = Files.createTempFile("samza-testAreSameFiles-", ".tmp");
@@ -559,18 +559,18 @@ public class TestBlobStoreUtil {
FileIndex tmpFileIndex = new FileIndex(tmpFile.getFileName().toString(),
Collections.emptyList(), tmpFileMetadata,
FileUtils.checksumCRC32(tmpFile.toFile()));
- assertTrue(DirDiffUtil.areSameFile(false).test(tmpFile.toFile(),
tmpFileIndex));
+ assertTrue(DirDiffUtil.areSameFile(false, true).test(tmpFile.toFile(),
tmpFileIndex));
// 4. test with non-sst files with different attributes
// change lastModifiedTime of local file
FileTime prevLastModified = tmpFileAttribs.lastModifiedTime();
Files.setLastModifiedTime(tmpFile,
FileTime.fromMillis(System.currentTimeMillis() + 1000L));
- assertTrue(DirDiffUtil.areSameFile(false).test(tmpFile.toFile(),
tmpFileIndex));
+ assertTrue(DirDiffUtil.areSameFile(false, true).test(tmpFile.toFile(),
tmpFileIndex));
// change content/checksum of local file
Files.setLastModifiedTime(tmpFile, prevLastModified); // reset attributes
to match with remote file
fileUtil.writeToTextFile(tmpFile.toFile(), RandomStringUtils.random(1000),
false); //new content
- assertFalse(DirDiffUtil.areSameFile(false).test(tmpFile.toFile(),
tmpFileIndex));
+ assertFalse(DirDiffUtil.areSameFile(false, true).test(tmpFile.toFile(),
tmpFileIndex));
}
@Test
@@ -626,7 +626,7 @@ public class TestBlobStoreUtil {
blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex,
metadata, false).join();
assertTrue(
- new DirDiffUtil().areSameDir(Collections.emptySet(),
false).test(restoreDirBasePath.toFile(), mockDirIndex));
+ new DirDiffUtil().areSameDir(Collections.emptySet(), false,
true).test(restoreDirBasePath.toFile(), mockDirIndex));
}
@Test
@@ -684,7 +684,7 @@ public class TestBlobStoreUtil {
blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex,
metadata, false).join();
assertTrue(
- new DirDiffUtil().areSameDir(Collections.emptySet(),
false).test(restoreDirBasePath.toFile(), mockDirIndex));
+ new DirDiffUtil().areSameDir(Collections.emptySet(), false,
true).test(restoreDirBasePath.toFile(), mockDirIndex));
}
@Test
@@ -742,6 +742,66 @@ public class TestBlobStoreUtil {
}
}
+
+ @Test
+ public void testRestoreIgnoresDifferentFileOwnersOnConfig() throws
IOException {
+ Path restoreDirBasePath =
Files.createTempDirectory(BlobStoreTestUtil.TEMP_DIR_PREFIX);
+
+ // remote file == 26 blobs, blob ids from a to z, blob contents from a to
z, offsets 0 to 25.
+ DirIndex mockDirIndex = mock(DirIndex.class);
+ when(mockDirIndex.getDirName()).thenReturn(DirIndex.ROOT_DIR_NAME);
+ FileIndex mockFileIndex = mock(FileIndex.class);
+ when(mockFileIndex.getFileName()).thenReturn("1.sst");
+
+ // setup mock file attributes. create a temp file to get current
user/group/permissions so that they
+ // match with restored files.
+ File tmpFile = Paths.get(restoreDirBasePath.toString(), "tempfile-" + new
Random().nextInt()).toFile();
+ tmpFile.createNewFile();
+ PosixFileAttributes attrs = Files.readAttributes(tmpFile.toPath(),
PosixFileAttributes.class);
+ // create remote file with different owner than local file
+ FileMetadata fileMetadata = new FileMetadata(1234L, 1243L, 26, // ctime
mtime does not matter. size == 26
+ attrs.owner().getName() + "_different", attrs.group().getName(),
PosixFilePermissions.toString(attrs.permissions()));
+ when(mockFileIndex.getFileMetadata()).thenReturn(fileMetadata);
+ Files.delete(tmpFile.toPath()); // delete so that it doesn't show up in
restored dir contents.
+
+ List<FileBlob> mockFileBlobs = new ArrayList<>();
+ StringBuilder fileContents = new StringBuilder();
+ for (int i = 0; i < 26; i++) {
+ FileBlob mockFileBlob = mock(FileBlob.class);
+ char c = (char) ('a' + i);
+ fileContents.append(c); // blob contents == blobId
+ when(mockFileBlob.getBlobId()).thenReturn(String.valueOf(c));
+ when(mockFileBlob.getOffset()).thenReturn(i);
+ mockFileBlobs.add(mockFileBlob);
+ }
+ when(mockFileIndex.getBlobs()).thenReturn(mockFileBlobs);
+ CRC32 checksum = new CRC32();
+ checksum.update(fileContents.toString().getBytes());
+ when(mockFileIndex.getChecksum()).thenReturn(checksum.getValue());
+
when(mockDirIndex.getFilesPresent()).thenReturn(ImmutableList.of(mockFileIndex));
+
+ BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class);
+ when(mockBlobStoreManager.get(anyString(), any(OutputStream.class),
any(Metadata.class), any(Boolean.class))).thenAnswer(
+ (Answer<CompletionStage<Void>>) invocationOnMock -> {
+ String blobId = invocationOnMock.getArgumentAt(0, String.class);
+ OutputStream outputStream = invocationOnMock.getArgumentAt(1,
OutputStream.class);
+ // blob contents = blob id
+ outputStream.write(blobId.getBytes());
+
+ // force flush so that the checksum calculation later uses the full
file contents.
+ ((FileOutputStream) outputStream).getFD().sync();
+ return CompletableFuture.completedFuture(null);
+ });
+
+ BlobStoreConfig config = mock(BlobStoreConfig.class);
+ when(config.shouldCompareFileOwnersOnRestore()).thenReturn(false);
+ BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager,
EXECUTOR, blobStoreConfig, null, null);
+ blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex,
metadata, false).join();
+
+ assertTrue(
+ new DirDiffUtil().areSameDir(Collections.emptySet(), false,
config.shouldCompareFileOwnersOnRestore()).test(restoreDirBasePath.toFile(),
mockDirIndex));
+ }
+
@Test
@Ignore // TODO remove
public void testRestoreDirRecreatesEmptyFilesAndDirs() throws IOException {
@@ -758,7 +818,7 @@ public class TestBlobStoreUtil {
outputStream.write(blobId.getBytes());
return CompletableFuture.completedFuture(null);
});
- boolean result = new DirDiffUtil().areSameDir(new TreeSet<>(),
false).test(localSnapshot.toFile(), dirIndex);
+ boolean result = new DirDiffUtil().areSameDir(new TreeSet<>(), false,
true).test(localSnapshot.toFile(), dirIndex);
assertFalse(result);
//ToDo complete
}
@@ -788,7 +848,7 @@ public class TestBlobStoreUtil {
BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager,
EXECUTOR, blobStoreConfig, null, null);
blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), dirIndex, metadata,
false).join();
- assertTrue(new DirDiffUtil().areSameDir(Collections.emptySet(),
false).test(restoreDirBasePath.toFile(), dirIndex));
+ assertTrue(new DirDiffUtil().areSameDir(Collections.emptySet(), false,
true).test(restoreDirBasePath.toFile(), dirIndex));
}
/**
@@ -950,7 +1010,7 @@ public class TestBlobStoreUtil {
* This test verifies that a retriable exception is retried more than 3
times (default retry is limited to 3 attempts)
*/
@Test
- public void testPutFileRetriedMorethanThreeTimes() throws Exception {
+ public void testPutFileRetriedMoreThanThreeTimes() throws Exception {
SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId,
jobName, jobId, taskName, storeName);
Path path = Files.createTempFile("samza-testPutFileChecksum-", ".tmp");
FileUtil fileUtil = new FileUtil();
diff --git
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java
index 98f75122a..1bd2ee393 100644
---
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java
+++
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -60,7 +60,7 @@ public class TestDirDiffUtilAreSameFile {
@Before
public void testSetup() throws Exception {
- areSameFile = DirDiffUtil.areSameFile(false);
+ areSameFile = DirDiffUtil.areSameFile(false, true);
createFile(SMALL_FILE);
}
@@ -129,6 +129,18 @@ public class TestDirDiffUtilAreSameFile {
Assert.assertFalse(areSameFile.test(localFile, remoteFile));
}
+ @Test
+ public void testAreSameFile_IgnoreDifferentOwner() {
+ areSameFile = DirDiffUtil.areSameFile(false, false);
+ remoteFileMetadata = new FileMetadata(0, 0,
+ localContentLength,
+ localFileAttrs.owner().getName() + "_different",
+ localFileAttrs.group().getName(),
+ PosixFilePermissions.toString(localFileAttrs.permissions()));
+ remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(),
remoteFileMetadata, localChecksum);
+ Assert.assertTrue(areSameFile.test(localFile, remoteFile));
+ }
+
@Test
public void testAreSameFile_DifferentGroup() {
remoteFileMetadata = new FileMetadata(0, 0,
@@ -197,7 +209,7 @@ public class TestDirDiffUtilAreSameFile {
createFile(LARGE_FILE);
for (int i = 0; i < 5; i++) {
- BiPredicate<File, FileIndex> areSameFile =
DirDiffUtil.areSameFile(false);
+ BiPredicate<File, FileIndex> areSameFile =
DirDiffUtil.areSameFile(false, true);
for (int j = 0; j < 20; j++) {
localFile = mock(File.class);
when(localFile.getName()).thenReturn("name");