This is an automated email from the ASF dual-hosted git repository.

thomasm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6c2e23dd98 OAK-12004 Datastore: speedup datastore copy (#2607)
6c2e23dd98 is described below

commit 6c2e23dd98baecd08920cd82d22d92f7feb4b66d
Author: Thomas Mueller <[email protected]>
AuthorDate: Thu Nov 6 16:39:02 2025 +0100

    OAK-12004 Datastore: speedup datastore copy (#2607)
    
    * OAK-12004 Datastore: speedup datastore copy
    
    * OAK-12004 Datastore: speedup datastore copy
    
    * OAK-12004 Datastore: speedup datastore copy
    
    * OAK-12004 Datastore: speedup datastore copy
---
 .../jackrabbit/oak/run/DataStoreCommand.java       |  34 +++-
 .../jackrabbit/oak/run/DataStoreCopyCommand.java   |  13 +-
 .../org/apache/jackrabbit/oak/run/Downloader.java  | 185 ++++++++++++++++++---
 .../jackrabbit/oak/run/DataStoreCommandTest.java   |  31 +++-
 4 files changed, 227 insertions(+), 36 deletions(-)

diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
index c9e84e29c7..c870ce5662 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
@@ -644,12 +644,8 @@ public class DataStoreCommand implements Command {
 
             String id = list.get(0);
             // Split b47b58169f121822cd4a0a0a153ba5910e581ad2bc450b6af7e51e6214c2b173#123311 on # to get the id
-            List<String> idLengthSepList = Arrays.stream(id.split(HASH))
-                    .map(String::trim)
-                    .filter(s -> !s.isEmpty())
-                    .collect(Collectors.toList());
-            String blobId = idLengthSepList.get(0);
-
+            String blobId = id.split(HASH)[0];
+            long length = getBlobLengthOrZero(id);
             if (dsType == FAKE || dsType == FDS) {
                 // 0102030405... => 01/02/03/0102030405...
                 blobId = String.join(System.getProperty("file.separator"), 
blobId.substring(0, 2), blobId.substring(2, 4),
@@ -665,7 +661,7 @@ public class DataStoreCommand implements Command {
             // In case of blob ids dump, the list size would be 1 (Consisting of just the id)
             if (list.size() > 1) {
                 // Join back the encoded blob ref and the path on which the ref is present
-                return String.join(DELIM, blobId, 
EscapeUtils.unescapeLineBreaks(list.get(1)));
+                return String.join(DELIM, blobId, 
EscapeUtils.unescapeLineBreaks(list.get(1)), "" + length);
             } else {
                 // return the encoded blob id
                 return blobId;
@@ -687,6 +683,30 @@ public class DataStoreCommand implements Command {
         }
     }
 
+    /**
+     * Try to read the blob length from the blobId. It is typically after the '#'.
+     * It never throws an exception.
+     *
+     * @param blobId the blob id, which may contain a '#'
+     * @return the length, or 0 if unknown.
+     */
+    public static long getBlobLengthOrZero(String blobId) {
+        if (blobId == null) {
+            return 0;
+        }
+        int hashIndex = blobId.indexOf('#');
+        if (hashIndex < 0) {
+            return 0;
+        }
+        String lengthString = blobId.substring(hashIndex + 1);
+        try {
+            return Long.parseLong(lengthString);
+        } catch (NumberFormatException e) {
+            log.warn("Can not parse length for blob id {}", blobId);
+            return 0;
+        }
+    }
+
     public static void main(String[] args) {
         long timestamp = System.currentTimeMillis();
         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss");
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
index 7f54ad96a5..606371b4b5 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
@@ -76,13 +76,24 @@ public class DataStoreCopyCommand implements Command {
 
             long startNano = System.nanoTime();
 
-            ids.forEach(id -> {
+            ids.forEach(line -> {
+                String[] parts = line.split(",");
+                String id = parts[0];
+                long length = 0;
+                if (parts.length > 2) {
+                    try {
+                        length = Long.parseLong(parts[2]);
+                    } catch (NumberFormatException e) {
+                        // ignore: length 0
+                    }
+                }
                 Downloader.Item item = new Downloader.Item();
                 item.source = sourceRepo + "/" + id;
                 if (sasToken != null) {
                     item.source += "?" + sasToken;
                 }
                 item.destination = getDestinationFromId(id);
+                item.length = length;
                 item.checksum = id.replaceAll("-", "");
                 downloader.offer(item);
             });
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
index 70ce113d1a..fc2836c028 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
@@ -27,11 +27,14 @@ import java.io.Closeable;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URLConnection;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.security.DigestOutputStream;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
@@ -54,7 +57,31 @@ public class Downloader implements Closeable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(Downloader.class);
 
+    /**
+     * The maximum size of what is considered a "small file".
+     * At the same time, this is the block size for large files.
+     */
+    private static final long MAX_LENGTH_SINGLE_THREADED = 16 * 1024 * 1024;
+
+    /**
+     * The executor service used for small files,
+     * and to coordinate download of large files.
+     * Large files are split into parts, which are downloaded
+     * concurrently using range headers.
+     *
+     * The parts of large files may not use this service,
+     * otherwise download might deadlock: all threads
+     * might wait for parts, but the parts themselves
+     * can't be downloaded because the pool is full.
+     * The easiest solution is to use two pools.
+     */
     private final ExecutorService executorService;
+
+    /**
+     * The executor service used for parts of large files.
+     */
+    private final ExecutorService executorServiceForParts;
+
     private final int connectTimeoutMs;
     private final int readTimeoutMs;
     private final int slowLogThreshold;
@@ -80,7 +107,9 @@ public class Downloader implements Closeable {
         if (maxRetries <= 0 || maxRetries > 100) {
             throw new IllegalArgumentException("maxRetries range must be 
between 1 and 100");
         }
-        LOG.info("Initializing Downloader with max number of concurrent 
requests={}", concurrency);
+        // The constant 0.4 was found to give the best performance for a real-world scenario
+        int corePoolSize = (int) Math.ceil(concurrency * .4);
+        LOG.info("Initializing Downloader with max number of concurrent 
requests={}, core pool size {}", concurrency, corePoolSize);
         this.connectTimeoutMs = connectTimeoutMs;
         this.readTimeoutMs = readTimeoutMs;
         this.slowLogThreshold = slowLogThreshold;
@@ -100,17 +129,31 @@ public class Downloader implements Closeable {
         }
         this.bufferSize = bufferSize;
 
+        // The maximum number of threads in each executor service,
+        // when using a LinkedBlockingQueue(), is corePoolSize.
+        // all other tasks are kept in the LinkedBlockingQueue, which
+        // is unbounded.
+        // (Using a bounded queue, such as SynchronousQueue,
+        // would result in RejectedExecutionHandler).
+        // We want to keep things simple and don't want
+        // to use back pressure or other mechanisms.
+        // So in summary, corePoolSize threads are used, per service.
         this.executorService = new ThreadPoolExecutor(
-                (int) Math.ceil(concurrency * .1), concurrency, 60L, 
TimeUnit.SECONDS,
+                corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
                 new LinkedBlockingQueue<>(),
                 
BasicThreadFactory.builder().namingPattern("downloader-%d").daemon().build()
         );
+        this.executorServiceForParts = new ThreadPoolExecutor(
+                corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                
BasicThreadFactory.builder().namingPattern("partDownloader-%d").daemon().build()
+        );
         this.responses = new ArrayList<>();
     }
 
     public void offer(Item item) {
         responses.add(
-                this.executorService.submit(new RetryingCallable<>(new 
DownloaderWorker(item)))
+                this.executorService.submit(new RetryingCallable<>(new 
DownloaderWorker(executorServiceForParts, item)))
         );
     }
 
@@ -146,9 +189,11 @@ public class Downloader implements Closeable {
 
     private class DownloaderWorker implements Callable<ItemResponse> {
 
+        private final ExecutorService executorService;
         private final Item item;
 
-        public DownloaderWorker(Item item) {
+        public DownloaderWorker(ExecutorService executorService, Item item) {
+            this.executorService = executorService;
             this.item = item;
         }
 
@@ -170,29 +215,86 @@ public class Downloader implements Closeable {
             Path destinationPath = Paths.get(item.destination);
             Files.createDirectories(destinationPath.getParent());
 
+            long segmentSize = MAX_LENGTH_SINGLE_THREADED;
             long size = 0;
-            try (InputStream inputStream = sourceUrl.getInputStream();
-                 FileOutputStream outputStream = new 
FileOutputStream(destinationPath.toFile())) {
-                byte[] buffer = new byte[bufferSize];
-                int bytesRead;
-                while ((bytesRead = inputStream.read(buffer)) != -1) {
-                    if (md != null) {
-                        md.update(buffer, 0, bytesRead);
+            if (item.length >= segmentSize) {
+                size = item.length;
+                LOG.debug("Downloading large file {}: {} bytes", 
destinationPath.toString(), item.length);
+                String fileName = destinationPath.getFileName().toString();
+                long numSegments = (item.length + segmentSize - 1) / 
segmentSize;
+                ArrayList<Path> segmentFiles = new ArrayList<>();
+                ArrayList<Future<Boolean>> downloadTasks = new ArrayList<>();
+                for (int i = 0; i < numSegments; i++) {
+                    long startByte = i * segmentSize;
+                    long endByte = Math.min(startByte + segmentSize - 1, 
item.length - 1);
+                    Path segmentFile = 
destinationPath.getParent().resolve(fileName + "_" + i + ".tmp");
+                    segmentFiles.add(segmentFile);
+                    downloadTasks.add(executorService.submit(
+                        new Callable<Boolean>() {
+                            @Override
+                            public Boolean call() throws Exception {
+                                Exception lastException = null;
+                                for (int i = 0; i < maxRetries; i++) {
+                                    try {
+                                        return tryDownloadRange(item.source, 
connectTimeoutMs, readTimeoutMs,
+                                                segmentFile, startByte, 
endByte);
+                                    } catch (Exception e) {
+                                        LOG.warn("Range download try # {} 
failed", i, e);
+                                        lastException = e;
+                                        // retry
+                                    }
+                                }
+                                throw lastException;
+                            }
+                        }
+                    ));
+                }
+                // wait for threads
+                boolean allSuccess = true;
+                for (int i = 0; i < downloadTasks.size(); i++) {
+                    try {
+                        boolean success = downloadTasks.get(i).get();
+                        if (!success) {
+                            allSuccess = false;
+                            break;
+                        }
+                    } catch (Exception e) {
+                        allSuccess = false;
+                        break;
+                    }
+                }
+                // merge
+                if (allSuccess) {
+                    try (OutputStream fileOut = 
Files.newOutputStream(destinationPath)) {
+                        OutputStream out = md == null ? fileOut : new 
DigestOutputStream(fileOut, md);
+                        for (Path segmentFile : segmentFiles) {
+                            if (Files.exists(segmentFile)) {
+                                Files.copy(segmentFile, out);
+                                Files.delete(segmentFile);
+                            }
+                        }
+                        LOG.debug("Downloaded {} size {}, {} parts", 
destinationPath.toString(), size, downloadTasks.size());
+                    }
+                } else {
+                    LOG.warn("Download {} failed", destinationPath.toString());
+                }
+            } else {
+                try (InputStream inputStream = sourceUrl.getInputStream();
+                     FileOutputStream out = new 
FileOutputStream(destinationPath.toFile())) {
+                    byte[] buffer = new byte[bufferSize];
+                    int bytesRead;
+                    while ((bytesRead = inputStream.read(buffer)) != -1) {
+                        if (md != null) {
+                            md.update(buffer, 0, bytesRead);
+                        }
+                        out.write(buffer, 0, bytesRead);
+                        size += bytesRead;
                     }
-                    outputStream.write(buffer, 0, bytesRead);
-                    size += bytesRead;
                 }
             }
 
             if (md != null) {
-                byte[] checksumBytes = md.digest();
-
-                // Convert the checksum bytes to a hexadecimal string
-                StringBuilder sb = new StringBuilder();
-                for (byte b : checksumBytes) {
-                    sb.append(String.format("%02x", b));
-                }
-                String checksum = sb.toString();
+                String checksum = getMessageDigestString(md);
                 // Warning: most modern checksum algorithms used for cryptographic purposes are designed to be case-insensitive,
                 // to ensure that the same checksum value is produced regardless of the input's case. There may be some
                 // legacy algorithms that are case-sensitive. Using equalsIgnoreCase can be considered safe here.
@@ -219,6 +321,45 @@ public class Downloader implements Closeable {
         }
     }
 
+    private static String getMessageDigestString(MessageDigest md) {
+        byte[] checksumBytes = md.digest();
+        // Convert the checksum bytes to a hexadecimal string
+        StringBuilder sb = new StringBuilder();
+        for (byte b : checksumBytes) {
+            sb.append(String.format("%02x", b));
+        }
+        return sb.toString();
+    }
+
+    private static boolean tryDownloadRange(String sourceURL, int 
connectTimeoutMs,
+            int readTimeoutMs, Path target, long startByte, long endByte) 
throws IOException {
+        HttpURLConnection connection = (HttpURLConnection) new 
URL(sourceURL).openConnection();
+        connection.setConnectTimeout(connectTimeoutMs);
+        connection.setReadTimeout(readTimeoutMs);
+        connection.setRequestProperty("Range", "bytes=" + startByte + "-" + 
endByte);
+        int responseCode = connection.getResponseCode();
+        if (responseCode != HttpURLConnection.HTTP_PARTIAL && responseCode != 
HttpURLConnection.HTTP_OK) {
+            throw new IOException("Unexpected response code: " + responseCode);
+        }
+        try (InputStream inputStream = connection.getInputStream();
+                OutputStream outputStream = Files.newOutputStream(target)) {
+            byte[] buffer = new byte[8192];
+            int bytesRead;
+            long totalBytesRead = 0;
+            long expectedBytes = endByte - startByte + 1;
+            while ((bytesRead = inputStream.read(buffer)) != -1) {
+                outputStream.write(buffer, 0, bytesRead);
+                totalBytesRead += bytesRead;
+                if (totalBytesRead >= expectedBytes) {
+                    break;
+                }
+            }
+            return true;
+        } finally {
+            connection.disconnect();
+        }
+    }
+
     private class RetryingCallable<V> implements Callable<V> {
         private final Callable<V> callable;
 
@@ -285,12 +426,14 @@ public class Downloader implements Closeable {
         public String source;
         public String destination;
         public String checksum;
+        public long length;
 
         @Override
         public String toString() {
             return "Item{" +
                     "source='" + source + '\'' +
                     ", destination='" + destination + '\'' +
+                    ", length=" + length +
                     (checksum != null ? ", checksum='" + checksum + '\'' : "") 
+
                     '}';
         }
diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java
index d97d629e19..5bdc3fa0b8 100644
--- a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java
+++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
@@ -210,7 +211,7 @@ public class DataStoreCommandTest {
             while (idIter.hasNext()) {
                 String chunk = idIter.next();
                 data.added.add(chunk);
-                data.idToPath.put(chunk, (createMultiLevelNodes ? pathRoot : 
"") + "/c" + i);
+                data.idToPath.put(chunk, (createMultiLevelNodes ? pathRoot : 
"") + "/c" + i + ",18342");
                 if (!createMultiLevelNodes && toBeDeleted.contains(i)) {
                     data.deleted.add(chunk);
                 }
@@ -376,7 +377,7 @@ public class DataStoreCommandTest {
 
 
         for (String id : data.idToPath.keySet()) {
-            if (data.idToPath.get(id).equals("/c1") || 
data.idToPath.get(id).equals("/c2")) {
+            if (data.idToPath.get(id).equals("/c1,18342") || 
data.idToPath.get(id).equals("/c2,18342")) {
                 data.addedSubset.add(id);
             }
         }
@@ -392,7 +393,7 @@ public class DataStoreCommandTest {
 
 
         for (String id : data.idToPath.keySet()) {
-            if (data.idToPath.get(id).equals("/c1") || 
data.idToPath.get(id).equals("/c2")) {
+            if (data.idToPath.get(id).equals("/c1,18342") || 
data.idToPath.get(id).equals("/c2,18342")) {
                 data.addedSubset.add(id);
             }
         }
@@ -406,7 +407,7 @@ public class DataStoreCommandTest {
         storeFixture.close();
         additionalParams += " --verboseRootPath /c1,/c2";
         for (String id : data.idToPath.keySet()) {
-            if (data.idToPath.get(id).equals("/c1") || 
data.idToPath.get(id).equals("/c2")) {
+            if (data.idToPath.get(id).equals("/c1,18342") || 
data.idToPath.get(id).equals("/c2,18342")) {
                 data.addedSubset.add(id);
             }
         }
@@ -441,7 +442,7 @@ public class DataStoreCommandTest {
 
 
         for (String id : data.idToPath.keySet()) {
-            if (data.idToPath.get(id).equals("/c1") || 
data.idToPath.get(id).equals("/c2")) {
+            if (data.idToPath.get(id).equals("/c1,18342") || 
data.idToPath.get(id).equals("/c2,18342")) {
                 data.addedSubset.add(id);
             }
         }
@@ -547,6 +548,16 @@ public class DataStoreCommandTest {
         testConsistency(dump, data, false, false);
     }
 
+    @Test
+    public void getBlobLengthOrZero() {
+        assertEquals(1, DataStoreCommand.getBlobLengthOrZero("cafe#1"));
+        assertEquals(10, DataStoreCommand.getBlobLengthOrZero("cafe#10"));
+        assertEquals(0, DataStoreCommand.getBlobLengthOrZero("cafe"));
+        assertEquals(0, DataStoreCommand.getBlobLengthOrZero("#"));
+        assertEquals(0, DataStoreCommand.getBlobLengthOrZero(""));
+        assertEquals(0, DataStoreCommand.getBlobLengthOrZero(null));
+    }
+
     @Test
     public void testConsistencyMarkOnly() throws Exception {
         File dump = temporaryFolder.newFolder();
@@ -726,10 +737,16 @@ public class DataStoreCommandTest {
 
         if (verbose) {
             argsList.add("--verbose");
+        } else {
+            // only the verbose listing has the length
+            for (Entry<String, String> e : data.idToPath.entrySet()) {
+                data.idToPath.put(e.getKey(), e.getValue().split(",")[0]);
+            }
         }
+
         DataStoreCommand cmd = new DataStoreCommand();
         cmd.execute(argsList.toArray(new String[0]));
-        
+
         if (!markOnly) {
             assertFileEquals(dump, "avail-", SetUtils.difference(data.added, 
data.missingDataStore));
         } else {
@@ -743,7 +760,7 @@ public class DataStoreCommandTest {
                 (storeFixture instanceof StoreFixture.MongoStoreFixture) ?
                         encodedIdsAndPath(SetUtils.difference(data.added, 
data.deleted), blobFixture.getType(), data.idToPath, false) :
                         SetUtils.difference(data.added, data.deleted));
-        
+
         if (!markOnly) {
             // Verbose would have paths as well as ids changed but normally only DocumentNS would have paths suffixed
             assertFileEquals(dump, "gccand-", verbose ?

Reply via email to