This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d6571a046 DRILL-8386: Add Support for User Translation for Cassandra 
(#2738)
5d6571a046 is described below

commit 5d6571a046efa9238de2116ec51b07655d5fde2a
Author: Charles S. Givre <cgi...@apache.org>
AuthorDate: Thu Jan 12 10:28:43 2023 -0500

    DRILL-8386: Add Support for User Translation for Cassandra (#2738)
---
 contrib/storage-cassandra/README.md                |   9 +-
 .../store/cassandra/CassandraStorageConfig.java    |  59 +++++++++++-
 .../store/cassandra/CassandraStoragePlugin.java    |  19 ++++
 .../schema/CassandraRootDrillSchemaFactory.java    |  19 +++-
 .../exec/store/cassandra/BaseCassandraTest.java    |  32 ++++++-
 .../cassandra/CassandraUserTranslationTest.java    | 103 +++++++++++++++++++++
 .../exec/store/cassandra/TestCassandraSuite.java   |   2 +-
 contrib/storage-splunk/README.md                   |   4 +
 8 files changed, 234 insertions(+), 13 deletions(-)

diff --git a/contrib/storage-cassandra/README.md 
b/contrib/storage-cassandra/README.md
index 07d58abe70..efd2d1c519 100644
--- a/contrib/storage-cassandra/README.md
+++ b/contrib/storage-cassandra/README.md
@@ -5,7 +5,7 @@ This storage plugin implementation is based on [Apache Calcite 
adapter for Cassa
 
 This storage plugin may be used for querying Scylla DB.
 
-### Supported optimizations and features
+## Supported Optimizations and Features
 
 This storage plugin supports the following optimizations:
 
@@ -16,7 +16,7 @@ This storage plugin supports the following optimizations:
 Except for these optimizations, Cassandra storage plugin supports the schema 
provisioning feature.
 For more details please refer to [Specifying the Schema as Table Function 
Parameter](https://drill.apache.org/docs/plugin-configuration-basics/#specifying-the-schema-as-table-function-parameter).
 
-### Plugin registration
+## Plugin Registration
 
 The plugin can be registered in Apache Drill using the drill web interface by 
navigating to the `storage` page.
 Following is the default registration configuration.
@@ -32,7 +32,10 @@ Following is the default registration configuration.
 }
 ```
 
-### Developer notes
+### User Translation
+The Cassandra plugin supports user translation, which allows each user to 
authenticate using their own credentials instead of using system-wide 
credentials.  Simply set the `authMode` parameter to `USER_TRANSLATION` and use 
either the plain or vault credential provider for credentials.
+
+## Developer Notes
 
 Most of the common classes required for creating storage plugins based on 
Calcite adapters are placed in the 
 `java-exec` module, so they can be reused in future plugin implementations.
diff --git 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
index ca1fc1c0cf..dddcbbc3b9 100644
--- 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
+++ 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java
@@ -22,7 +22,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.exec.store.security.CredentialProviderUtils;
 import org.apache.drill.common.logical.security.CredentialsProvider;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -35,7 +37,6 @@ import java.util.Optional;
 @JsonTypeName(CassandraStorageConfig.NAME)
 public class CassandraStorageConfig extends StoragePluginConfig {
   public static final String NAME = "cassandra";
-
   private final String host;
   private final int port;
 
@@ -45,13 +46,20 @@ public class CassandraStorageConfig extends 
StoragePluginConfig {
       @JsonProperty("port") int port,
       @JsonProperty("username") String username,
       @JsonProperty("password") String password,
+      @JsonProperty("authMode") String authMode,
       @JsonProperty("credentialsProvider") CredentialsProvider 
credentialsProvider) {
     super(CredentialProviderUtils.getCredentialsProvider(username, password, 
credentialsProvider),
-        credentialsProvider == null);
+        credentialsProvider == null, AuthMode.parseOrDefault(authMode, 
AuthMode.SHARED_USER));
     this.host = host;
     this.port = port;
   }
 
+  private CassandraStorageConfig(CassandraStorageConfig that, 
CredentialsProvider credentialsProvider) {
+    super(getCredentialsProvider(credentialsProvider), credentialsProvider == 
null, that.authMode);
+    this.host = that.host;
+    this.port = that.port;
+  }
+
   public String getHost() {
     return host;
   }
@@ -79,6 +87,37 @@ public class CassandraStorageConfig extends 
StoragePluginConfig {
       .orElse(null);
   }
 
+  @JsonIgnore
+  public Optional<UsernamePasswordCredentials> 
getUsernamePasswordCredentials(String username) {
+    return new UsernamePasswordCredentials.Builder()
+        .setCredentialsProvider(credentialsProvider)
+        .setQueryUser(username)
+        .build();
+  }
+
+  @JsonIgnore
+  public Map<String, Object> toConfigMap(String username) {
+    Optional<UsernamePasswordCredentials> credentials = 
getUsernamePasswordCredentials(username);
+
+    Map<String, Object> result = new HashMap<>();
+    result.put("host", host);
+    result.put("port", port);
+    if (credentials.isPresent()) {
+      result.put("username", credentials.get().getUsername());
+      result.put("password", credentials.get().getPassword());
+    }
+    return result;
+  }
+
+  @Override
+  public CassandraStorageConfig updateCredentialProvider(CredentialsProvider 
credentialsProvider) {
+    return new CassandraStorageConfig(this, credentialsProvider);
+  }
+
+  private static CredentialsProvider 
getCredentialsProvider(CredentialsProvider credentialsProvider) {
+    return credentialsProvider != null ? credentialsProvider : 
PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
+  }
+
   @JsonIgnore
   public Map<String, Object> toConfigMap() {
     Optional<UsernamePasswordCredentials> credentials = 
getUsernamePasswordCredentials();
@@ -102,12 +141,22 @@ public class CassandraStorageConfig extends 
StoragePluginConfig {
       return false;
     }
     CassandraStorageConfig that = (CassandraStorageConfig) o;
-    return Objects.equals(host, that.host)
-        && Objects.equals(credentialsProvider, that.credentialsProvider);
+    return Objects.equals(host, that.host) &&
+        Objects.equals(port, that.port) &&
+        Objects.equals(credentialsProvider, that.credentialsProvider);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(host, credentialsProvider);
+    return Objects.hash(host, port, credentialsProvider);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+        .field("host", host)
+        .field("port", port)
+        .field("credentialsProvider", credentialsProvider)
+        .toString();
   }
 }
diff --git 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java
 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java
index d5f1a8a48c..b55f956d3c 100644
--- 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java
+++ 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.cassandra;
 import org.apache.calcite.adapter.cassandra.CalciteUtils;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -27,13 +28,18 @@ import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SchemaFactory;
 import 
org.apache.drill.exec.store.cassandra.schema.CassandraRootDrillSchemaFactory;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.Set;
 
 public class CassandraStoragePlugin extends AbstractStoragePlugin {
 
+  private static final Logger logger = 
LoggerFactory.getLogger(CassandraStoragePlugin.class);
   private final CassandraStorageConfig config;
   private final SchemaFactory schemaFactory;
 
@@ -46,6 +52,19 @@ public class CassandraStoragePlugin extends 
AbstractStoragePlugin {
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) 
throws IOException {
+    // Check to see if user translation is enabled.  If so, and creds are
+    // not present, then do not register any schemata.  This prevents
+    // info schema errors.
+    if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+      Optional<UsernamePasswordCredentials> userCreds = 
config.getUsernamePasswordCredentials(schemaConfig.getUserName());
+      if (! userCreds.isPresent()) {
+        logger.debug(
+            "No schemas will be registered in {} for query user {}.",
+            getName(), schemaConfig.getUserName()
+        );
+        return;
+      }
+    }
     schemaFactory.registerSchemas(schemaConfig, parent);
   }
 
diff --git 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java
 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java
index edcf72bf88..26bf180a0d 100644
--- 
a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java
+++ 
b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java
@@ -21,12 +21,17 @@ import 
org.apache.calcite.adapter.cassandra.CassandraSchemaFactory;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaFactory;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.cassandra.CassandraStoragePlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CassandraRootDrillSchemaFactory extends AbstractSchemaFactory {
 
+  private static final Logger logger = 
LoggerFactory.getLogger(CassandraRootDrillSchemaFactory.class);
   private final CassandraStoragePlugin plugin;
   private final SchemaFactory calciteSchemaFactory;
 
@@ -38,8 +43,18 @@ public class CassandraRootDrillSchemaFactory extends 
AbstractSchemaFactory {
 
   @Override
   public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
-    Schema schema = new CassandraRootDrillSchema(getName(), plugin,
-        calciteSchemaFactory, parent, getName(), 
plugin.getConfig().toConfigMap());
+    Schema schema;
+    if (plugin.getConfig().getAuthMode() == AuthMode.SHARED_USER) {
+      schema = new CassandraRootDrillSchema(getName(), plugin,
+          calciteSchemaFactory, parent, getName(), 
plugin.getConfig().toConfigMap());
+    } else if (plugin.getConfig().getAuthMode() == AuthMode.USER_TRANSLATION) {
+      schema = new CassandraRootDrillSchema(getName(), plugin,
+          calciteSchemaFactory, parent, getName(), 
plugin.getConfig().toConfigMap(schemaConfig.getUserName()));
+    } else {
+      throw UserException.internalError()
+          .message("Cassandra only supports SHARED_USER and USER_TRANSLATION 
authentication.")
+          .build(logger);
+    }
     parent.add(getName(), schema);
   }
 }
diff --git 
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
index faae8470bc..0fdfdc7620 100644
--- 
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
+++ 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java
@@ -17,12 +17,20 @@
  */
 package org.apache.drill.exec.store.cassandra;
 
-import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.testcontainers.containers.CassandraContainer;
 
+import java.util.HashMap;
+
+import static 
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static 
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2;
+
 public class BaseCassandraTest extends ClusterTest {
 
   @BeforeClass
@@ -32,16 +40,36 @@ public class BaseCassandraTest extends ClusterTest {
   }
 
   private static void initCassandraPlugin(CassandraContainer<?> cassandra) 
throws Exception {
-    startCluster(ClusterFixture.builder(dirTestWatcher));
+    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
+        .configProperty(ExecConstants.HTTP_ENABLE, true)
+        .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
+        .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);
+    startCluster(builder);
 
     CassandraStorageConfig config = new CassandraStorageConfig(
         cassandra.getHost(),
         cassandra.getMappedPort(CassandraContainer.CQL_PORT),
         cassandra.getUsername(),
         cassandra.getPassword(),
+        AuthMode.SHARED_USER.name(),
         null);
     config.setEnabled(true);
     cluster.defineStoragePlugin("cassandra", config);
+
+    PlainCredentialsProvider credentialsProvider = new 
PlainCredentialsProvider(new HashMap<>());
+    // Add authorized user
+    credentialsProvider.setUserCredentials(cassandra.getUsername(), 
cassandra.getPassword(), TEST_USER_1);
+    // Add unauthorized user
+    credentialsProvider.setUserCredentials("nope", "no way dude", TEST_USER_2);
+
+    CassandraStorageConfig ut_config = new CassandraStorageConfig(
+        cassandra.getHost(),
+        cassandra.getMappedPort(CassandraContainer.CQL_PORT),
+        null, null,
+        AuthMode.USER_TRANSLATION.name(),
+        credentialsProvider);
+    ut_config.setEnabled(true);
+    cluster.defineStoragePlugin("ut_cassandra", ut_config);
   }
 
   @AfterClass
diff --git 
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraUserTranslationTest.java
 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraUserTranslationTest.java
new file mode 100644
index 0000000000..f52fa54d87
--- /dev/null
+++ 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraUserTranslationTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.cassandra;
+
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.test.ClientFixture;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static 
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER;
+import static 
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER_PASSWORD;
+import static 
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static 
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1_PASSWORD;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Category({SlowTest.class})
+public class CassandraUserTranslationTest extends BaseCassandraTest {
+  @Test
+  public void testInfoSchemaQueryWithMissingCredentials() throws Exception {
+    // This test validates that the correct credentials are sent down to 
Cassandra.
+    // This user should not see the ut_cassandra because they do not have 
valid credentials.
+    ClientFixture client = cluster
+        .clientBuilder()
+        .property(DrillProperties.USER, ADMIN_USER)
+        .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD)
+        .build();
+
+    String sql = "SHOW DATABASES WHERE schema_name LIKE '%cassandra%'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(1, results.rowCount());
+    results.clear();
+  }
+
+  @Test
+  public void testInfoSchemaQueryWithValidCredentials() throws Exception {
+    // This test validates that the cassandra connection with user translation 
appears when the user is
+    // authenticated.
+    ClientFixture client = cluster
+        .clientBuilder()
+        .property(DrillProperties.USER, TEST_USER_1)
+        .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+        .build();
+
+    String sql = "SHOW DATABASES WHERE schema_name LIKE '%cassandra%'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(2, results.rowCount());
+    results.clear();
+  }
+
+  @Test
+  public void testSplunkQueryWithUserTranslation() throws Exception {
+    ClientFixture client = cluster
+        .clientBuilder()
+        .property(DrillProperties.USER, TEST_USER_1)
+        .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+        .build();
+
+    String sql = "select * from ut_cassandra.test_keyspace.`employee` order by 
employee_id";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(10, results.rowCount());
+    results.clear();
+  }
+
+  @Test
+  public void testSplunkQueryWithUserTranslationAndInvalidCredentials() throws 
Exception {
+    ClientFixture client = cluster
+        .clientBuilder()
+        .property(DrillProperties.USER, ADMIN_USER)
+        .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD)
+        .build();
+
+    String sql = "select * from ut_cassandra.test_keyspace.`employee` order by 
employee_id";
+    try {
+      client.queryBuilder().sql(sql).rowSet();
+      fail();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains("Schema [[ut_cassandra, 
test_keyspace]] is not valid"));
+    }
+  }
+}
diff --git 
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java
 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java
index 98ead70cec..1adc023b80 100644
--- 
a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java
+++ 
b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java
@@ -31,7 +31,7 @@ import org.testcontainers.containers.CassandraContainer;
 
 @Category(SlowTest.class)
 @RunWith(Suite.class)
-@Suite.SuiteClasses({CassandraComplexTypesTest.class, CassandraPlanTest.class, 
CassandraQueryTest.class})
+@Suite.SuiteClasses({CassandraComplexTypesTest.class, CassandraPlanTest.class, 
CassandraQueryTest.class, CassandraUserTranslationTest.class})
 public class TestCassandraSuite extends BaseTest {
 
   protected static CassandraContainer<?> cassandra;
diff --git a/contrib/storage-splunk/README.md b/contrib/storage-splunk/README.md
index 0fe032aa01..29e1b42673 100644
--- a/contrib/storage-splunk/README.md
+++ b/contrib/storage-splunk/README.md
@@ -42,6 +42,10 @@ Sometimes Splunk has issue in connection to it:
 https://github.com/splunk/splunk-sdk-java/issues/62 <br>
 To bypass it by Drill please specify "reconnectRetries": 3. It allows you to 
retry the connection several times.
 
+### User Translation
+The Splunk plugin supports user translation.  Simply set the `authMode` 
parameter to `USER_TRANSLATION` and use either the plain or vault credential 
provider for credentials.
+
+
 ## Understanding Splunk's Data Model
 Splunk's primary use case is analyzing event logs with a timestamp. As such, 
data is indexed by the timestamp, with the most recent data being indexed 
first.  By default, Splunk
  will sort the data in reverse chronological order.  Large Splunk 
installations will put older data into buckets of hot, warm and cold storage 
with the "cold" storage on the

Reply via email to