This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new d3a7549e81 NIFI-12231 FetchSmb supports Move and Delete Completion 
Strategies
d3a7549e81 is described below

commit d3a7549e8109774b06e79f13d570ef5da04c451f
Author: Peter Turcsanyi <turcsa...@apache.org>
AuthorDate: Tue Apr 9 18:09:13 2024 +0200

    NIFI-12231 FetchSmb supports Move and Delete Completion Strategies
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #8617.
---
 .../processors/smb/util/CompletionStrategy.java    |  49 +++++
 .../apache/nifi/services/smb/SmbClientService.java |  10 +-
 .../org/apache/nifi/processors/smb/FetchSmb.java   | 152 ++++++++++-----
 .../org/apache/nifi/processors/smb/ListSmb.java    |   2 +-
 .../org/apache/nifi/processors/smb/FetchSmbIT.java | 212 +++++++++++++++++++--
 .../apache/nifi/processors/smb/ListSmbTest.java    |   4 +-
 .../nifi/processors/smb/SambaTestContainers.java   |  59 ++++--
 .../nifi/services/smb/SmbjClientService.java       | 113 +++++++----
 .../nifi/services/smb/SmbjClientServiceIT.java     |   2 +-
 .../nifi/services/smb/SmbjClientServiceTest.java   |   2 +-
 10 files changed, 482 insertions(+), 123 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/CompletionStrategy.java
 
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/CompletionStrategy.java
new file mode 100644
index 0000000000..cf3dd0fcbf
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/CompletionStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb.util;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum CompletionStrategy implements DescribedValue {
+
+    NONE("None", "Leaves the file as-is."),
+    MOVE("Move File", "Moves the file to the specified directory on the remote 
system. This option cannot be used when DFS is enabled on 'SMB Client Provider 
Service'."),
+    DELETE("Delete File", "Deletes the file from the remote system.");
+
+    private final String displayName;
+    private final String description;
+
+    CompletionStrategy(String displayName, String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
index 9eb0e41a67..c70dea133e 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
@@ -25,9 +25,13 @@ import java.util.stream.Stream;
  */
 public interface SmbClientService extends AutoCloseable {
 
-    Stream<SmbListableEntity> listRemoteFiles(String path);
+    Stream<SmbListableEntity> listFiles(String directoryPath);
 
-    void createDirectory(String path);
+    void ensureDirectory(String directoryPath);
 
-    void readFile(String fileName, OutputStream outputStream) throws 
IOException;
+    void readFile(String filePath, OutputStream outputStream) throws 
IOException;
+
+    void moveFile(String filePath, String directoryPath);
+
+    void deleteFile(String filePath);
 }
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
index c38216058d..b5f58b5890 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
@@ -16,16 +16,6 @@
  */
 package org.apache.nifi.processors.smb;
 
-import static java.util.Arrays.asList;
-import static java.util.Collections.unmodifiableSet;
-import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
-import static 
org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -33,17 +23,29 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.smb.util.CompletionStrategy;
 import org.apache.nifi.services.smb.SmbClientProviderService;
 import org.apache.nifi.services.smb.SmbClientService;
 import org.apache.nifi.services.smb.SmbException;
 
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableSet;
+import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static 
org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({"samba", "smb", "cifs", "files", "fetch"})
 @CapabilityDescription("Fetches files from a SMB Share. Designed to be used in 
tandem with ListSmb.")
@@ -57,8 +59,8 @@ public class FetchSmb extends AbstractProcessor {
     public static final String ERROR_CODE_ATTRIBUTE = "error.code";
     public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
 
-    public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
-            .Builder().name("remote-file")
+    public static final PropertyDescriptor REMOTE_FILE = new 
PropertyDescriptor.Builder()
+            .name("remote-file")
             .displayName("Remote File")
             .description("The full path of the file to be retrieved from the 
remote server. Expression language is supported.")
             .required(true)
@@ -67,91 +69,139 @@ public class FetchSmb extends AbstractProcessor {
             .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new 
Builder()
+    public static final PropertyDescriptor COMPLETION_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Completion Strategy")
+            .description("Specifies what to do with the original file on the 
server once it has been processed. If the Completion Strategy fails, a warning 
will be "
+                    + "logged but the data will still be transferred.")
+            .allowableValues(CompletionStrategy.class)
+            .defaultValue(CompletionStrategy.NONE)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor DESTINATION_DIRECTORY = new 
PropertyDescriptor.Builder()
+            .name("Destination Directory")
+            .description("The directory on the remote server to move the 
original file to once it has been processed.")
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .dependsOn(COMPLETION_STRATEGY, CompletionStrategy.MOVE)
+            .build();
+
+    public static final PropertyDescriptor CREATE_DESTINATION_DIRECTORY = new 
PropertyDescriptor.Builder()
+            .name("Create Destination Directory")
+            .description("Specifies whether or not the remote directory should 
be created if it does not exist.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .dependsOn(COMPLETION_STRATEGY, CompletionStrategy.MOVE)
+            .build();
+
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new 
PropertyDescriptor.Builder()
             .name("smb-client-provider-service")
             .displayName("SMB Client Provider Service")
             .description("Specifies the SMB client provider to use for 
creating SMB connections.")
             .required(true)
             .identifiesControllerService(SmbClientProviderService.class)
             .build();
+
     public static final Relationship REL_SUCCESS =
             new Relationship.Builder()
                     .name("success")
-                    .description("A flowfile will be routed here for each 
successfully fetched file.")
+                    .description("A FlowFile will be routed here for each 
successfully fetched file.")
                     .build();
+
     public static final Relationship REL_FAILURE =
-            new Relationship.Builder().name("failure")
-                    .description(
-                            "A flowfile will be routed here when failed to 
fetch its content.")
+            new Relationship.Builder()
+                    .name("failure")
+                    .description("A FlowFile will be routed here when failed 
to fetch its content.")
                     .build();
+
     public static final Set<Relationship> RELATIONSHIPS = unmodifiableSet(new 
HashSet<>(asList(
             REL_SUCCESS,
             REL_FAILURE
     )));
-    public static final String UNCATEGORIZED_ERROR = "-2";
+
     private static final List<PropertyDescriptor> PROPERTIES = asList(
             SMB_CLIENT_PROVIDER_SERVICE,
-            REMOTE_FILE
+            REMOTE_FILE,
+            COMPLETION_STRATEGY,
+            DESTINATION_DIRECTORY,
+            CREATE_DESTINATION_DIRECTORY
     );
 
+    public static final String UNCATEGORIZED_ERROR = "-2";
+
     @Override
     public Set<Relationship> getRelationships() {
         return RELATIONSHIPS;
     }
 
     @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
         }
 
-        final SmbClientProviderService clientProviderService =
-                
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
-
-        try (SmbClientService client = clientProviderService.getClient()) {
-            fetchAndTransfer(session, context, client, flowFile);
-        } catch (Exception e) {
-            getLogger().error("Couldn't connect to SMB.", e);
-            flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, 
getErrorCode(e));
-            flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, 
e.getMessage());
-            session.transfer(flowFile, REL_FAILURE);
-        }
+        final Map<String, String> attributes = flowFile.getAttributes();
+        final String filePath = 
context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(attributes).getValue();
 
-    }
+        final SmbClientProviderService clientProviderService = 
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
 
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return PROPERTIES;
-    }
+        try (SmbClientService client = clientProviderService.getClient()) {
+            flowFile = session.write(flowFile, outputStream -> 
client.readFile(filePath, outputStream));
 
-    private void fetchAndTransfer(ProcessSession session, ProcessContext 
context, SmbClientService client,
-            FlowFile flowFile) {
-        final Map<String, String> attributes = flowFile.getAttributes();
-        final String filename = context.getProperty(REMOTE_FILE)
-                .evaluateAttributeExpressions(attributes).getValue();
-        try {
-            flowFile = session.write(flowFile, outputStream -> 
client.readFile(filename, outputStream));
             session.transfer(flowFile, REL_SUCCESS);
         } catch (Exception e) {
-            getLogger().error("Couldn't fetch file {}.", filename, e);
+            getLogger().error("Could not fetch file {}.", filePath, e);
             flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, 
getErrorCode(e));
-            flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, 
getErrorMessage(e));
+            flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, 
e.getMessage());
             session.transfer(flowFile, REL_FAILURE);
+            return;
         }
+
+        session.commitAsync(() -> performCompletionStrategy(context, 
attributes));
     }
 
-    private String getErrorCode(Exception exception) {
+    private String getErrorCode(final Exception exception) {
         return Optional.ofNullable(exception instanceof SmbException ? 
(SmbException) exception : null)
                 .map(SmbException::getErrorCode)
                 .map(String::valueOf)
                 .orElse(UNCATEGORIZED_ERROR);
     }
 
-    private String getErrorMessage(Exception exception) {
-        return Optional.ofNullable(exception.getMessage())
-                .orElse(exception.getClass().getSimpleName());
+    private void performCompletionStrategy(final ProcessContext context, final 
Map<String, String> attributes) {
+        final CompletionStrategy completionStrategy = 
context.getProperty(COMPLETION_STRATEGY).asAllowableValue(CompletionStrategy.class);
+
+        if (completionStrategy == CompletionStrategy.NONE) {
+            return;
+        }
+
+        final String filePath = 
context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(attributes).getValue();
+
+        final SmbClientProviderService clientProviderService = 
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+        try (SmbClientService client = clientProviderService.getClient()) {
+            if (completionStrategy == CompletionStrategy.MOVE) {
+                final String destinationDirectory = 
context.getProperty(DESTINATION_DIRECTORY).evaluateAttributeExpressions(attributes).getValue();
+                final boolean createDestinationDirectory = 
context.getProperty(CREATE_DESTINATION_DIRECTORY).asBoolean();
+
+                if (createDestinationDirectory) {
+                    client.ensureDirectory(destinationDirectory);
+                }
+
+                client.moveFile(filePath, destinationDirectory);
+            } else if (completionStrategy == CompletionStrategy.DELETE) {
+                client.deleteFile(filePath);
+            }
+        } catch (Exception e) {
+            getLogger().warn("Could not perform completion strategy {} for 
file {}", completionStrategy, filePath, e);
+        }
     }
 
 }
-
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
index 886f1bee46..091a5b176e 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
@@ -345,7 +345,7 @@ public class ListSmb extends 
AbstractListProcessor<SmbListableEntity> {
                 
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
         final String directory = getDirectory(context);
         final SmbClientService clientService = 
clientProviderService.getClient();
-        return clientService.listRemoteFiles(directory).onClose(() -> {
+        return clientService.listFiles(directory).onClose(() -> {
             try {
                 clientService.close();
             } catch (Exception e) {
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
index fb057791a8..a1ad08db0b 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
@@ -16,52 +16,226 @@
  */
 package org.apache.nifi.processors.smb;
 
+import static org.apache.nifi.processors.smb.FetchSmb.COMPLETION_STRATEGY;
+import static 
org.apache.nifi.processors.smb.FetchSmb.CREATE_DESTINATION_DIRECTORY;
+import static org.apache.nifi.processors.smb.FetchSmb.DESTINATION_DIRECTORY;
 import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE;
 import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS;
 import static org.apache.nifi.processors.smb.FetchSmb.REMOTE_FILE;
 import static org.apache.nifi.util.TestRunners.newTestRunner;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+
+import org.apache.nifi.processors.smb.util.CompletionStrategy;
 import org.apache.nifi.services.smb.SmbjClientProviderService;
 import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-public class FetchSmbIT extends SambaTestContainers {
+class FetchSmbIT extends SambaTestContainers {
+
+    private static final String TEST_CONTENT = "test_content";
+
+    private TestRunner testRunner;
+
+    private SmbjClientProviderService smbjClientProviderService;
+
+    @BeforeEach
+    void setUpComponents() throws Exception {
+        testRunner = newTestRunner(FetchSmb.class);
+
+        smbjClientProviderService = configureSmbClient(testRunner, true);
+    }
+
+    @AfterEach
+    void tearDownComponents() {
+        testRunner.disableControllerService(smbjClientProviderService);
+    }
 
     @Test
-    public void fetchFilesUsingEL() throws Exception {
-        writeFile("/test_file", "test_content");
-        TestRunner testRunner = newTestRunner(FetchSmb.class);
+    void fetchFilesUsingEL() {
+        writeFile("test_file", TEST_CONTENT);
+
         testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
-        final SmbjClientProviderService smbjClientProviderService = 
configureSmbClient(testRunner, true);
 
-        Map<String, String> attributes = new HashMap<>();
+        final Map<String, String> attributes = new HashMap<>();
         attributes.put("attribute_to_find_using_EL", "test_file");
 
-        testRunner.enqueue("ignored", attributes);
-        testRunner.run();
-        testRunner.assertTransferCount(REL_SUCCESS, 1);
-        assertEquals("test_content", 
testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent());
-        testRunner.assertValid();
-        testRunner.disableControllerService(smbjClientProviderService);
+        runProcessor(attributes);
+
+        assertSuccessFlowFile();
     }
 
     @Test
-    public void tryToFetchNonExistingFileEmitsFailure() throws Exception {
-        TestRunner testRunner = newTestRunner(FetchSmb.class);
+    void tryToFetchNonExistingFileEmitsFailure() throws Exception {
         testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
-        final SmbjClientProviderService smbjClientProviderService = 
configureSmbClient(testRunner, true);
 
-        Map<String, String> attributes = new HashMap<>();
+        final Map<String, String> attributes = new HashMap<>();
         attributes.put("attribute_to_find_using_EL", "non_existing_file");
 
+        runProcessor(attributes);
+
+        testRunner.assertTransferCount(REL_FAILURE, 1);
+    }
+
+    @Test
+    void testCompletionStrategyNone() {
+        final String baseDir = "dir_none";
+        final String filename = "test_file";
+        final String filePath = baseDir + "/" + filename;
+
+        createDirectory(baseDir, AccessMode.READ_ONLY);
+        writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY);
+
+        testRunner.setProperty(REMOTE_FILE, filePath);
+        testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.NONE);
+
+        runProcessor();
+
+        assertSuccessFlowFile();
+        assertNoWarning();
+
+        assertTrue(fileExists(filePath));
+    }
+
+    @Test
+    void testCompletionStrategyDelete() {
+        final String baseDir = "dir_delete";
+        final String filename = "test_file";
+        final String filePath = baseDir + "/" + filename;
+
+        createDirectory(baseDir, AccessMode.READ_WRITE);
+        writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE);
+
+        testRunner.setProperty(REMOTE_FILE, filePath);
+        testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.DELETE);
+
+        runProcessor();
+
+        assertSuccessFlowFile();
+        assertNoWarning();
+
+        assertFalse(fileExists(filePath));
+    }
+
+    @Test
+    void testCompletionStrategyMoveWithExistingDirectory() {
+        final String baseDir = "dir_move_existing";
+        final String filename = "test_file";
+        final String filePath = baseDir + "/" + filename;
+        final String processedDir = "processed";
+
+        createDirectory(baseDir, AccessMode.READ_WRITE);
+        writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE);
+        createDirectory(processedDir, AccessMode.READ_WRITE);
+
+        testRunner.setProperty(REMOTE_FILE, filePath);
+        testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE);
+        testRunner.setProperty(DESTINATION_DIRECTORY, processedDir);
+
+        runProcessor();
+
+        assertSuccessFlowFile();
+        assertNoWarning();
+
+        assertFalse(fileExists(filePath));
+        assertTrue(fileExists(processedDir + "/" + filename));
+    }
+
+    @Test
+    void testCompletionStrategyMoveWithCreatingDirectory() {
+        final String baseDir = "dir_move_creating";
+        final String filename = "test_file";
+        final String filePath = baseDir + "/" + filename;
+        final String processedDir = "processed";
+
+        createDirectory(baseDir, AccessMode.READ_WRITE);
+        writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE);
+
+        testRunner.setProperty(REMOTE_FILE, filePath);
+        testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE);
+        testRunner.setProperty(DESTINATION_DIRECTORY, processedDir);
+        testRunner.setProperty(CREATE_DESTINATION_DIRECTORY, "true");
+
+        runProcessor();
+
+        assertSuccessFlowFile();
+        assertNoWarning();
+
+        assertFalse(fileExists(filePath));
+        assertTrue(fileExists(processedDir + "/" + filename));
+    }
+
+    @Test
+    void testCompletionStrategyDeleteFailsWhenNoPermission() {
+        final String baseDir = "dir_delete_noperm";
+        final String filename = "test_file";
+        final String filePath = baseDir + "/" + filename;
+
+        createDirectory(baseDir, AccessMode.READ_ONLY);
+        writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY);
+
+        testRunner.setProperty(REMOTE_FILE, filePath);
+        testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.DELETE);
+
+        runProcessor();
+
+        assertSuccessFlowFile();
+        assertWarning();
+
+        assertTrue(fileExists(filePath));
+    }
+
+    @Test
+    void testCompletionStrategyMoveFailsWhenNoPermission() {
+        final String baseDir = "dir_move_noperm";
+        final String filename = "test_file";
+        final String filePath = baseDir + "/" + filename;
+        final String processedDir = "processed";
+
+        createDirectory(baseDir, AccessMode.READ_ONLY);
+        writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY);
+        createDirectory(processedDir, AccessMode.READ_ONLY);
+
+        testRunner.setProperty(REMOTE_FILE, filePath);
+        testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE);
+        testRunner.setProperty(DESTINATION_DIRECTORY, processedDir);
+
+        runProcessor();
+
+        assertSuccessFlowFile();
+        assertWarning();
+
+        assertTrue(fileExists(filePath));
+        assertFalse(fileExists(processedDir + "/" + filename));
+    }
+
+    private void runProcessor() {
+        runProcessor(Collections.emptyMap());
+    }
+
+    private void runProcessor(final Map<String, String> attributes) {
         testRunner.enqueue("ignored", attributes);
         testRunner.run();
-        testRunner.assertTransferCount(REL_FAILURE, 1);
-        testRunner.assertValid();
-        testRunner.disableControllerService(smbjClientProviderService);
+    }
+
+    private void assertSuccessFlowFile() {
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        assertEquals(TEST_CONTENT, 
testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent());
+    }
+
+    private void assertWarning() {
+        assertFalse(testRunner.getLogger().getWarnMessages().isEmpty());
+    }
+
+    private void assertNoWarning() {
+        assertTrue(testRunner.getLogger().getWarnMessages().isEmpty());
     }
 
 }
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
index 0594ef3e2f..621ec74209 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
@@ -232,7 +232,7 @@ class ListSmbTest {
         testRunner.setProperty(LISTING_STRATEGY, "timestamps");
         testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
         final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSambaClient(testRunner);
-        
when(mockNifiSmbClientService.listRemoteFiles(anyString())).thenThrow(new 
RuntimeException("test exception"));
+        when(mockNifiSmbClientService.listFiles(anyString())).thenThrow(new 
RuntimeException("test exception"));
         testRunner.run();
         assertEquals(1, testRunner.getLogger().getErrorMessages().size());
         testRunner.assertValid();
@@ -282,7 +282,7 @@ class ListSmbTest {
     }
 
     private void mockSmbFolders(SmbClientService mockNifiSmbClientService, 
SmbListableEntity... entities) {
-        doAnswer(ignore -> 
stream(entities)).when(mockNifiSmbClientService).listRemoteFiles(anyString());
+        doAnswer(ignore -> 
stream(entities)).when(mockNifiSmbClientService).listFiles(anyString());
     }
 
     private SmbListableEntity listableEntity(String name, long timeStamp) {
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
index f38d36d171..51c283b85d 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
@@ -39,12 +39,19 @@ import org.testcontainers.utility.DockerImageName;
 
 public class SambaTestContainers {
 
+    protected final static Logger LOGGER = 
LoggerFactory.getLogger(SambaTestContainers.class);
+
     protected final static Integer DEFAULT_SAMBA_PORT = 445;
-    protected final static Logger logger = 
LoggerFactory.getLogger(SambaTestContainers.class);
+
+    protected enum AccessMode {
+        READ_ONLY,READ_WRITE;
+    }
+
     protected final GenericContainer<?> sambaContainer = new 
GenericContainer<>(DockerImageName.parse("dperson/samba"))
+            .withCreateContainerCmdModifier(cmd -> cmd.withName("samba-test"))
             .withExposedPorts(DEFAULT_SAMBA_PORT, 139)
             .waitingFor(Wait.forListeningPort())
-            .withLogConsumer(new Slf4jLogConsumer(logger))
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER))
             .withCommand("-w domain -u username;password -s 
share;/folder;;no;no;username;;; -p");
 
     @BeforeEach
@@ -57,33 +64,63 @@ public class SambaTestContainers {
         sambaContainer.stop();
     }
 
-    protected SmbjClientProviderService configureSmbClient(TestRunner 
testRunner, boolean shouldEnableSmbClient)
-            throws Exception {
+    protected SmbjClientProviderService configureSmbClient(final TestRunner 
testRunner, final boolean shouldEnableSmbClient) throws Exception {
         final SmbjClientProviderService smbjClientProviderService = new 
SmbjClientProviderService();
         testRunner.addControllerService("client-provider", 
smbjClientProviderService);
+
         testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider");
         testRunner.setProperty(smbjClientProviderService, HOSTNAME, 
sambaContainer.getHost());
-        testRunner.setProperty(smbjClientProviderService, PORT,
-                
String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
+        testRunner.setProperty(smbjClientProviderService, PORT, 
String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
         testRunner.setProperty(smbjClientProviderService, USERNAME, 
"username");
         testRunner.setProperty(smbjClientProviderService, PASSWORD, 
"password");
         testRunner.setProperty(smbjClientProviderService, SHARE, "share");
         testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain");
+
         if (shouldEnableSmbClient) {
             testRunner.enableControllerService(smbjClientProviderService);
         }
+
         return smbjClientProviderService;
     }
 
-    protected String generateContentWithSize(int sizeInBytes) {
-        byte[] bytes = new byte[sizeInBytes];
+    protected String generateContentWithSize(final int sizeInBytes) {
+        final byte[] bytes = new byte[sizeInBytes];
         fill(bytes, (byte) 1);
         return new String(bytes);
     }
 
-    protected void writeFile(String path, String content) {
-        String containerPath = "/folder/" + path;
-        sambaContainer.copyFileToContainer(Transferable.of(content), 
containerPath);
+    protected void createDirectory(final String path) {
+        createDirectory(path, AccessMode.READ_ONLY);
+    }
+
+    protected void createDirectory(final String path, final AccessMode 
accessMode) {
+        final String dirMode = accessMode == AccessMode.READ_ONLY ? "755" : 
"777";
+        try {
+            sambaContainer.execInContainer("bash", "-c", "mkdir -m " + dirMode 
+ " -p " + getContainerPath(path));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create directory", e);
+        }
+    }
+
+    protected void writeFile(final String path, final String content) {
+        writeFile(path, content, AccessMode.READ_ONLY);
+    }
+
+    protected void writeFile(final String path, final String content, final 
AccessMode accessMode) {
+        final int fileMode = accessMode == AccessMode.READ_ONLY ? 0100644: 
0100666;
+        sambaContainer.copyFileToContainer(Transferable.of(content, fileMode), 
getContainerPath(path));
+    }
+
+    protected boolean fileExists(final String path) {
+        try {
+            return sambaContainer.execInContainer("bash", "-c", "cat " + 
getContainerPath(path) + " > /dev/null").getExitCode() == 0;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to check file", e);
+        }
+    }
+
+    private String getContainerPath(final String path) {
+        return "/folder/" + path;
     }
 
 }
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
index ae9307ea64..4e04bf85d1 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
@@ -20,6 +20,7 @@ import static java.util.Arrays.asList;
 import static java.util.stream.StreamSupport.stream;
 
 import com.hierynomus.msdtyp.AccessMask;
+import com.hierynomus.mserref.NtStatus;
 import com.hierynomus.msfscc.FileAttributes;
 import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation;
 import com.hierynomus.mssmb2.SMB2CreateDisposition;
@@ -45,13 +46,13 @@ class SmbjClientService implements SmbClientService {
     private final static Logger LOGGER = 
LoggerFactory.getLogger(SmbjClientService.class);
 
     private static final List<String> SPECIAL_DIRECTORIES = asList(".", "..");
-    private static final long UNCATEGORISED_ERROR = -1L;
+    private static final long UNCATEGORIZED_ERROR = -1L;
 
     private final Session session;
     private final DiskShare share;
     private final URI serviceLocation;
 
-    SmbjClientService(Session session, DiskShare share, URI serviceLocation) {
+    SmbjClientService(final Session session, final DiskShare share, final URI 
serviceLocation) {
         this.session = session;
         this.share = share;
         this.serviceLocation = serviceLocation;
@@ -69,50 +70,89 @@ class SmbjClientService implements SmbClientService {
     }
 
     @Override
-    public Stream<SmbListableEntity> listRemoteFiles(String filePath) {
-        return Stream.of(filePath).flatMap(path -> {
+    public Stream<SmbListableEntity> listFiles(final String directoryPath) {
+        return Stream.of(directoryPath).flatMap(path -> {
             final Directory directory = openDirectory(path);
             return stream(directory::spliterator, 0, false)
                     .map(entity -> buildSmbListableEntity(entity, path, 
serviceLocation))
                     .filter(entity -> !specialDirectory(entity))
-                    .flatMap(listable -> listable.isDirectory() ? 
listRemoteFiles(listable.getPathWithName())
+                    .flatMap(listable -> listable.isDirectory() ? 
listFiles(listable.getPathWithName())
                             : Stream.of(listable))
                     .onClose(directory::close);
         });
     }
 
     @Override
-    public void createDirectory(String path) {
-        final int lastDirectorySeparatorPosition = path.lastIndexOf("/");
-        if (lastDirectorySeparatorPosition > 0) {
-            createDirectory(path.substring(0, lastDirectorySeparatorPosition));
-        }
-        if (!share.folderExists(path)) {
-            share.mkdir(path);
+    public void ensureDirectory(final String directoryPath) {
+        try {
+            final int lastDirectorySeparatorPosition = 
directoryPath.lastIndexOf("/");
+            if (lastDirectorySeparatorPosition > 0) {
+                ensureDirectory(directoryPath.substring(0, 
lastDirectorySeparatorPosition));
+            }
+
+            if (!share.folderExists(directoryPath)) {
+                try {
+                    share.mkdir(directoryPath);
+                } catch (SMBApiException e) {
+                    if (e.getStatus() == 
NtStatus.STATUS_OBJECT_NAME_COLLISION) {
+                        if (!share.folderExists(directoryPath)) {
+                            throw e;
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw wrapException(e);
         }
     }
 
     @Override
-    public void readFile(String fileName, OutputStream outputStream) throws 
IOException {
-        try (File f = share.openFile(
-                fileName,
+    public void readFile(final String filePath, final OutputStream 
outputStream) throws IOException {
+        try (File file = share.openFile(
+                filePath,
                 EnumSet.of(AccessMask.GENERIC_READ),
                 EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
                 EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
                 SMB2CreateDisposition.FILE_OPEN,
                 EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY))
         ) {
-            f.read(outputStream);
-        } catch (SMBApiException a) {
-            throw new SmbException(a.getMessage(), a.getStatusCode(), a);
+            file.read(outputStream);
         } catch (Exception e) {
-            throw new SmbException(e.getMessage(), UNCATEGORISED_ERROR, e);
+            throw wrapException(e);
         } finally {
             outputStream.close();
         }
     }
 
-    private SmbListableEntity 
buildSmbListableEntity(FileIdBothDirectoryInformation info, String path, URI 
serviceLocation) {
+    @Override
+    public void moveFile(final String filePath, final String directoryPath) {
+        try (File file = share.openFile(
+                filePath,
+                EnumSet.of(AccessMask.GENERIC_WRITE, AccessMask.DELETE),
+                EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
+                EnumSet.noneOf(SMB2ShareAccess.class),
+                SMB2CreateDisposition.FILE_OPEN,
+                EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY))
+        ) {
+            final String[] parts = filePath.split("/");
+            // rename operation on Windows requires \ (backslash) path 
separator
+            final String newFilePath = directoryPath.replace('/', '\\') + "\\" 
+ parts[parts.length - 1];
+            file.rename(newFilePath);
+        } catch (Exception e) {
+            throw wrapException(e);
+        }
+    }
+
+    @Override
+    public void deleteFile(final String filePath) {
+        try {
+            share.rm(filePath);
+        } catch (Exception e) {
+            throw wrapException(e);
+        }
+    }
+
+    private SmbListableEntity buildSmbListableEntity(final 
FileIdBothDirectoryInformation info, final String path, final URI 
serviceLocation) {
         return SmbListableEntity.builder()
                 .setName(info.getFileName())
                 .setShortName(info.getShortName())
@@ -128,25 +168,30 @@ class SmbjClientService implements SmbClientService {
                 .build();
     }
 
-    private Directory openDirectory(String path) {
-        try {
-            return share.openDirectory(
-                    path,
-                    EnumSet.of(AccessMask.GENERIC_READ),
-                    EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY),
-                    EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
-                    SMB2CreateDisposition.FILE_OPEN,
-                    EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE)
-            );
-        } catch (SMBApiException s) {
-            throw new RuntimeException("Could not open directory " + path + " 
due to " + s.getMessage(), s);
-        }
+    private Directory openDirectory(final String path) {
+        return share.openDirectory(
+                path,
+                EnumSet.of(AccessMask.GENERIC_READ),
+                EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY),
+                EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
+                SMB2CreateDisposition.FILE_OPEN,
+                EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE)
+        );
     }
 
-    private boolean specialDirectory(SmbListableEntity entity) {
+    private boolean specialDirectory(final SmbListableEntity entity) {
         return SPECIAL_DIRECTORIES.contains(entity.getName());
     }
 
+    private SmbException wrapException(final Exception e) {
+        if (e instanceof SmbException) {
+            return (SmbException) e;
+        } else {
+            final long errorCode = e instanceof SMBApiException ? 
((SMBApiException) e).getStatusCode() : UNCATEGORIZED_ERROR;
+            return new SmbException(e.getMessage(), errorCode, e);
+        }
+    }
+
 }
 
 
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
index 59278e54c5..e671510a2e 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
@@ -132,7 +132,7 @@ public class SmbjClientServiceIT {
                         sambaProxy.setConnectionCut(true);
                     }
 
-                    final Set<String> actual = 
s.listRemoteFiles("testDirectory")
+                    final Set<String> actual = s.listFiles("testDirectory")
                             .map(SmbListableEntity::getIdentifier)
                             .collect(toSet());
 
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
index 7193aad091..e30038684a 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
@@ -52,7 +52,7 @@ class SmbjClientServiceTest {
         when(share.fileExists("to")).thenReturn(false);
         when(share.fileExists("create")).thenReturn(false);
 
-        underTest.createDirectory("directory/path/to/create");
+        underTest.ensureDirectory("directory/path/to/create");
 
         verify(share).mkdir("directory/path");
         verify(share).mkdir("directory/path/to");


Reply via email to