This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 10ce00862d NIFI-12837 Added DFS support in SMB processors
10ce00862d is described below

commit 10ce00862d1479dee1444d0da9cc48bbbd83729f
Author: Peter Turcsanyi <turcsa...@apache.org>
AuthorDate: Mon Mar 18 11:04:32 2024 +0100

    NIFI-12837 Added DFS support in SMB processors
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #8527.
---
 .../org/apache/nifi/processors/smb/GetSmbFile.java |   2 +
 .../org/apache/nifi/processors/smb/PutSmbFile.java |   2 +
 .../org/apache/nifi/processors/smb/SmbDfsIT.java   | 229 +++++++++++++++++++++
 .../services/smb/SmbjClientProviderService.java    |  83 +++-----
 ...iSmbjClientIT.java => SmbjClientServiceIT.java} |   4 +-
 ...jClientTest.java => SmbjClientServiceTest.java} |   8 +-
 .../java/org/apache/nifi/smb/common/SmbClient.java |  92 +++++++++
 .../org/apache/nifi/smb/common/SmbProperties.java  |   9 +
 .../java/org/apache/nifi/smb/common/SmbUtils.java  |   7 +-
 9 files changed, 376 insertions(+), 60 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
index ed843f9bff..f1649a4faa 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
@@ -80,6 +80,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
+import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
 import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT;
 import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT;
 import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION;
@@ -257,6 +258,7 @@ public class GetSmbFile extends AbstractProcessor {
         descriptors.add(IGNORE_HIDDEN_FILES);
         descriptors.add(SMB_DIALECT);
         descriptors.add(USE_ENCRYPTION);
+        descriptors.add(ENABLE_DFS);
         descriptors.add(TIMEOUT);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
index 468828f819..af2eab2ff1 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
@@ -64,6 +64,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
 import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT;
 import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT;
 import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION;
@@ -194,6 +195,7 @@ public class PutSmbFile extends AbstractProcessor {
         descriptors.add(RENAME_SUFFIX);
         descriptors.add(SMB_DIALECT);
         descriptors.add(USE_ENCRYPTION);
+        descriptors.add(ENABLE_DFS);
         descriptors.add(TIMEOUT);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SmbDfsIT.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SmbDfsIT.java
new file mode 100644
index 0000000000..6fe51b3eb6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SmbDfsIT.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.services.smb.SmbjClientProviderService;
+import org.apache.nifi.smb.common.SmbProperties;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.NO_TRACKING;
+import static 
org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
+import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
+import static org.apache.nifi.util.TestRunners.newTestRunner;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class SmbDfsIT {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(SmbDfsIT.class);
+
+    private static final int DEFAULT_SMB_PORT = 445;
+
+    // DFS works only on the default SMB port (445). Not sure if it is a generic DFS vs Samba DFS constraint, or an issue in the smbj client library.
+    private final GenericContainer<?> sambaContainer = new 
FixedHostPortGenericContainer<>("dperson/samba")
+            .withFixedExposedPort(DEFAULT_SMB_PORT, DEFAULT_SMB_PORT)
+            .waitingFor(Wait.forListeningPort())
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER))
+            .withCommand("-u", "myuser;mypass",
+                    "-s", "share;/share-dir;;no;no;myuser;;;",
+                    "-s", "dfs-share;/dfs-share-dir;;no;no;myuser;;;",
+                    "-p",
+                    "-g", "host msdfs = yes",
+                    "-G", "dfs-share;msdfs root = yes");
+
+    @BeforeEach
+    void beforeEach() throws Exception {
+        sambaContainer.start();
+
+        sambaContainer.execInContainer("ln", "-s", "msdfs:" + 
sambaContainer.getHost() + "\\share", "/dfs-share-dir/dfs-link");
+        Thread.sleep(100);
+    }
+
+    @AfterEach
+    void afterEach() {
+        sambaContainer.stop();
+    }
+
+    @Test
+    void testFetchSmb() throws Exception {
+        writeFile("fetch_file", "fetch_content");
+
+        TestRunner testRunner = newTestRunner(FetchSmb.class);
+        testRunner.setProperty(FetchSmb.REMOTE_FILE, "dfs-link/fetch_file");
+        SmbjClientProviderService smbjClientProviderService = 
configureSmbClient(testRunner);
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        testRunner.assertTransferCount(FetchSmb.REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
testRunner.getFlowFilesForRelationship(FetchSmb.REL_SUCCESS).get(0);
+        assertEquals("fetch_content", flowFile.getContent());
+
+        testRunner.disableControllerService(smbjClientProviderService);
+    }
+
+    @Test
+    void testFetchFileFailsWhenDfsIsDisabled() throws Exception {
+        writeFile("fetch_file", "fetch_content");
+
+        TestRunner testRunner = newTestRunner(FetchSmb.class);
+        testRunner.setProperty(FetchSmb.REMOTE_FILE, "dfs-link/fetch_file");
+        SmbjClientProviderService smbjClientProviderService = 
configureSmbClient(testRunner, false);
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        testRunner.assertTransferCount(FetchSmb.REL_FAILURE, 1);
+        MockFlowFile flowFile = 
testRunner.getFlowFilesForRelationship(FetchSmb.REL_FAILURE).get(0);
+        assertEquals(0, flowFile.getSize());
+
+        testRunner.disableControllerService(smbjClientProviderService);
+    }
+
+    @Test
+    void testListSmbWithDfsLink() throws Exception {
+        testListSmb("dfs-link");
+    }
+
+    @Test
+    @Disabled("Listing folders recursively from the DFS root or a parent directory of the DFS link does not work on Samba due to https://github.com/hierynomus/smbj/issues/717")
+    void testListSmbWithDfsRoot() throws Exception {
+        testListSmb(null);
+    }
+
+    private void testListSmb(String directory) throws Exception {
+        writeFile("list_file", "list_content");
+
+        TestRunner testRunner = newTestRunner(ListSmb.class);
+        if (directory != null) {
+            testRunner.setProperty(ListSmb.DIRECTORY, directory);
+        }
+        testRunner.setProperty(ListSmb.LISTING_STRATEGY, NO_TRACKING);
+        testRunner.setProperty(ListSmb.MINIMUM_AGE, "0 ms");
+        SmbjClientProviderService smbjClientProviderService = 
configureSmbClient(testRunner);
+
+        testRunner.run();
+
+        testRunner.assertTransferCount(ListSmb.REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
testRunner.getFlowFilesForRelationship(ListSmb.REL_SUCCESS).get(0);
+        assertEquals(0, flowFile.getSize());
+        assertEquals("dfs-link", 
flowFile.getAttribute(CoreAttributes.PATH.key()));
+        assertEquals("list_file", 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+
+        testRunner.disableControllerService(smbjClientProviderService);
+    }
+
+    @Test
+    void testPutSmbFile() {
+        TestRunner testRunner = newTestRunner(PutSmbFile.class);
+        testRunner.setProperty(PutSmbFile.HOSTNAME, sambaContainer.getHost());
+        testRunner.setProperty(PutSmbFile.SHARE, "dfs-share");
+        testRunner.setProperty(PutSmbFile.DIRECTORY, "dfs-link");
+        testRunner.setProperty(PutSmbFile.USERNAME, "myuser");
+        testRunner.setProperty(PutSmbFile.PASSWORD, "mypass");
+        testRunner.setProperty(SmbProperties.ENABLE_DFS, "true");
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), "put_file");
+
+        testRunner.enqueue("put_content", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutSmbFile.REL_SUCCESS, 1);
+
+        String fileContent = readFile("put_file");
+        assertEquals("put_content", fileContent);
+    }
+
+    @Test
+    void testGetSmbFile() {
+        writeFile("get_file", "get_content");
+
+        TestRunner testRunner = newTestRunner(GetSmbFile.class);
+        testRunner.setProperty(GetSmbFile.HOSTNAME, sambaContainer.getHost());
+        testRunner.setProperty(GetSmbFile.SHARE, "dfs-share");
+        testRunner.setProperty(GetSmbFile.DIRECTORY, "dfs-link");
+        testRunner.setProperty(GetSmbFile.USERNAME, "myuser");
+        testRunner.setProperty(GetSmbFile.PASSWORD, "mypass");
+        testRunner.setProperty(SmbProperties.ENABLE_DFS, "true");
+
+        testRunner.run();
+
+        testRunner.assertTransferCount(GetSmbFile.REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
testRunner.getFlowFilesForRelationship(GetSmbFile.REL_SUCCESS).get(0);
+        assertEquals("get_content", flowFile.getContent());
+        assertEquals("dfs-link", 
flowFile.getAttribute(CoreAttributes.PATH.key()));
+        assertEquals("get_file", 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+    }
+
+    private SmbjClientProviderService configureSmbClient(TestRunner 
testRunner) throws InitializationException {
+        return configureSmbClient(testRunner, true);
+    }
+
+    private SmbjClientProviderService configureSmbClient(TestRunner 
testRunner, boolean enableDfs) throws InitializationException {
+        SmbjClientProviderService smbjClientProviderService = new 
SmbjClientProviderService();
+
+        testRunner.addControllerService("client-provider", 
smbjClientProviderService);
+
+        testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider");
+
+        testRunner.setProperty(smbjClientProviderService, HOSTNAME, 
sambaContainer.getHost());
+        testRunner.setProperty(smbjClientProviderService, PORT, 
Integer.toString(DEFAULT_SMB_PORT));
+        testRunner.setProperty(smbjClientProviderService, USERNAME, "myuser");
+        testRunner.setProperty(smbjClientProviderService, PASSWORD, "mypass");
+        testRunner.setProperty(smbjClientProviderService, SHARE, "dfs-share");
+        testRunner.setProperty(smbjClientProviderService, ENABLE_DFS, 
Boolean.toString(enableDfs));
+
+        testRunner.enableControllerService(smbjClientProviderService);
+
+        return smbjClientProviderService;
+    }
+
+    private void writeFile(String filename, String content) {
+        String containerPath = "/share-dir/" + filename;
+        sambaContainer.copyFileToContainer(Transferable.of(content), 
containerPath);
+    }
+
+    private String readFile(String filename) {
+        String containerPath = "/share-dir/" + filename;
+        return sambaContainer.copyFileFromContainer(containerPath, is -> 
IOUtils.toString(is, StandardCharsets.UTF_8));
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
index 080be06199..ebffb816ec 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
@@ -39,6 +39,7 @@ import static java.util.Arrays.asList;
 import static 
org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR;
 import static 
org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
 import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR;
+import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
 import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT;
 import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT;
 import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION;
@@ -112,6 +113,7 @@ public class SmbjClientProviderService extends 
AbstractControllerService impleme
                     DOMAIN,
                     SMB_DIALECT,
                     USE_ENCRYPTION,
+                    ENABLE_DFS,
                     TIMEOUT
             ));
 
@@ -122,24 +124,37 @@ public class SmbjClientProviderService extends 
AbstractControllerService impleme
     private String shareName;
 
     @Override
-    public SmbClientService getClient() throws IOException {
-        Connection connection = null;
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
 
-        try {
-            connection = smbClient.connect(hostname, port);
-            return connectToShare(connection);
-        } catch (IOException e) {
-            getLogger().debug("Closing stale connection and trying to create a new one for share " + getServiceLocation());
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.hostname = context.getProperty(HOSTNAME).getValue();
+        this.port = context.getProperty(PORT).asInteger();
+        this.shareName = context.getProperty(SHARE).getValue();
+        this.smbClient = buildSmbClient(context);
+        createAuthenticationContext(context);
+    }
 
-            closeConnection(connection);
-            unregisterHost();
+    @OnDisabled
+    public void onDisabled() {
+        smbClient.close();
+        smbClient = null;
+        hostname = null;
+        port = 0;
+        shareName = null;
+    }
 
-            connection = smbClient.connect(hostname, port);
-            return connectToShare(connection);
-        }
+    @Override
+    public URI getServiceLocation() {
+        return URI.create(String.format("smb://%s:%d/%s", hostname, port, 
shareName));
     }
 
-    private SmbjClientService connectToShare(final Connection connection) 
throws IOException {
+    @Override
+    public SmbClientService getClient() throws IOException {
+        final Connection connection = smbClient.connect(hostname, port);
+
         final Session session;
         final Share share;
 
@@ -164,20 +179,6 @@ public class SmbjClientProviderService extends 
AbstractControllerService impleme
         return new SmbjClientService(session, (DiskShare) share, 
getServiceLocation());
     }
 
-    private void unregisterHost() {
-        smbClient.getServerList().unregister(hostname);
-    }
-
-    private void closeConnection(final Connection connection) {
-        try {
-            if (connection != null) {
-                connection.close(true);
-            }
-        } catch (Exception e) {
-            getLogger().error("Could not close connection to {}", 
getServiceLocation(), e);
-        }
-    }
-
     private void closeSession(final Session session) {
         try {
             if (session != null) {
@@ -188,34 +189,6 @@ public class SmbjClientProviderService extends 
AbstractControllerService impleme
         }
     }
 
-    @Override
-    public URI getServiceLocation() {
-        return URI.create(String.format("smb://%s:%d/%s", hostname, port, 
shareName));
-    }
-
-    @OnEnabled
-    public void onEnabled(final ConfigurationContext context) {
-        this.hostname = context.getProperty(HOSTNAME).getValue();
-        this.port = context.getProperty(PORT).asInteger();
-        this.shareName = context.getProperty(SHARE).getValue();
-        this.smbClient = buildSmbClient(context);
-        createAuthenticationContext(context);
-    }
-
-    @OnDisabled
-    public void onDisabled() {
-        smbClient.close();
-        smbClient = null;
-        hostname = null;
-        port = 0;
-        shareName = null;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return PROPERTIES;
-    }
-
     private void createAuthenticationContext(final ConfigurationContext 
context) {
         if (context.getProperty(USERNAME).isSet()) {
             final String userName = context.getProperty(USERNAME).getValue();
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
similarity index 99%
rename from 
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java
rename to 
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
index a030abc363..59278e54c5 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
@@ -54,7 +54,7 @@ import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.images.builder.Transferable;
 import org.testcontainers.utility.DockerImageName;
 
-public class NiFiSmbjClientIT {
+public class SmbjClientServiceIT {
 
     private final static Logger sambaContainerLogger = 
LoggerFactory.getLogger("sambaContainer");
     private final static Logger toxyProxyLogger = 
LoggerFactory.getLogger("toxiProxy");
@@ -62,7 +62,7 @@ public class NiFiSmbjClientIT {
     private final Network network = Network.newNetwork();
 
     private final GenericContainer<?> sambaContainer = new 
GenericContainer<>(DockerImageName.parse("dperson/samba"))
-            .withExposedPorts(139, 445)
+            .withExposedPorts(445)
             .waitingFor(Wait.forListeningPort())
             .withLogConsumer(new Slf4jLogConsumer(sambaContainerLogger))
             .withNetwork(network)
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
similarity index 92%
rename from 
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java
rename to 
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
index f00b505bea..dba330fbc8 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
@@ -28,7 +28,11 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
-class NiFiSmbjClientTest {
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class SmbjClientServiceTest {
 
     @Mock
     Session session;
@@ -60,4 +64,4 @@ class NiFiSmbjClientTest {
 
     }
 
-}
\ No newline at end of file
+}
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbClient.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbClient.java
new file mode 100644
index 0000000000..92c173c10e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbClient.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.smb.common;
+
+import com.hierynomus.mssmb2.messages.SMB2Echo;
+import com.hierynomus.smbj.SMBClient;
+import com.hierynomus.smbj.SmbConfig;
+import com.hierynomus.smbj.connection.Connection;
+import com.hierynomus.smbj.event.ConnectionClosed;
+import com.hierynomus.smbj.event.SMBEventBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Extends {@link com.hierynomus.smbj.SMBClient} with connection health check.
+ * <br/>
+ * Workaround to https://github.com/hierynomus/smbj/issues/796.
+ * <br/><br/>
+ * Health check method:
+ * <ul>
+ *   <li>get connection from the parent class</li>
+ *   <li>if it is a newly created connection, then return it</li>
+ *   <li>if it is an old connection, send ECHO message to the server
+ *     <ul>
+ *       <li>if ECHO succeeds, return the connection</li>
+ *       <li>if ECHO fails, unregister the connection, get connection again (which creates a new one) and return it</li>
+ *     </ul>
+ *   </li>
+ * </ul>
+ */
+class SmbClient extends SMBClient {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SmbClient.class);
+
+    private SMBEventBus bus;
+
+    private SmbClient(final SmbConfig config, final SMBEventBus bus) {
+        super(config, bus);
+    }
+
+    static SmbClient create(final SmbConfig config) {
+        final SMBEventBus bus = new SMBEventBus();
+
+        final SmbClient client = new SmbClient(config, bus);
+
+        client.bus = bus;
+
+        return client;
+    }
+
+    public Connection connect(final String hostname) throws IOException {
+        return connect(hostname, DEFAULT_PORT);
+    }
+
+    public synchronized Connection connect(final String hostname, final int 
port) throws IOException {
+        final Connection connection = super.connect(hostname, port);
+
+        try {
+            // SMB2 ECHO message can only be sent if this is not a new connection (and health check is only needed in this case)
+            if (!connection.release()) {
+                connection.send(new 
SMB2Echo(connection.getNegotiatedProtocol().getDialect())).get(10, 
TimeUnit.SECONDS);
+            }
+
+            // set lease counter back
+            connection.lease();
+
+            return connection;
+        } catch (Exception e) {
+            LOGGER.info("Stale connection found, unregistering it and creating a new one");
+            bus.publish(new ConnectionClosed(hostname, port));
+        }
+
+        return super.connect(hostname, port);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbProperties.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbProperties.java
index 5b474fcc94..b79d68790f 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbProperties.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbProperties.java
@@ -44,6 +44,15 @@ public class SmbProperties {
             .defaultValue("false")
             .build();
 
+    public static final PropertyDescriptor ENABLE_DFS = new 
PropertyDescriptor.Builder()
+            .name("enable-dfs")
+            .displayName("Enable DFS")
+            .description("Enables accessing Distributed File System (DFS) and following DFS links during SMB operations.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
     public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
             .displayName("Timeout")
             .name("timeout")
diff --git 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbUtils.java
 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbUtils.java
index 0895abfae0..b705c5c38a 100644
--- 
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbUtils.java
+++ 
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbUtils.java
@@ -21,6 +21,7 @@ import com.hierynomus.smbj.SmbConfig;
 import org.apache.nifi.context.PropertyContext;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
 import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT;
 import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT;
 import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION;
@@ -32,7 +33,7 @@ public final class SmbUtils {
     }
 
     public static SMBClient buildSmbClient(final PropertyContext context) {
-        return new SMBClient(buildSmbConfig(context));
+        return SmbClient.create(buildSmbConfig(context));
     }
 
     static SmbConfig buildSmbConfig(final PropertyContext context) {
@@ -50,6 +51,10 @@ public final class SmbUtils {
             
configBuilder.withEncryptData(context.getProperty(USE_ENCRYPTION).asBoolean());
         }
 
+        if (context.getProperty(ENABLE_DFS).isSet()) {
+            
configBuilder.withDfsEnabled(context.getProperty(ENABLE_DFS).asBoolean());
+        }
+
         if (context.getProperty(TIMEOUT).isSet()) {
             
configBuilder.withTimeout(context.getProperty(TIMEOUT).asTimePeriod(MILLISECONDS),
 MILLISECONDS);
         }

Reply via email to