nfsantos commented on code in PR #1156:
URL: https://github.com/apache/jackrabbit-oak/pull/1156#discussion_r1369897533
##########
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTest.java:
##########
@@ -19,103 +19,134 @@
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
import org.apache.jackrabbit.oak.commons.Compression;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
-import java.io.File;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Set;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.FLATFILESTORE_CHARSET;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.SENTINEL_SORTED_FILES_QUEUE;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class PipelinedMergeSortTaskTest {
- private static final Logger LOG =
LoggerFactory.getLogger(PipelinedMergeSortTaskTest.class);
+public class PipelinedMergeSortTaskTest extends PipelinedMergeSortTaskTestBase {
private final ClassLoader classLoader = getClass().getClassLoader();
- private final PathElementComparator pathComparator = new
PathElementComparator(Set.of());
private final Compression algorithm = Compression.NONE;
- @Rule
- public TemporaryFolder sortFolder = new TemporaryFolder();
@Test
public void noFileToMerge() throws Exception {
- PipelinedMergeSortTask.Result result = runTest();
- Path resultFile = result.getFlatFileStoreFile().toPath();
+ PipelinedMergeSortTask.Result result = runTest(algorithm);
+ Path resultFile = result.getFlatFileStoreFile();
assertEquals(0, Files.size(resultFile));
}
@Test
public void oneFileToMerge() throws Exception {
- File singleFileToMerge = getTestFile("pipelined/merge-stage-1.json");
- PipelinedMergeSortTask.Result result = runTest(singleFileToMerge);
- Path resultFile = result.getFlatFileStoreFile().toPath();
- assertEquals(Files.readAllLines(singleFileToMerge.toPath(),
FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
+ Path singleFileToMerge = getTestFile("pipelined/merge-stage-1.json");
+ PipelinedMergeSortTask.Result result = runTest(algorithm,
singleFileToMerge);
+ Path resultFile = result.getFlatFileStoreFile();
+ assertEquals(Files.readAllLines(singleFileToMerge,
FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
}
@Test
public void twoFilesToMerge() throws Exception {
- File merge1 = getTestFile("pipelined/merge-stage-1.json");
- File merge2 = getTestFile("pipelined/merge-stage-2.json");
- File expected = getTestFile("pipelined/merge-expected.json");
-
- PipelinedMergeSortTask.Result result = runTest(merge1, merge2);
- Path resultFile = result.getFlatFileStoreFile().toPath();
- LOG.info("Result: {}\n{}", resultFile, Files.readString(resultFile,
FLATFILESTORE_CHARSET));
- assertEquals(Files.readAllLines(expected.toPath(),
FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
+ Path merge1 = getTestFile("pipelined/merge-stage-1.json");
+ Path merge2 = getTestFile("pipelined/merge-stage-2.json");
+ Path expected = getTestFile("pipelined/merge-expected.json");
+
+ PipelinedMergeSortTask.Result result = runTest(algorithm, merge1,
merge2);
+ Path resultFile = result.getFlatFileStoreFile();
+ log.info("Result: {}\n{}", resultFile, Files.readString(resultFile,
FLATFILESTORE_CHARSET));
+ assertEquals(Files.readAllLines(expected, FLATFILESTORE_CHARSET),
Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
}
- private File getTestFile(String name) {
+ private Path getTestFile(String name) {
URL url = classLoader.getResource(name);
if (url == null) throw new IllegalArgumentException("Test file not found: " + name);
- return new File(url.getPath());
+ return Paths.get(url.getPath());
}
- private PipelinedMergeSortTask.Result runTest(File... files) throws
Exception {
- File sortRoot = sortFolder.getRoot();
+ private PipelinedMergeSortTask.Result runTest(Compression algorithm,
Path... files) throws Exception {
+ Path sortRoot = sortFolder.getRoot().toPath();
// +1 for the Sentinel.
- ArrayBlockingQueue<File> sortedFilesQueue = new
ArrayBlockingQueue<>(files.length + 1);
+ ArrayBlockingQueue<Path> sortedFilesQueue = new
ArrayBlockingQueue<>(files.length + 1);
PipelinedMergeSortTask mergeSortTask = new
PipelinedMergeSortTask(sortRoot, pathComparator, algorithm, sortedFilesQueue);
// Enqueue all the files that are to be merged
- for (File file : files) {
+ for (Path file : files) {
// The intermediate files are deleted after being merged, so we should copy them to the temporary sort root folder
- Path workDirCopy = Files.copy(file.toPath(),
sortRoot.toPath().resolve(file.getName()));
- sortedFilesQueue.put(workDirCopy.toFile());
+ Path workDirCopy = Files.copy(file,
sortRoot.resolve(file.getFileName()));
+ sortedFilesQueue.put(workDirCopy);
}
// Signal end of files to merge
sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
// Run the merge task
PipelinedMergeSortTask.Result result = mergeSortTask.call();
- File[] filesInWorkDir = sortRoot.listFiles();
- if (filesInWorkDir == null)
- throw new IllegalStateException("The sort work directory is not a directory: " + sortRoot);
- assertEquals("The sort work directory should contain only the flat file store, the intermediate files should have been deleted after merged. Instead it contains: " + Arrays.toString(filesInWorkDir),
- 1, filesInWorkDir.length);
- assertTrue(result.getFlatFileStoreFile().exists());
+
+ try (Stream<Path> fileStream = Files.list(sortRoot)) {
+ List<String> filesInWorkDir = fileStream
+ .map(path -> path.getFileName().toString())
+ .collect(Collectors.toList());
+ assertEquals("The sort work directory should contain only the flat file store, the intermediate files should have been deleted after merged. Instead it contains: " + filesInWorkDir,
+ 1, filesInWorkDir.size());
+ }
+ assertTrue(Files.exists(result.getFlatFileStoreFile()));
return result;
}
@Test(expected = IllegalStateException.class)
public void badInputFile() throws Exception {
- File singleFileToMerge = createFileWithWrongFormat();
- runTest(singleFileToMerge);
+ Path singleFileToMerge = createFileWithWrongFormat();
+ runTest(algorithm, singleFileToMerge);
}
- private File createFileWithWrongFormat() throws Exception {
- File file = Files.createTempFile("merge-stage-input",
".json").toFile();
- try (BufferedWriter bw = Files.newBufferedWriter(file.toPath(),
FLATFILESTORE_CHARSET)) {
+ private Path createFileWithWrongFormat() throws Exception {
+ Path file = Files.createTempFile(sortFolder.getRoot().toPath(),
"merge-stage-input", ".json");
+ try (BufferedWriter bw = Files.newBufferedWriter(file,
FLATFILESTORE_CHARSET)) {
bw.write("/a/b/c\n");
}
- file.deleteOnExit();
return file;
}
+
+ @Test
+ public void manyFilesToMergeDidNotMerge() throws Exception {
+ int intermediateFilesCount = 256;
+ System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD, "20");
+ System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE, "1000");
+ System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB, "1");
+ System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE, "1000");
+
+ // Generate FFS
+ List<String> ffs = generateFFS(LINES_IN_FFS);
+ // Shuffle entries to simulate retrieving from MongoDB by an arbitrary order
+ Collections.shuffle(ffs);
+
+ // Generate the expected results by sorting using the node state entries comparator,
+ List<NodeStateHolder> nodesOrdered = sortAsNodeStateEntries(ffs);
+ // Convert back to a list of Strings
+ String[] expectedFFS = nodesOrdered.stream().map(f -> new String(f.getLine())).toArray(String[]::new);
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]