This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f160d4b99 DRILL-8407: Add Support for SFTP File Systems (#2770)
3f160d4b99 is described below

commit 3f160d4b9906b0d1056cd20a3c74ba2b0feb1f61
Author: Charles S. Givre <[email protected]>
AuthorDate: Mon Mar 6 08:07:28 2023 -0500

    DRILL-8407: Add Support for SFTP File Systems (#2770)
---
 .../drill/exec/store/dfs/FileSystemPlugin.java     | 38 +++++++++++++++++++++-
 1 file changed, 37 insertions(+), 1 deletion(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 0f0f353d1c..3a9260c5ba 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
@@ -61,6 +62,7 @@ import 
org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.sftp.SFTPFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,6 +107,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin 
{
       fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.getConnection());
       fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
       fsConf.set("fs.dropbox.impl", DropboxFileSystem.class.getName());
+      fsConf.set("fs.sftp.impl", SFTPFileSystem.class.getName());
       fsConf.set("fs.box.impl", BoxFileSystem.class.getName());
       fsConf.set("fs.drill-local.impl", 
LocalSyncableFileSystem.class.getName());
       CredentialsProvider credentialsProvider = 
config.getCredentialsProvider();
@@ -116,6 +119,8 @@ public class FileSystemPlugin extends AbstractStoragePlugin 
{
 
       if (isS3Connection(fsConf)) {
         handleS3Credentials(fsConf);
+      } else if (isSFTP(fsConf) && config.getAuthMode() != 
AuthMode.USER_TRANSLATION) {
+        handleSFTPCredentials(credentialsProvider);
       } else if (config.oAuthConfig() != null && config.getAuthMode() == 
AuthMode.SHARED_USER) {
         initializeOauthTokenTable(null);
       }
@@ -177,6 +182,11 @@ public class FileSystemPlugin extends 
AbstractStoragePlugin {
     return uri.getScheme().equals("s3a");
   }
 
+  private boolean isSFTP(Configuration conf) {
+    URI uri = FileSystem.getDefaultUri(conf);
+    return uri.getScheme().equals("sftp");
+  }
+
   /**
    * Retrieve secret and access keys from configured (with
    * {@link 
org.apache.hadoop.security.alias.CredentialProviderFactory#CREDENTIAL_PROVIDER_PATH}
 property)
@@ -199,6 +209,27 @@ public class FileSystemPlugin extends 
AbstractStoragePlugin {
     }
   }
 
+  private void handleSFTPCredentials(CredentialsProvider credentialsProvider) {
+    handleSFTPCredentials(credentialsProvider, null);
+  }
+
+  private void handleSFTPCredentials(CredentialsProvider credentialsProvider, 
String username) {
+    String[] credentialKeys = {"fs.s3a.secret.key", "fs.s3a.access.key"};
+    Map<String, String> creds;
+    if (credentialsProvider != null) {
+      // Get credentials from credential provider if present
+      URI uri = FileSystem.getDefaultUri(fsConf);
+      if (StringUtils.isEmpty(username)) {
+        creds = credentialsProvider.getCredentials();
+      } else {
+        // Handle user translation
+        creds = credentialsProvider.getUserCredentials(username);
+      }
+      fsConf.set(SFTPFileSystem.FS_SFTP_USER_PREFIX + uri.getHost(), 
creds.get("username"));
+      fsConf.set(SFTPFileSystem.FS_SFTP_PASSWORD_PREFIX + uri.getHost() + "." 
+ creds.get("username"), creds.get("password"));
+    }
+  }
+
   @VisibleForTesting
   public void initializeOauthTokenTable(String username) {
     OAuthTokenProvider tokenProvider = context.getOauthTokenProvider();
@@ -285,7 +316,12 @@ public class FileSystemPlugin extends 
AbstractStoragePlugin {
     // active username in the constructor.  Removing it from the constructor 
makes
     // it difficult to test, so we do the check and leave it in both places.
     if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
-      initializeOauthTokenTable(schemaConfig.getUserName());
+      // If the file system uses OAuth, populate the OAuth tokens
+      if (config.oAuthConfig() != null) {
+        initializeOauthTokenTable(schemaConfig.getUserName());
+      } else if (isSFTP(fsConf)) {
+        handleSFTPCredentials(config.getCredentialsProvider(), 
schemaConfig.getUserName());
+      }
     }
     schemaFactory.registerSchemas(schemaConfig, parent);
   }

Reply via email to