thomasmueller commented on code in PR #1156:
URL: https://github.com/apache/jackrabbit-oak/pull/1156#discussion_r1369744991
##########
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTest.java:
##########
@@ -19,103 +19,134 @@
 package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 
 import org.apache.jackrabbit.oak.commons.Compression;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
-import java.io.File;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Set;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.FLATFILESTORE_CHARSET;
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.SENTINEL_SORTED_FILES_QUEUE;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class PipelinedMergeSortTaskTest {
-    private static final Logger LOG = LoggerFactory.getLogger(PipelinedMergeSortTaskTest.class);
+public class PipelinedMergeSortTaskTest extends PipelinedMergeSortTaskTestBase {
     private final ClassLoader classLoader = getClass().getClassLoader();
-    private final PathElementComparator pathComparator = new PathElementComparator(Set.of());
     private final Compression algorithm = Compression.NONE;
-    @Rule
-    public TemporaryFolder sortFolder = new TemporaryFolder();
 
     @Test
     public void noFileToMerge() throws Exception {
-        PipelinedMergeSortTask.Result result = runTest();
-        Path resultFile = result.getFlatFileStoreFile().toPath();
+        PipelinedMergeSortTask.Result result = runTest(algorithm);
+        Path resultFile = result.getFlatFileStoreFile();
         assertEquals(0, Files.size(resultFile));
     }
 
     @Test
     public void oneFileToMerge() throws Exception {
-        File singleFileToMerge = getTestFile("pipelined/merge-stage-1.json");
-        PipelinedMergeSortTask.Result result = runTest(singleFileToMerge);
-        Path resultFile = result.getFlatFileStoreFile().toPath();
-        assertEquals(Files.readAllLines(singleFileToMerge.toPath(), FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
+        Path singleFileToMerge = getTestFile("pipelined/merge-stage-1.json");
+        PipelinedMergeSortTask.Result result = runTest(algorithm, singleFileToMerge);
+        Path resultFile = result.getFlatFileStoreFile();
+        assertEquals(Files.readAllLines(singleFileToMerge, FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
     }
 
     @Test
     public void twoFilesToMerge() throws Exception {
-        File merge1 = getTestFile("pipelined/merge-stage-1.json");
-        File merge2 = getTestFile("pipelined/merge-stage-2.json");
-        File expected = getTestFile("pipelined/merge-expected.json");
-
-        PipelinedMergeSortTask.Result result = runTest(merge1, merge2);
-        Path resultFile = result.getFlatFileStoreFile().toPath();
-        LOG.info("Result: {}\n{}", resultFile, Files.readString(resultFile, FLATFILESTORE_CHARSET));
-        assertEquals(Files.readAllLines(expected.toPath(), FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
+        Path merge1 = getTestFile("pipelined/merge-stage-1.json");
+        Path merge2 = getTestFile("pipelined/merge-stage-2.json");
+        Path expected = getTestFile("pipelined/merge-expected.json");
+
+        PipelinedMergeSortTask.Result result = runTest(algorithm, merge1, merge2);
+        Path resultFile = result.getFlatFileStoreFile();
+        log.info("Result: {}\n{}", resultFile, Files.readString(resultFile, FLATFILESTORE_CHARSET));
+        assertEquals(Files.readAllLines(expected, FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
     }
 
-    private File getTestFile(String name) {
+    private Path getTestFile(String name) {
         URL url = classLoader.getResource(name);
         if (url == null) throw new IllegalArgumentException("Test file not found: " + name);
-        return new File(url.getPath());
+        return Paths.get(url.getPath());
     }
 
-    private PipelinedMergeSortTask.Result runTest(File... files) throws Exception {
-        File sortRoot = sortFolder.getRoot();
+    private PipelinedMergeSortTask.Result runTest(Compression algorithm, Path... files) throws Exception {
+        Path sortRoot = sortFolder.getRoot().toPath();
         // +1 for the Sentinel.
-        ArrayBlockingQueue<File> sortedFilesQueue = new ArrayBlockingQueue<>(files.length + 1);
+        ArrayBlockingQueue<Path> sortedFilesQueue = new ArrayBlockingQueue<>(files.length + 1);
         PipelinedMergeSortTask mergeSortTask = new PipelinedMergeSortTask(sortRoot, pathComparator, algorithm, sortedFilesQueue);
         // Enqueue all the files that are to be merged
-        for (File file : files) {
+        for (Path file : files) {
             // The intermediate files are deleted after being merged, so we should copy them to the temporary sort root folder
-            Path workDirCopy = Files.copy(file.toPath(), sortRoot.toPath().resolve(file.getName()));
-            sortedFilesQueue.put(workDirCopy.toFile());
+            Path workDirCopy = Files.copy(file, sortRoot.resolve(file.getFileName()));
+            sortedFilesQueue.put(workDirCopy);
         }
         // Signal end of files to merge
         sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
         // Run the merge task
         PipelinedMergeSortTask.Result result = mergeSortTask.call();
-        File[] filesInWorkDir = sortRoot.listFiles();
-        if (filesInWorkDir == null)
-            throw new IllegalStateException("The sort work directory is not a directory: " + sortRoot);
-        assertEquals("The sort work directory should contain only the flat file store, the intermediate files should have been deleted after merged. Instead it contains: " + Arrays.toString(filesInWorkDir),
-                1, filesInWorkDir.length);
-        assertTrue(result.getFlatFileStoreFile().exists());
+
+        try (Stream<Path> fileStream = Files.list(sortRoot)) {
+            List<String> filesInWorkDir = fileStream
+                    .map(path -> path.getFileName().toString())
+                    .collect(Collectors.toList());
+            assertEquals("The sort work directory should contain only the flat file store, the intermediate files should have been deleted after merged. Instead it contains: " + filesInWorkDir,
+                    1, filesInWorkDir.size());
+        }
+        assertTrue(Files.exists(result.getFlatFileStoreFile()));
         return result;
     }
 
     @Test(expected = IllegalStateException.class)
     public void badInputFile() throws Exception {
-        File singleFileToMerge = createFileWithWrongFormat();
-        runTest(singleFileToMerge);
+        Path singleFileToMerge = createFileWithWrongFormat();
+        runTest(algorithm, singleFileToMerge);
     }
 
-    private File createFileWithWrongFormat() throws Exception {
-        File file = Files.createTempFile("merge-stage-input", ".json").toFile();
-        try (BufferedWriter bw = Files.newBufferedWriter(file.toPath(), FLATFILESTORE_CHARSET)) {
+    private Path createFileWithWrongFormat() throws Exception {
+        Path file = Files.createTempFile(sortFolder.getRoot().toPath(), "merge-stage-input", ".json");
+        try (BufferedWriter bw = Files.newBufferedWriter(file, FLATFILESTORE_CHARSET)) {
             bw.write("/a/b/c\n");
         }
-        file.deleteOnExit();
         return file;
     }
+
+    @Test
+    public void manyFilesToMergeDidNotMerge() throws Exception {
+        int intermediateFilesCount = 256;
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD, "20");
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE, "1000");
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB, "1");
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE, "1000");
+
+        // Generate FFS
+        List<String> ffs = generateFFS(LINES_IN_FFS);
+        // Shuffle entries to simulate retrieving from MongoDB by an arbitrary order
+        Collections.shuffle(ffs);
+
+        // Generate the expected results by sorting using the node state entries comparator,
+        List<NodeStateHolder> nodesOrdered = sortAsNodeStateEntries(ffs);
+        // Convert back to a list of Strings
+        String[] expectedFFS = nodesOrdered.stream().map(f -> new String(f.getLine())).toArray(String[]::new);
Review Comment:
   This uses the platform's default charset; should it use FLATFILESTORE_CHARSET, always UTF-8, or a configurable charset?
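   For illustration, a minimal sketch of the explicit-charset variant the comment asks about, assuming NodeStateHolder.getLine() returns the entry's raw byte[] (as the default-charset concern implies):

       // Sketch only: decode with the flat file store charset instead of the
       // platform default. Assumes getLine() returns the entry's raw bytes.
       String[] expectedFFS = nodesOrdered.stream()
               .map(f -> new String(f.getLine(), FLATFILESTORE_CHARSET))
               .toArray(String[]::new);

   Pinning the charset would also keep the test deterministic across machines whose file.encoding defaults differ.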
##########
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.commons.sort;
+
+import org.apache.jackrabbit.oak.commons.Compression;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.Function;
+
+/**
+ * Variation of ExternalSort that stores the lines read from intermediate files as byte arrays to avoid the conversion
+ * from byte[] to String and then back.
+ */
+public class ExternalSortByteArray {
+    public static <T> void mergeSortedFilesBinary(List<Path> files, BufferedOutputStream fbw, final Comparator<T> cmp,
+                                                  boolean distinct, Compression algorithm,
+                                                  Function<T, byte[]> typeToByteArray, Function<byte[], T> byteArrayToType)
+            throws IOException {
+        ArrayList<BinaryFileBuffer<T>> bfbs = new ArrayList<>();
+        try {
+            for (Path f : files) {
+                InputStream in = algorithm.getInputStream(Files.newInputStream(f));
+                bfbs.add(new BinaryFileBuffer<>(in, byteArrayToType));
+            }
+            mergeBinary(fbw, cmp, distinct, bfbs, typeToByteArray);
+        } finally {
+            for (BinaryFileBuffer<T> buffer : bfbs) {
+                try {
+                    buffer.close();
+                } catch (Exception ignored) {
+                }
+            }
+            for (Path f : files) {
+                Files.deleteIfExists(f);
+            }
+        }
+    }
+
+    private static <T> int mergeBinary(BufferedOutputStream fbw, final Comparator<T> cmp, boolean distinct,
+                                       List<BinaryFileBuffer<T>> buffers, Function<T, byte[]> typeToByteArray)
+            throws IOException {
+        PriorityQueue<BinaryFileBuffer<T>> pq = new PriorityQueue<>(
+                11,
+                (i, j) -> cmp.compare(i.peek(), j.peek())
+        );
+        for (BinaryFileBuffer<T> bfb : buffers) {
+            if (!bfb.empty()) {
+                pq.add(bfb);
+            }
+        }
+        int rowcounter = 0;
+        T lastLine = null;
+        while (!pq.isEmpty()) {
+            BinaryFileBuffer<T> bfb = pq.poll();
+            T r = bfb.pop();
+            // Skip duplicate lines
+            if (!distinct || lastLine == null || cmp.compare(r, lastLine) != 0) {
+                fbw.write(typeToByteArray.apply(r));
+                fbw.write('\n');
+                lastLine = r;
+            }
+            ++rowcounter;
+            if (bfb.empty()) {
+                bfb.fbr.close();
+            } else {
+                pq.add(bfb); // add it back
+            }
+        }
+        return rowcounter;
+    }
+
+    /**
+     * WARNING: Uses '\n' as a line separator, it will not work with other line separators.
+     */
+    private static class BinaryFileBuffer<T> {
+        private final static int BUFFER_SIZE = 64 * 1024;

Review Comment:
   I think this should be configurable with a system property. The reason is that if there are many files (say 10,000), having them all open at the same time, each with a buffer of this size, can run the process out of memory. I also wonder whether 64 KB is the right size; in my experience, 8 KB doesn't slow down merging significantly.
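   One possible shape for that change, sketched with a hypothetical property name (not something defined in this PR) and the 8 KB default suggested above:

       // Sketch only: read the buffer size from a system property so that merges
       // over many files can trade throughput for memory. The property name
       // "oak.indexer.mergeBufferSize" is hypothetical, for illustration.
       private static final int BUFFER_SIZE =
               Integer.getInteger("oak.indexer.mergeBufferSize", 8 * 1024);

   Since every open file holds its own buffer during a merge, 10,000 files at 64 KB each would need roughly 640 MB for buffers alone, versus about 80 MB at 8 KB.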