This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 3f160d4b99 DRILL-8407: Add Support for SFTP File Systems (#2770)
3f160d4b99 is described below
commit 3f160d4b9906b0d1056cd20a3c74ba2b0feb1f61
Author: Charles S. Givre <[email protected]>
AuthorDate: Mon Mar 6 08:07:28 2023 -0500
DRILL-8407: Add Support for SFTP File Systems (#2770)
---
.../drill/exec/store/dfs/FileSystemPlugin.java | 38 +++++++++++++++++++++-
1 file changed, 37 insertions(+), 1 deletion(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 0f0f353d1c..3a9260c5ba 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
@@ -61,6 +62,7 @@ import
org.apache.drill.shaded.guava.com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.sftp.SFTPFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,6 +107,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin
{
fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.getConnection());
fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
fsConf.set("fs.dropbox.impl", DropboxFileSystem.class.getName());
+ fsConf.set("fs.sftp.impl", SFTPFileSystem.class.getName());
fsConf.set("fs.box.impl", BoxFileSystem.class.getName());
fsConf.set("fs.drill-local.impl",
LocalSyncableFileSystem.class.getName());
CredentialsProvider credentialsProvider =
config.getCredentialsProvider();
@@ -116,6 +119,8 @@ public class FileSystemPlugin extends AbstractStoragePlugin
{
if (isS3Connection(fsConf)) {
handleS3Credentials(fsConf);
+ } else if (isSFTP(fsConf) && config.getAuthMode() !=
AuthMode.USER_TRANSLATION) {
+ handleSFTPCredentials(credentialsProvider);
} else if (config.oAuthConfig() != null && config.getAuthMode() ==
AuthMode.SHARED_USER) {
initializeOauthTokenTable(null);
}
@@ -177,6 +182,11 @@ public class FileSystemPlugin extends
AbstractStoragePlugin {
return uri.getScheme().equals("s3a");
}
+ private boolean isSFTP(Configuration conf) {
+ URI uri = FileSystem.getDefaultUri(conf);
+ return uri.getScheme().equals("sftp");
+ }
+
/**
* Retrieve secret and access keys from configured (with
* {@link
org.apache.hadoop.security.alias.CredentialProviderFactory#CREDENTIAL_PROVIDER_PATH}
property)
@@ -199,6 +209,27 @@ public class FileSystemPlugin extends
AbstractStoragePlugin {
}
}
+ private void handleSFTPCredentials(CredentialsProvider credentialsProvider) {
+ handleSFTPCredentials(credentialsProvider, null);
+ }
+
+ private void handleSFTPCredentials(CredentialsProvider credentialsProvider,
String username) {
+ String[] credentialKeys = {"fs.s3a.secret.key", "fs.s3a.access.key"};
+ Map<String, String> creds;
+ if (credentialsProvider != null) {
+ // Get credentials from credential provider if present
+ URI uri = FileSystem.getDefaultUri(fsConf);
+ if (StringUtils.isEmpty(username)) {
+ creds = credentialsProvider.getCredentials();
+ } else {
+ // Handle user translation
+ creds = credentialsProvider.getUserCredentials(username);
+ }
+ fsConf.set(SFTPFileSystem.FS_SFTP_USER_PREFIX + uri.getHost(),
creds.get("username"));
+ fsConf.set(SFTPFileSystem.FS_SFTP_PASSWORD_PREFIX + uri.getHost() + "."
+ creds.get("username"), creds.get("password"));
+ }
+ }
+
@VisibleForTesting
public void initializeOauthTokenTable(String username) {
OAuthTokenProvider tokenProvider = context.getOauthTokenProvider();
@@ -285,7 +316,12 @@ public class FileSystemPlugin extends
AbstractStoragePlugin {
// active username in the constructor. Removing it from the constructor
makes
// it difficult to test, so we do the check and leave it in both places.
if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
- initializeOauthTokenTable(schemaConfig.getUserName());
+ // If the file system uses OAuth, populate the OAuth tokens
+ if (config.oAuthConfig() != null) {
+ initializeOauthTokenTable(schemaConfig.getUserName());
+ } else if (isSFTP(fsConf)) {
+ handleSFTPCredentials(config.getCredentialsProvider(),
schemaConfig.getUserName());
+ }
}
schemaFactory.registerSchemas(schemaConfig, parent);
}