This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d138757f7 Re-factor DirDiffUtil.getDirDiff to minimize calls to sun.nio.fs.* (#1669)
d138757f7 is described below
commit d138757f7e163673fe9f80dd39131d4b1d3219f8
Author: Andy Sautins <[email protected]>
AuthorDate: Wed Jun 14 13:11:58 2023 -0600
Re-factor DirDiffUtil.getDirDiff to minimize calls to sun.nio.fs.* (#1669)
Re-factor DirDiffUtil.getDirDiff to minimize calls to sun.nio.fs.*
---
.../samza/storage/blobstore/util/DirDiffUtil.java | 158 +++++++----------
.../blobstore/util/TestDirDiffUtilAreSameFile.java | 186 +++++++++++++++++++++
2 files changed, 248 insertions(+), 96 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
index 4313d56b4..edae14645 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java
@@ -18,8 +18,10 @@
*/
package org.apache.samza.storage.blobstore.util;
-
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -37,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -57,7 +60,7 @@ import org.slf4j.LoggerFactory;
*/
public class DirDiffUtil {
private static final Logger LOG = LoggerFactory.getLogger(DirDiffUtil.class);
-
+ public static final int CACHE_SIZE = 10;
/**
* Checks if a local directory and a remote directory are identical. Local
and remote directories are identical iff:
* 1. The local directory has exactly the same set of files as the remote
directory, and the files are themselves
@@ -152,10 +155,25 @@ public class DirDiffUtil {
if (localFile.getName().equals(remoteFile.getFileName())) {
FileMetadata remoteFileMetadata = remoteFile.getFileMetadata();
- PosixFileAttributes localFileAttrs = null;
+ // Cache owner/group names to reduce calls to
sun.nio.fs.UnixFileAttributes.group
+ Cache<String, String> groupCache =
CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
+ // Cache owner/group names to reduce calls to
sun.nio.fs.UnixFileAttributes.owner
+ Cache<String, String> ownerCache =
CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
+
+ boolean areSameFiles;
+ PosixFileAttributes localFileAttrs;
try {
localFileAttrs = Files.readAttributes(localFile.toPath(),
PosixFileAttributes.class);
- } catch (IOException e) {
+
+ // Don't compare file timestamps. The ctime of a local file just
restored will be different than the
+ // remote file, and will cause the file to be uploaded again during
the first commit after restore.
+ areSameFiles = localFileAttrs.size() == remoteFileMetadata.getSize()
&&
+
groupCache.get(String.valueOf(Files.getAttribute(localFile.toPath(),
"unix:gid")),
+ () ->
localFileAttrs.group().getName()).equals(remoteFileMetadata.getGroup()) &&
+
ownerCache.get(String.valueOf(Files.getAttribute(localFile.toPath(),
"unix:uid")),
+ () ->
localFileAttrs.owner().getName()).equals(remoteFileMetadata.getOwner());
+
+ } catch (IOException | ExecutionException e) {
LOG.error("Error reading attributes for file: {}",
localFile.getAbsolutePath());
throw new RuntimeException(String.format("Error reading attributes
for file: %s", localFile.getAbsolutePath()));
}
@@ -163,13 +181,6 @@ public class DirDiffUtil {
Set<PosixFilePermission> remoteFilePermissions =
PosixFilePermissions.fromString(remoteFileMetadata.getPermissions());
- // Don't compare file timestamps. The ctime of a local file just
restored will be different than the
- // remote file, and will cause the file to be uploaded again during
the first commit after restore.
- boolean areSameFiles =
- localFileAttrs.size() == remoteFileMetadata.getSize() &&
-
localFileAttrs.group().getName().equals(remoteFileMetadata.getGroup()) &&
-
localFileAttrs.owner().getName().equals(remoteFileMetadata.getOwner());
-
// only verify permissions for file owner
areSameFiles = areSameFiles &&
localFileAttrs.permissions().contains(PosixFilePermission.OWNER_READ) ==
@@ -180,16 +191,18 @@ public class DirDiffUtil {
remoteFilePermissions.contains(PosixFilePermission.OWNER_EXECUTE);
if (!areSameFiles) {
- LOG.warn("Local file: {} and remote file: {} are not same. " +
- "Local file attributes: {}. Remote file attributes: {}.",
- localFile.getAbsolutePath(), remoteFile.getFileName(),
- fileAttributesToString(localFileAttrs),
remoteFile.getFileMetadata().toString());
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Local file: {} and remote file: {} are not same. "
+ + "Local file attributes: {}. Remote file attributes:
{}.", localFile.getAbsolutePath(), remoteFile.getFileName(),
+ fileAttributesToString(localFileAttrs),
remoteFile.getFileMetadata().toString());
+ }
return false;
} else {
- LOG.trace("Local file: {}. Remote file: {}. " +
- "Local file attributes: {}. Remote file attributes: {}.",
- localFile.getAbsolutePath(), remoteFile.getFileName(),
- fileAttributesToString(localFileAttrs),
remoteFile.getFileMetadata().toString());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Local file: {}. Remote file: {}. " + "Local file
attributes: {}. Remote file attributes: {}.",
+ localFile.getAbsolutePath(), remoteFile.getFileName(),
fileAttributesToString(localFileAttrs),
+ remoteFile.getFileMetadata().toString());
+ }
}
boolean isLargeFile = localFileAttrs.size() > 1024 * 1024;
@@ -252,6 +265,9 @@ public class DirDiffUtil {
List<DirDiff> subDirsAdded = new ArrayList<>();
List<DirDiff> subDirsRetained = new ArrayList<>();
List<DirIndex> subDirsRemoved = new ArrayList<>();
+ List<File> filesToUpload = new ArrayList<>();
+ List<FileIndex> filesToRetain = new ArrayList<>();
+ List<FileIndex> filesToRemove = new ArrayList<>();
// list files returns empty list if local snapshot directory is empty
List<File> localSnapshotFiles =
Arrays.asList(Objects.requireNonNull(localSnapshotDir.listFiles(File::isFile)));
@@ -268,10 +284,32 @@ public class DirDiffUtil {
.map(DirIndex::getDirName)
.collect(Collectors.toCollection(HashSet::new));
- // TODO MED shesharm: this compares each file in directory 3 times.
Categorize files in one traversal instead.
- List<File> filesToUpload = getNewFilesToUpload(remoteSnapshotFiles,
localSnapshotFiles, areSameFile);
- List<FileIndex> filesToRetain = getFilesToRetain(remoteSnapshotFiles,
localSnapshotFiles, areSameFile);
- List<FileIndex> filesToRemove = getFilesToRemove(remoteSnapshotFiles,
localSnapshotFiles, areSameFile);
+ Map<String, FileIndex> remoteFiles = remoteSnapshotFiles.stream()
+ .collect(Collectors.toMap(FileIndex::getFileName,
Function.identity()));
+
+ Map<String, File> localFiles = localSnapshotFiles.stream()
+ .collect(Collectors.toMap(File::getName, Function.identity()));
+
+ for (String file : Sets.union(remoteFiles.keySet(), localFiles.keySet())) {
+ if (localFiles.containsKey(file)) {
+ if (remoteFiles.containsKey(file)) {
+ if (areSameFile.test(localFiles.get(file), remoteFiles.get(file))) {
+ // Files are the same locally and remotely, Retain
+ filesToRetain.add(remoteFiles.get(file));
+ } else {
+ // Files are not the same, remove and upload
+ filesToRemove.add(remoteFiles.get(file));
+ filesToUpload.add(localFiles.get(file));
+ }
+ } else {
+ // File exists locally, but not remotely, Upload
+ filesToUpload.add(localFiles.get(file));
+ }
+ } else if (remoteFiles.containsKey(file)) {
+ // File exists remotely, but not locally, Remove
+ filesToRemove.add(remoteFiles.get(file));
+ }
+ }
for (File localSnapshotSubDir: localSnapshotSubDirs) {
if (!remoteSnapshotSubDirNames.contains(localSnapshotSubDir.getName())) {
@@ -329,78 +367,6 @@ public class DirDiffUtil {
subDirsAdded, Collections.emptyList(), Collections.emptyList());
}
- /**
- * Returns a list of files uploaded in remote checkpoint that are not
present in new local snapshot and needs to be
- * deleted/reclaimed from remote store.
- */
- private static List<FileIndex> getFilesToRemove(
- List<FileIndex> remoteSnapshotFiles, List<File> localSnapshotFiles,
- BiPredicate<File, FileIndex> areSameFile) {
- List<FileIndex> filesToRemove = new ArrayList<>();
-
- Map<String, File> localFiles = localSnapshotFiles.stream()
- .collect(Collectors.toMap(File::getName, Function.identity()));
-
- for (FileIndex remoteFile : remoteSnapshotFiles) {
- String remoteFileName = remoteFile.getFileName();
- if (!localFiles.containsKey(remoteFileName) ||
- !areSameFile.test(localFiles.get(remoteFileName), remoteFile)) {
- LOG.debug("File {} only present in remote snapshot or is not the same
as local file.", remoteFile.getFileName());
- filesToRemove.add(remoteFile);
- }
- }
-
- return filesToRemove;
- }
-
- /**
- * Returns a list of files to be uploaded to remote store that are part of
new snapshot created locally.
- */
- private static List<File> getNewFilesToUpload(
- List<FileIndex> remoteSnapshotFiles, List<File> localSnapshotFiles,
- BiPredicate<File, FileIndex> areSameFile) {
- List<File> filesToUpload = new ArrayList<>();
-
- Map<String, FileIndex> remoteFiles = remoteSnapshotFiles.stream()
- .collect(Collectors.toMap(FileIndex::getFileName,
Function.identity()));
-
- for (File localFile: localSnapshotFiles) {
- String localFileName = localFile.getName();
- if (!remoteFiles.containsKey(localFileName) ||
- !areSameFile.test(localFile, remoteFiles.get(localFileName))) {
- LOG.debug("File {} only present in local snapshot or is not the same
as remote file.", localFile.getPath());
- filesToUpload.add(localFile);
- }
- }
-
- return filesToUpload;
- }
-
- /**
- * Returns a list of common files between local and remote snapshot. These
files are reused from prev remote snapshot
- * and do not need to be uploaded again.
- */
- private static List<FileIndex> getFilesToRetain(
- List<FileIndex> remoteSnapshotFiles, List<File> localSnapshotFiles,
- BiPredicate<File, FileIndex> areSameFile) {
- List<FileIndex> filesToRetain = new ArrayList<>();
-
- Map<String, File> localFiles = localSnapshotFiles.stream()
- .collect(Collectors.toMap(File::getName, Function.identity()));
-
- for (FileIndex remoteFile : remoteSnapshotFiles) {
- String remoteFileName = remoteFile.getFileName();
- if (localFiles.containsKey(remoteFileName) &&
- areSameFile.test(localFiles.get(remoteFileName), remoteFile)) {
- String localFilePath = localFiles.get(remoteFileName).getPath();
- LOG.debug("File {} present in both local and remote snapshot and is
the same.", localFilePath);
- filesToRetain.add(remoteFile);
- }
- }
-
- return filesToRetain;
- }
-
private static String fileAttributesToString(PosixFileAttributes
fileAttributes) {
return "PosixFileAttributes{" +
"creationTimeMillis=" + fileAttributes.creationTime().toMillis() +
@@ -411,4 +377,4 @@ public class DirDiffUtil {
", permissions=" +
PosixFilePermissions.toString(fileAttributes.permissions()) +
'}';
}
-}
\ No newline at end of file
+}
diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java
new file mode 100644
index 000000000..15712f932
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore.util;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFileAttributes;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.function.BiPredicate;
+import java.util.zip.CRC32;
+import junit.framework.Assert;
+import org.apache.samza.storage.blobstore.index.FileIndex;
+import org.apache.samza.storage.blobstore.index.FileMetadata;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDirDiffUtilAreSameFile {
+ public static final int SMALL_FILE = 100;
+ public static final int LARGE_FILE = 1024 * 1024 + 1;
+ private BiPredicate<File, FileIndex> areSameFile = null;
+
+ File localFile = null;
+
+ FileIndex remoteFile = null;
+ private long localChecksum = 0;
+ private PosixFileAttributes localFileAttrs = null;
+ private FileMetadata remoteFileMetadata = null;
+ private long localContentLength = 0;
+
+ @Before
+ public void testSetup() throws Exception {
+ areSameFile = DirDiffUtil.areSameFile(false);
+ createFile(SMALL_FILE);
+ }
+
+ void createFile(int fileSize) throws Exception {
+ localFile = File.createTempFile("temp", null);
+ final String data = "a";
+ CRC32 crc32 = new CRC32();
+ try (FileWriter writer = new FileWriter(localFile);
+ BufferedWriter bw = new BufferedWriter(writer)) {
+ for (int i = 0; i < fileSize; i++) {
+ crc32.update(data.getBytes(StandardCharsets.UTF_8));
+ bw.write(data);
+ }
+ }
+ localContentLength = localFile.length();
+ localChecksum = crc32.getValue();
+
+ localFileAttrs = Files.readAttributes(localFile.toPath(),
PosixFileAttributes.class);
+
+ remoteFileMetadata = new FileMetadata(0, 0,
+ localContentLength,
+ localFileAttrs.owner().getName(),
+ localFileAttrs.group().getName(),
+ PosixFilePermissions.toString(localFileAttrs.permissions()));
+
+ remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(),
remoteFileMetadata, localChecksum);
+ localFile.deleteOnExit();
+ }
+
+ @Test
+ public void testAreSameFile_SameFile() {
+ Assert.assertTrue(areSameFile.test(localFile, remoteFile));
+ }
+
+ @Test
+ public void testAreSameFile_DifferentFile() {
+ remoteFile = new FileIndex(localFile.getName() + "_other", new
ArrayList<>(), remoteFileMetadata, localChecksum + 1);
+ Assert.assertFalse(areSameFile.test(localFile, remoteFile));
+ }
+
+ @Test
+ public void testAreSameFile_DifferentCrc() {
+ remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(),
remoteFileMetadata, localChecksum + 1);
+ Assert.assertFalse(areSameFile.test(localFile, remoteFile));
+ }
+
+ @Test
+ public void testAreSameFile_DifferentSize() {
+ remoteFileMetadata = new FileMetadata(0, 0,
+ localContentLength + 1,
+ localFileAttrs.owner().getName(),
+ localFileAttrs.group().getName(),
+ PosixFilePermissions.toString(localFileAttrs.permissions()));
+ remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(),
remoteFileMetadata, localChecksum);
+ Assert.assertFalse(areSameFile.test(localFile, remoteFile));
+ }
+
+ @Test
+ public void testAreSameFile_DifferentOwner() {
+ remoteFileMetadata = new FileMetadata(0, 0,
+ localContentLength,
+ localFileAttrs.owner().getName() + "_different",
+ localFileAttrs.group().getName(),
+ PosixFilePermissions.toString(localFileAttrs.permissions()));
+ remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(),
remoteFileMetadata, localChecksum);
+ Assert.assertFalse(areSameFile.test(localFile, remoteFile));
+ }
+
+ @Test
+ public void testAreSameFile_DifferentGroup() {
+ remoteFileMetadata = new FileMetadata(0, 0,
+ localContentLength,
+ localFileAttrs.owner().getName(),
+ localFileAttrs.group().getName() + "_different",
+ PosixFilePermissions.toString(localFileAttrs.permissions()));
+ remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(),
remoteFileMetadata, localChecksum);
+ Assert.assertFalse(areSameFile.test(localFile, remoteFile));
+ }
+
+ @Test
+ public void testAreSameFile_DifferentOwnerRead() {
+ Set<PosixFilePermission> remoteFilePermissions =
localFileAttrs.permissions();
+ remoteFilePermissions.remove(PosixFilePermission.OWNER_READ);
+ remoteFileMetadata = new FileMetadata(0, 0,
+ localContentLength,
+ localFileAttrs.owner().getName(),
+ localFileAttrs.group().getName(),
+ PosixFilePermissions.toString(remoteFilePermissions));
+ remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(),
remoteFileMetadata, localChecksum);
+ Assert.assertFalse(areSameFile.test(localFile, remoteFile));
+ }
+
+ @Test
+ public void testAreSameFile_DifferentOwnerWrite() {
+ Set<PosixFilePermission> remoteFilePermissions =
localFileAttrs.permissions();
+ remoteFilePermissions.remove(PosixFilePermission.OWNER_WRITE);
+ remoteFileMetadata = new FileMetadata(0, 0,
+ localContentLength,
+ localFileAttrs.owner().getName(),
+ localFileAttrs.group().getName(),
+ PosixFilePermissions.toString(remoteFilePermissions));
+ remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(),
remoteFileMetadata, localChecksum);
+ Assert.assertFalse(areSameFile.test(localFile, remoteFile));
+ }
+
+ @Test
+ public void testAreSameFile_DifferentOwnerExecute() {
+ Set<PosixFilePermission> remoteFilePermissions =
localFileAttrs.permissions();
+ remoteFilePermissions.add(PosixFilePermission.OWNER_EXECUTE);
+ remoteFileMetadata = new FileMetadata(0, 0,
+ localContentLength,
+ localFileAttrs.owner().getName(),
+ localFileAttrs.group().getName(),
+ PosixFilePermissions.toString(remoteFilePermissions));
+ remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(),
remoteFileMetadata, localChecksum);
+ Assert.assertFalse(areSameFile.test(localFile, remoteFile));
+ }
+
+ @Test
+ public void testAreSameFile_SmallFile_DifferentCrc() {
+ remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(),
remoteFileMetadata, localChecksum + 1);
+ Assert.assertFalse(areSameFile.test(localFile, remoteFile));
+ }
+
+ @Test
+ public void testAreSameFile_LargeFile_DifferentCrc() throws Exception {
+ createFile(LARGE_FILE);
+ remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(),
remoteFileMetadata, localChecksum + 1);
+ Assert.assertTrue(areSameFile.test(localFile, remoteFile));
+ }
+}