This is an automated email from the ASF dual-hosted git repository.
thomasm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6c2e23dd98 OAK-12004 Datastore: speedup datastore copy (#2607)
6c2e23dd98 is described below
commit 6c2e23dd98baecd08920cd82d22d92f7feb4b66d
Author: Thomas Mueller <[email protected]>
AuthorDate: Thu Nov 6 16:39:02 2025 +0100
OAK-12004 Datastore: speedup datastore copy (#2607)
* OAK-12004 Datastore: speedup datastore copy
---
.../jackrabbit/oak/run/DataStoreCommand.java | 34 +++-
.../jackrabbit/oak/run/DataStoreCopyCommand.java | 13 +-
.../org/apache/jackrabbit/oak/run/Downloader.java | 185 ++++++++++++++++++---
.../jackrabbit/oak/run/DataStoreCommandTest.java | 31 +++-
4 files changed, 227 insertions(+), 36 deletions(-)
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
index c9e84e29c7..c870ce5662 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
@@ -644,12 +644,8 @@ public class DataStoreCommand implements Command {
String id = list.get(0);
// Split b47b58169f121822cd4a0a0a153ba5910e581ad2bc450b6af7e51e6214c2b173#123311 on # to get the id
- List<String> idLengthSepList = Arrays.stream(id.split(HASH))
- .map(String::trim)
- .filter(s -> !s.isEmpty())
- .collect(Collectors.toList());
- String blobId = idLengthSepList.get(0);
-
+ String blobId = id.split(HASH)[0];
+ long length = getBlobLengthOrZero(id);
if (dsType == FAKE || dsType == FDS) {
// 0102030405... => 01/02/03/0102030405...
blobId = String.join(System.getProperty("file.separator"), blobId.substring(0, 2), blobId.substring(2, 4),
@@ -665,7 +661,7 @@ public class DataStoreCommand implements Command {
// In case of blob ids dump, the list size would be 1 (Consisting of just the id)
if (list.size() > 1) {
// Join back the encoded blob ref and the path on which the ref is present
- return String.join(DELIM, blobId, EscapeUtils.unescapeLineBreaks(list.get(1)));
+ return String.join(DELIM, blobId, EscapeUtils.unescapeLineBreaks(list.get(1)), "" + length);
} else {
// return the encoded blob id
return blobId;
@@ -687,6 +683,30 @@ public class DataStoreCommand implements Command {
}
}
+ /**
+ * Try to read the blob length from the blobId. It is typically after the '#'.
+ * It never throws an exception.
+ *
+ * @param blobId the blob id, which may contain a '#'
+ * @return the length, or 0 if unknown.
+ */
+ public static long getBlobLengthOrZero(String blobId) {
+ if (blobId == null) {
+ return 0;
+ }
+ int hashIndex = blobId.indexOf('#');
+ if (hashIndex < 0) {
+ return 0;
+ }
+ String lengthString = blobId.substring(hashIndex + 1);
+ try {
+ return Long.parseLong(lengthString);
+ } catch (NumberFormatException e) {
+ log.warn("Can not parse length for blob id {}", blobId);
+ return 0;
+ }
+ }
+
public static void main(String[] args) {
long timestamp = System.currentTimeMillis();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
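
For context, the id handling in the hunks above can be exercised in isolation. The sketch below is not part of the commit; it reuses the sample id from the comment and mirrors getBlobLengthOrZero and the FDS-style 01/02/03/... fan-out. The class name and the three-level fan-out depth are assumptions for illustration.

    // Standalone sketch, not part of the patch: parse "<hash>#<length>" ids
    // and derive an FDS-style path, as described in the comments above.
    import java.io.File;

    public class BlobIdSketch {

        public static void main(String[] args) {
            String id = "b47b58169f121822cd4a0a0a153ba5910e581ad2bc450b6af7e51e6214c2b173#123311";
            String blobId = id.split("#")[0];
            long length = getBlobLengthOrZero(id); // 123311
            // "0102030405..." => "01/02/03/0102030405..." (three levels assumed here)
            String fdsPath = String.join(File.separator,
                    blobId.substring(0, 2), blobId.substring(2, 4), blobId.substring(4, 6), blobId);
            System.out.println(blobId + " (" + length + " bytes) -> " + fdsPath);
        }

        // Same behavior as DataStoreCommand.getBlobLengthOrZero added above.
        static long getBlobLengthOrZero(String blobId) {
            int hashIndex = blobId == null ? -1 : blobId.indexOf('#');
            if (hashIndex < 0) {
                return 0;
            }
            try {
                return Long.parseLong(blobId.substring(hashIndex + 1));
            } catch (NumberFormatException e) {
                return 0;
            }
        }
    }
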
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
index 7f54ad96a5..606371b4b5 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
@@ -76,13 +76,24 @@ public class DataStoreCopyCommand implements Command {
long startNano = System.nanoTime();
- ids.forEach(id -> {
+ ids.forEach(line -> {
+ String[] parts = line.split(",");
+ String id = parts[0];
+ long length = 0;
+ if (parts.length > 2) {
+ try {
+ length = Long.parseLong(parts[2]);
+ } catch (NumberFormatException e) {
+ // ignore: length 0
+ }
+ }
Downloader.Item item = new Downloader.Item();
item.source = sourceRepo + "/" + id;
if (sasToken != null) {
item.source += "?" + sasToken;
}
item.destination = getDestinationFromId(id);
+ item.length = length;
item.checksum = id.replaceAll("-", "");
downloader.offer(item);
});
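
Each input line now carries an optional length column. The sketch below shows that "blobId,path,length" format mapped onto a Downloader.Item; the sample line, source URL, and destination path are made up, since the real code derives them from sourceRepo and getDestinationFromId.

    // Sketch, not part of the patch: one "blobId,path,length" line -> one download item.
    String line = "cafebabe#18342,/content/c1,18342";             // hypothetical listing line
    String[] parts = line.split(",");
    Downloader.Item item = new Downloader.Item();
    item.source = "https://example.org/datastore/" + parts[0];    // placeholder source repo
    item.destination = "/tmp/copy/" + parts[0];                   // placeholder destination
    item.checksum = parts[0].replaceAll("-", "");
    long length = 0;
    if (parts.length > 2) {
        try {
            length = Long.parseLong(parts[2]);
        } catch (NumberFormatException e) {
            // malformed length: keep 0, the Downloader then uses the single-stream path
        }
    }
    item.length = length;   // a length of 16 MB or more triggers the segmented range download below
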
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
index 70ce113d1a..fc2836c028 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
@@ -27,11 +27,14 @@ import java.io.Closeable;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
@@ -54,7 +57,31 @@ public class Downloader implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(Downloader.class);
+ /**
+ * The maximum size of what is considered a "small file".
+ * At the same time, this is the block size for large files.
+ */
+ private static final long MAX_LENGTH_SINGLE_THREADED = 16 * 1024 * 1024;
+
+ /**
+ * The executor service used for small files,
+ * and to coordinate download of large files.
+ * Large files are split into parts, which are downloaded
+ * concurrently using range headers.
+ *
+ * The parts of large files must not use this service;
+ * otherwise the download might deadlock: all threads
+ * could be waiting for parts, while the parts themselves
+ * can't be downloaded because the pool is full.
+ * The easiest solution is to use two pools.
+ */
private final ExecutorService executorService;
+
+ /**
+ * The executor service used for parts of large files.
+ */
+ private final ExecutorService executorServiceForParts;
+
private final int connectTimeoutMs;
private final int readTimeoutMs;
private final int slowLogThreshold;
@@ -80,7 +107,9 @@ public class Downloader implements Closeable {
if (maxRetries <= 0 || maxRetries > 100) {
throw new IllegalArgumentException("maxRetries range must be between 1 and 100");
}
- LOG.info("Initializing Downloader with max number of concurrent requests={}", concurrency);
+ // The constant 0.4 was found to give the best performance for a real-world scenario
+ int corePoolSize = (int) Math.ceil(concurrency * .4);
+ LOG.info("Initializing Downloader with max number of concurrent requests={}, core pool size {}", concurrency, corePoolSize);
this.connectTimeoutMs = connectTimeoutMs;
this.readTimeoutMs = readTimeoutMs;
this.slowLogThreshold = slowLogThreshold;
@@ -100,17 +129,31 @@ public class Downloader implements Closeable {
}
this.bufferSize = bufferSize;
+ // With an unbounded LinkedBlockingQueue, the maximum number of
+ // threads each executor service actually uses is corePoolSize;
+ // all other tasks wait in the (unbounded) queue.
+ // (A bounded queue, such as a SynchronousQueue,
+ // would trigger the RejectedExecutionHandler once the pool is saturated.)
+ // We want to keep things simple and don't want
+ // to use back pressure or other mechanisms.
+ // So in summary, corePoolSize threads are used, per service.
this.executorService = new ThreadPoolExecutor(
- (int) Math.ceil(concurrency * .1), concurrency, 60L, TimeUnit.SECONDS,
+ corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
BasicThreadFactory.builder().namingPattern("downloader-%d").daemon().build()
);
+ this.executorServiceForParts = new ThreadPoolExecutor(
+ corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ BasicThreadFactory.builder().namingPattern("partDownloader-%d").daemon().build()
+ );
this.responses = new ArrayList<>();
}
public void offer(Item item) {
responses.add(
- this.executorService.submit(new RetryingCallable<>(new DownloaderWorker(item)))
+ this.executorService.submit(new RetryingCallable<>(new DownloaderWorker(executorServiceForParts, item)))
);
}
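
The javadoc above explains why the part downloads must not share the main pool: a file task that blocks on its parts could otherwise starve the very threads those parts need. A self-contained sketch of that two-pool pattern follows; the pool sizes and the simulated work are illustrative only, not the values used by Downloader.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class TwoPoolSketch {

        public static void main(String[] args) throws Exception {
            // One pool for whole files, a separate one for their parts.
            // If parts were submitted to filePool as well, every filePool thread
            // could block in part.get() while the parts sit in the queue: deadlock.
            ExecutorService filePool = Executors.newFixedThreadPool(2);
            ExecutorService partPool = Executors.newFixedThreadPool(2);

            List<Future<Integer>> files = new ArrayList<>();
            for (int f = 0; f < 4; f++) {
                files.add(filePool.submit(() -> {
                    List<Future<Integer>> parts = new ArrayList<>();
                    for (int p = 0; p < 3; p++) {
                        parts.add(partPool.submit(() -> 1)); // pretend to download one range
                    }
                    int total = 0;
                    for (Future<Integer> part : parts) {
                        total += part.get();                 // safe: parts run on partPool
                    }
                    return total;
                }));
            }
            for (Future<Integer> file : files) {
                System.out.println("file downloaded in " + file.get() + " parts");
            }
            filePool.shutdown();
            partPool.shutdown();
        }
    }
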
@@ -146,9 +189,11 @@ public class Downloader implements Closeable {
private class DownloaderWorker implements Callable<ItemResponse> {
+ private final ExecutorService executorService;
private final Item item;
- public DownloaderWorker(Item item) {
+ public DownloaderWorker(ExecutorService executorService, Item item) {
+ this.executorService = executorService;
this.item = item;
}
@@ -170,29 +215,86 @@ public class Downloader implements Closeable {
Path destinationPath = Paths.get(item.destination);
Files.createDirectories(destinationPath.getParent());
+ long segmentSize = MAX_LENGTH_SINGLE_THREADED;
long size = 0;
- try (InputStream inputStream = sourceUrl.getInputStream();
- FileOutputStream outputStream = new FileOutputStream(destinationPath.toFile())) {
- byte[] buffer = new byte[bufferSize];
- int bytesRead;
- while ((bytesRead = inputStream.read(buffer)) != -1) {
- if (md != null) {
- md.update(buffer, 0, bytesRead);
+ if (item.length >= segmentSize) {
+ size = item.length;
+ LOG.debug("Downloading large file {}: {} bytes",
destinationPath.toString(), item.length);
+ String fileName = destinationPath.getFileName().toString();
+ long numSegments = (item.length + segmentSize - 1) / segmentSize;
+ ArrayList<Path> segmentFiles = new ArrayList<>();
+ ArrayList<Future<Boolean>> downloadTasks = new ArrayList<>();
+ for (int i = 0; i < numSegments; i++) {
+ long startByte = i * segmentSize;
+ long endByte = Math.min(startByte + segmentSize - 1, item.length - 1);
+ Path segmentFile = destinationPath.getParent().resolve(fileName + "_" + i + ".tmp");
+ segmentFiles.add(segmentFile);
+ downloadTasks.add(executorService.submit(
+ new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ Exception lastException = null;
+ for (int i = 0; i < maxRetries; i++) {
+ try {
+ return tryDownloadRange(item.source, connectTimeoutMs, readTimeoutMs,
+ segmentFile, startByte, endByte);
+ } catch (Exception e) {
+ LOG.warn("Range download try # {} failed", i, e);
+ lastException = e;
+ // retry
+ }
+ }
+ throw lastException;
+ }
+ }
+ ));
+ }
+ // wait for threads
+ boolean allSuccess = true;
+ for (int i = 0; i < downloadTasks.size(); i++) {
+ try {
+ boolean success = downloadTasks.get(i).get();
+ if (!success) {
+ allSuccess = false;
+ break;
+ }
+ } catch (Exception e) {
+ allSuccess = false;
+ break;
+ }
+ }
+ // merge
+ if (allSuccess) {
+ try (OutputStream fileOut = Files.newOutputStream(destinationPath)) {
+ OutputStream out = md == null ? fileOut : new DigestOutputStream(fileOut, md);
+ for (Path segmentFile : segmentFiles) {
+ if (Files.exists(segmentFile)) {
+ Files.copy(segmentFile, out);
+ Files.delete(segmentFile);
+ }
+ }
+ LOG.debug("Downloaded {} size {}, {} parts",
destinationPath.toString(), size, downloadTasks.size());
+ }
+ } else {
+ LOG.warn("Download {} failed", destinationPath.toString());
+ }
+ } else {
+ try (InputStream inputStream = sourceUrl.getInputStream();
+ FileOutputStream out = new FileOutputStream(destinationPath.toFile())) {
+ byte[] buffer = new byte[bufferSize];
+ int bytesRead;
+ while ((bytesRead = inputStream.read(buffer)) != -1) {
+ if (md != null) {
+ md.update(buffer, 0, bytesRead);
+ }
+ out.write(buffer, 0, bytesRead);
+ size += bytesRead;
}
- outputStream.write(buffer, 0, bytesRead);
- size += bytesRead;
}
}
if (md != null) {
- byte[] checksumBytes = md.digest();
-
- // Convert the checksum bytes to a hexadecimal string
- StringBuilder sb = new StringBuilder();
- for (byte b : checksumBytes) {
- sb.append(String.format("%02x", b));
- }
- String checksum = sb.toString();
+ String checksum = getMessageDigestString(md);
// Warning: most modern checksum algorithms used for cryptographic purposes are designed to be case-insensitive,
// to ensure that the same checksum value is produced regardless of the input's case. There may be some
// legacy algorithms that are case-sensitive. Using equalsIgnoreCase can be considered safe here.
@@ -219,6 +321,45 @@ public class Downloader implements Closeable {
}
}
+ private static String getMessageDigestString(MessageDigest md) {
+ byte[] checksumBytes = md.digest();
+ // Convert the checksum bytes to a hexadecimal string
+ StringBuilder sb = new StringBuilder();
+ for (byte b : checksumBytes) {
+ sb.append(String.format("%02x", b));
+ }
+ return sb.toString();
+ }
+
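
The merge path above streams each segment through a DigestOutputStream so the checksum covers the reassembled file. A tiny sketch of that combination together with the hex conversion done by getMessageDigestString; the algorithm and the input data are chosen arbitrarily.

    // Sketch: hash data while writing it, then render the digest as lowercase hex.
    import java.io.ByteArrayOutputStream;
    import java.security.DigestOutputStream;
    import java.security.MessageDigest;

    public class DigestSketch {

        public static void main(String[] args) throws Exception {
            MessageDigest md = MessageDigest.getInstance("MD5");   // algorithm chosen arbitrarily
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (DigestOutputStream out = new DigestOutputStream(sink, md)) {
                out.write("segment-1".getBytes());
                out.write("segment-2".getBytes());
            }
            byte[] checksumBytes = md.digest();
            StringBuilder sb = new StringBuilder();
            for (byte b : checksumBytes) {
                sb.append(String.format("%02x", b));
            }
            System.out.println(sb);   // compare against the expected blob checksum
        }
    }
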
+ private static boolean tryDownloadRange(String sourceURL, int connectTimeoutMs,
+ int readTimeoutMs, Path target, long startByte, long endByte) throws IOException {
+ HttpURLConnection connection = (HttpURLConnection) new URL(sourceURL).openConnection();
+ connection.setConnectTimeout(connectTimeoutMs);
+ connection.setReadTimeout(readTimeoutMs);
+ connection.setRequestProperty("Range", "bytes=" + startByte + "-" + endByte);
+ int responseCode = connection.getResponseCode();
+ if (responseCode != HttpURLConnection.HTTP_PARTIAL && responseCode != HttpURLConnection.HTTP_OK) {
+ throw new IOException("Unexpected response code: " + responseCode);
+ }
+ try (InputStream inputStream = connection.getInputStream();
+ OutputStream outputStream = Files.newOutputStream(target)) {
+ byte[] buffer = new byte[8192];
+ int bytesRead;
+ long totalBytesRead = 0;
+ long expectedBytes = endByte - startByte + 1;
+ while ((bytesRead = inputStream.read(buffer)) != -1) {
+ outputStream.write(buffer, 0, bytesRead);
+ totalBytesRead += bytesRead;
+ if (totalBytesRead >= expectedBytes) {
+ break;
+ }
+ }
+ return true;
+ } finally {
+ connection.disconnect();
+ }
+ }
+
private class RetryingCallable<V> implements Callable<V> {
private final Callable<V> callable;
@@ -285,12 +426,14 @@ public class Downloader implements Closeable {
public String source;
public String destination;
public String checksum;
+ public long length;
@Override
public String toString() {
return "Item{" +
"source='" + source + '\'' +
", destination='" + destination + '\'' +
+ ", length=" + length +
(checksum != null ? ", checksum='" + checksum + '\'' : "")
+
'}';
}
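
For readers unfamiliar with ranged HTTP downloads, tryDownloadRange boils down to a standard Range request. The stripped-down sketch below shows the same mechanism outside of oak-run; the URL, range, and target file are placeholders, and a server that honors Range answers with 206 Partial Content.

    // Standalone sketch, not part of the patch: download one byte range of a file.
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class RangeDownloadSketch {

        public static void main(String[] args) throws IOException {
            // Placeholder URL, range, and target; any server that supports Range requests will do.
            URL url = new URL("https://example.com/large-file.bin");
            long startByte = 0;
            long endByte = 16 * 1024 * 1024 - 1;          // first 16 MB segment
            Path target = Paths.get("segment_0.tmp");

            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setConnectTimeout(10_000);
            connection.setReadTimeout(60_000);
            connection.setRequestProperty("Range", "bytes=" + startByte + "-" + endByte);
            int responseCode = connection.getResponseCode();
            // 206 means the server honored the range; 200 means it ignored it and sent everything.
            if (responseCode != HttpURLConnection.HTTP_PARTIAL && responseCode != HttpURLConnection.HTTP_OK) {
                throw new IOException("Unexpected response code: " + responseCode);
            }
            try (InputStream in = connection.getInputStream();
                 OutputStream out = Files.newOutputStream(target)) {
                byte[] buffer = new byte[8192];
                long remaining = endByte - startByte + 1;
                int read;
                while (remaining > 0 && (read = in.read(buffer)) != -1) {
                    out.write(buffer, 0, (int) Math.min(read, remaining));
                    remaining -= read;
                }
            } finally {
                connection.disconnect();
            }
        }
    }
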
diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java
index d97d629e19..5bdc3fa0b8 100644
--- a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java
+++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
@@ -210,7 +211,7 @@ public class DataStoreCommandTest {
while (idIter.hasNext()) {
String chunk = idIter.next();
data.added.add(chunk);
- data.idToPath.put(chunk, (createMultiLevelNodes ? pathRoot : "") + "/c" + i);
+ data.idToPath.put(chunk, (createMultiLevelNodes ? pathRoot : "") + "/c" + i + ",18342");
if (!createMultiLevelNodes && toBeDeleted.contains(i)) {
data.deleted.add(chunk);
}
@@ -376,7 +377,7 @@ public class DataStoreCommandTest {
for (String id : data.idToPath.keySet()) {
- if (data.idToPath.get(id).equals("/c1") || data.idToPath.get(id).equals("/c2")) {
+ if (data.idToPath.get(id).equals("/c1,18342") || data.idToPath.get(id).equals("/c2,18342")) {
data.addedSubset.add(id);
}
}
@@ -392,7 +393,7 @@ public class DataStoreCommandTest {
for (String id : data.idToPath.keySet()) {
- if (data.idToPath.get(id).equals("/c1") || data.idToPath.get(id).equals("/c2")) {
+ if (data.idToPath.get(id).equals("/c1,18342") || data.idToPath.get(id).equals("/c2,18342")) {
data.addedSubset.add(id);
}
}
@@ -406,7 +407,7 @@ public class DataStoreCommandTest {
storeFixture.close();
additionalParams += " --verboseRootPath /c1,/c2";
for (String id : data.idToPath.keySet()) {
- if (data.idToPath.get(id).equals("/c1") || data.idToPath.get(id).equals("/c2")) {
+ if (data.idToPath.get(id).equals("/c1,18342") || data.idToPath.get(id).equals("/c2,18342")) {
data.addedSubset.add(id);
}
}
@@ -441,7 +442,7 @@ public class DataStoreCommandTest {
for (String id : data.idToPath.keySet()) {
- if (data.idToPath.get(id).equals("/c1") || data.idToPath.get(id).equals("/c2")) {
+ if (data.idToPath.get(id).equals("/c1,18342") || data.idToPath.get(id).equals("/c2,18342")) {
data.addedSubset.add(id);
}
}
@@ -547,6 +548,16 @@ public class DataStoreCommandTest {
testConsistency(dump, data, false, false);
}
+ @Test
+ public void getBlobLengthOrZero() {
+ assertEquals(1, DataStoreCommand.getBlobLengthOrZero("cafe#1"));
+ assertEquals(10, DataStoreCommand.getBlobLengthOrZero("cafe#10"));
+ assertEquals(0, DataStoreCommand.getBlobLengthOrZero("cafe"));
+ assertEquals(0, DataStoreCommand.getBlobLengthOrZero("#"));
+ assertEquals(0, DataStoreCommand.getBlobLengthOrZero(""));
+ assertEquals(0, DataStoreCommand.getBlobLengthOrZero(null));
+ }
+
@Test
public void testConsistencyMarkOnly() throws Exception {
File dump = temporaryFolder.newFolder();
@@ -726,10 +737,16 @@ public class DataStoreCommandTest {
if (verbose) {
argsList.add("--verbose");
+ } else {
+ // only the verbose listing has the length
+ for (Entry<String, String> e : data.idToPath.entrySet()) {
+ data.idToPath.put(e.getKey(), e.getValue().split(",")[0]);
+ }
}
+
DataStoreCommand cmd = new DataStoreCommand();
cmd.execute(argsList.toArray(new String[0]));
-
+
if (!markOnly) {
assertFileEquals(dump, "avail-", SetUtils.difference(data.added,
data.missingDataStore));
} else {
@@ -743,7 +760,7 @@ public class DataStoreCommandTest {
(storeFixture instanceof StoreFixture.MongoStoreFixture) ?
encodedIdsAndPath(SetUtils.difference(data.added, data.deleted), blobFixture.getType(), data.idToPath, false) :
SetUtils.difference(data.added, data.deleted));
-
+
if (!markOnly) {
// Verbose would have paths as well as ids changed but normally only DocumentNS would have paths suffixed
assertFileEquals(dump, "gccand-", verbose ?