keith-turner commented on code in PR #5930:
URL: https://github.com/apache/accumulo/pull/5930#discussion_r2388480653


##########
test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java:
##########
@@ -206,6 +217,108 @@ public void testSingleTabletSingleFile() throws Exception 
{
     }
   }
 
+  @Test
+  public void testConcurrentImportSameDirectory() throws Exception {
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      final int numTasks = 16;
+      final int iterations = 3;
+      final int startRow = 0;
+      final int endRow = 199;
+
+      ExecutorService pool = Executors.newFixedThreadPool(numTasks);
+
+      try {
+        for (int i = 0; i < iterations; i++) {
+          LOG.debug("Running concurrent import iteration {}/{}", i + 1, 
iterations);
+          final String table = getUniqueNames(1)[0] + i;
+          client.tableOperations().create(table);
+
+          Path sourceDir = new Path(rootPath + "/concurrent/" + table + 
"_sourceDir");
+          assertTrue(fs.mkdirs(sourceDir), "Failed to create " + sourceDir);
+
+          writeData(fs, sourceDir + "/f.", aconf, startRow, endRow);
+
+          CountDownLatch startSignal = new CountDownLatch(numTasks);
+          List<Future<Boolean>> futures = new ArrayList<>(numTasks);
+          Predicate<Throwable> expectedConcurrentFailure = throwable -> {
+            final String threadName = Thread.currentThread().getName();
+            for (Throwable current = throwable; current != null; current = 
current.getCause()) {
+              // File system level concurrency failures during bulk import 
validation
+              if (current instanceof IllegalArgumentException
+                  || current instanceof FileAlreadyExistsException) {
+                LOG.debug("Concurrent import attempt ({}) failed as expected 
with: {}", threadName,
+                    current.getMessage());
+                return true;
+              }
+
+            }
+
+            String message = throwable == null ? null : throwable.getMessage();
+            if (message == null) {
+              return false;
+            }
+            // Directory processed by another thread
+            if (message.contains("Attempted to import zero files")
+                // Concurrent .isWritable file creation
+                || message.contains("File exists")
+            // Directory permissions changed by winner
+                || message.contains("not writable")
+            // Permission test file conflicts
+                || message.contains(".isWritable")

Review Comment:
   ```suggestion
                   || message.contains("isWritable")
   ```
   
   Seeing the following exception that fails the test when running this, maybe 
this check should be relaxed.  Seems like it is not matching the message 
because of `.`
   
   ```
   java.util.concurrent.ExecutionException: ExitCodeException exitCode=1: 
chmod: cannot access 
'/accumulo/test/target/mini-tests/org.apache.accumulo.test.functional.BulkNewIT_SharedMiniClusterBase/tmp/concurrent/BulkNewIT_testConcurrentImportSameDirectory01_sourceDir/isWritable':
 No such file or directory
   
   
        at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
        at 
org.apache.accumulo.test.functional.BulkNewIT.testConcurrentImportSameDirectory(BulkNewIT.java:299)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: ExitCodeException exitCode=1: chmod: cannot access 
'/accumulo/test/target/mini-tests/org.apache.accumulo.test.functional.BulkNewIT_SharedMiniClusterBase/tmp/concurrent/BulkNewIT_testConcurrentImportSameDirectory01_sourceDir/isWritable':
 No such file or directory
   
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:1068)
        at org.apache.hadoop.util.Shell.run(Shell.java:957)
        at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1282)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:1377)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:1359)
        at 
org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:1114)
        at 
org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:452)
        at 
org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:412)
        at 
org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:575)
        at 
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:564)
        at 
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:595)
        at 
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:642)
        at 
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:730)
        at 
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:709)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1233)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1210)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1165)
        at org.apache.hadoop.fs.FileSystem.createNewFile(FileSystem.java:1499)
        at 
org.apache.accumulo.core.clientImpl.bulk.BulkImport.checkPath(BulkImport.java:216)
        at 
org.apache.accumulo.core.clientImpl.bulk.BulkImport.load(BulkImport.java:136)
        at 
org.apache.accumulo.test.functional.BulkNewIT.lambda$testConcurrentImportSameDirectory$1(BulkNewIT.java:284)
        ... 4 more
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to