This is an automated email from the ASF dual-hosted git repository. cgivre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 5d6571a046 DRILL-8386: Add Support for User Translation for Cassandra (#2738) 5d6571a046 is described below commit 5d6571a046efa9238de2116ec51b07655d5fde2a Author: Charles S. Givre <cgi...@apache.org> AuthorDate: Thu Jan 12 10:28:43 2023 -0500 DRILL-8386: Add Support for User Translation for Cassandra (#2738) --- contrib/storage-cassandra/README.md | 9 +- .../store/cassandra/CassandraStorageConfig.java | 59 +++++++++++- .../store/cassandra/CassandraStoragePlugin.java | 19 ++++ .../schema/CassandraRootDrillSchemaFactory.java | 19 +++- .../exec/store/cassandra/BaseCassandraTest.java | 32 ++++++- .../cassandra/CassandraUserTranslationTest.java | 103 +++++++++++++++++++++ .../exec/store/cassandra/TestCassandraSuite.java | 2 +- contrib/storage-splunk/README.md | 4 + 8 files changed, 234 insertions(+), 13 deletions(-) diff --git a/contrib/storage-cassandra/README.md b/contrib/storage-cassandra/README.md index 07d58abe70..efd2d1c519 100644 --- a/contrib/storage-cassandra/README.md +++ b/contrib/storage-cassandra/README.md @@ -5,7 +5,7 @@ This storage plugin implementation is based on [Apache Calcite adapter for Cassa This storage plugin may be used for querying Scylla DB. -### Supported optimizations and features +## Supported Optimizations and Features This storage plugin supports the following optimizations: @@ -16,7 +16,7 @@ This storage plugin supports the following optimizations: Except for these optimizations, Cassandra storage plugin supports the schema provisioning feature. For more details please refer to [Specifying the Schema as Table Function Parameter](https://drill.apache.org/docs/plugin-configuration-basics/#specifying-the-schema-as-table-function-parameter). -### Plugin registration +## Plugin Registration The plugin can be registered in Apache Drill using the drill web interface by navigating to the `storage` page. Following is the default registration configuration. 
@@ -32,7 +32,10 @@ Following is the default registration configuration. } ``` -### Developer notes +### User Translation +The Cassandra plugin supports user translation, which allows each user to authenticate using their own credentials instead of using system-wide credentials. Simply set the `authMode` parameter to `USER_TRANSLATION` and use either the plain or vault credential provider for credentials. + +## Developer Notes Most of the common classes required for creating storage plugins based on Calcite adapters are placed in the `java-exec` module, so they can be reused in future plugin implementations. diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java index ca1fc1c0cf..dddcbbc3b9 100644 --- a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java +++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStorageConfig.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.PlanStringBuilder; import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.common.logical.security.PlainCredentialsProvider; import org.apache.drill.exec.store.security.CredentialProviderUtils; import org.apache.drill.common.logical.security.CredentialsProvider; import org.apache.drill.exec.store.security.UsernamePasswordCredentials; @@ -35,7 +37,6 @@ import java.util.Optional; @JsonTypeName(CassandraStorageConfig.NAME) public class CassandraStorageConfig extends StoragePluginConfig { public static final String NAME = "cassandra"; - private final String host; private final int port; @@ -45,13 +46,20 @@ public class CassandraStorageConfig extends 
StoragePluginConfig { @JsonProperty("port") int port, @JsonProperty("username") String username, @JsonProperty("password") String password, + @JsonProperty("authMode") String authMode, @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) { super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider), - credentialsProvider == null); + credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER)); this.host = host; this.port = port; } + private CassandraStorageConfig(CassandraStorageConfig that, CredentialsProvider credentialsProvider) { + super(getCredentialsProvider(credentialsProvider), credentialsProvider == null, that.authMode); + this.host = that.host; + this.port = that.port; + } + public String getHost() { return host; } @@ -79,6 +87,37 @@ public class CassandraStorageConfig extends StoragePluginConfig { .orElse(null); } + @JsonIgnore + public Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials(String username) { + return new UsernamePasswordCredentials.Builder() + .setCredentialsProvider(credentialsProvider) + .setQueryUser(username) + .build(); + } + + @JsonIgnore + public Map<String, Object> toConfigMap(String username) { + Optional<UsernamePasswordCredentials> credentials = getUsernamePasswordCredentials(username); + + Map<String, Object> result = new HashMap<>(); + result.put("host", host); + result.put("port", port); + if (credentials.isPresent()) { + result.put("username", credentials.get().getUsername()); + result.put("password", credentials.get().getPassword()); + } + return result; + } + + @Override + public CassandraStorageConfig updateCredentialProvider(CredentialsProvider credentialsProvider) { + return new CassandraStorageConfig(this, credentialsProvider); + } + + private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) { + return credentialsProvider != null ? 
credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER; + } + @JsonIgnore public Map<String, Object> toConfigMap() { Optional<UsernamePasswordCredentials> credentials = getUsernamePasswordCredentials(); @@ -102,12 +141,22 @@ public class CassandraStorageConfig extends StoragePluginConfig { return false; } CassandraStorageConfig that = (CassandraStorageConfig) o; - return Objects.equals(host, that.host) - && Objects.equals(credentialsProvider, that.credentialsProvider); + return Objects.equals(host, that.host) && + Objects.equals(port, that.port) && + Objects.equals(credentialsProvider, that.credentialsProvider); } @Override public int hashCode() { - return Objects.hash(host, credentialsProvider); + return Objects.hash(host, port, credentialsProvider); + } + + @Override + public String toString() { + return new PlanStringBuilder(this) + .field("host", host) + .field("port", port) + .field("credentialsProvider", credentialsProvider) + .toString(); } } diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java index d5f1a8a48c..b55f956d3c 100644 --- a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java +++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.cassandra; import org.apache.calcite.adapter.cassandra.CalciteUtils; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.schema.SchemaPlus; +import org.apache.drill.common.logical.StoragePluginConfig.AuthMode; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.planner.PlannerPhase; import org.apache.drill.exec.server.DrillbitContext; @@ -27,13 +28,18 @@ import org.apache.drill.exec.store.AbstractStoragePlugin; import 
org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.SchemaFactory; import org.apache.drill.exec.store.cassandra.schema.CassandraRootDrillSchemaFactory; +import org.apache.drill.exec.store.security.UsernamePasswordCredentials; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Optional; import java.util.Set; public class CassandraStoragePlugin extends AbstractStoragePlugin { + private static final Logger logger = LoggerFactory.getLogger(CassandraStoragePlugin.class); private final CassandraStorageConfig config; private final SchemaFactory schemaFactory; @@ -46,6 +52,19 @@ public class CassandraStoragePlugin extends AbstractStoragePlugin { @Override public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { + // Check to see if user translation is enabled. If so, and creds are + // not present, then do not register any schemata. This prevents + // info schema errors. + if (config.getAuthMode() == AuthMode.USER_TRANSLATION) { + Optional<UsernamePasswordCredentials> userCreds = config.getUsernamePasswordCredentials(schemaConfig.getUserName()); + if (! 
userCreds.isPresent()) { + logger.debug( + "No schemas will be registered in {} for query user {}.", + getName(), schemaConfig.getUserName() + ); + return; + } + } schemaFactory.registerSchemas(schemaConfig, parent); } diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java index edcf72bf88..26bf180a0d 100644 --- a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java +++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/schema/CassandraRootDrillSchemaFactory.java @@ -21,12 +21,17 @@ import org.apache.calcite.adapter.cassandra.CassandraSchemaFactory; import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaFactory; import org.apache.calcite.schema.SchemaPlus; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.logical.StoragePluginConfig.AuthMode; import org.apache.drill.exec.store.AbstractSchemaFactory; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.cassandra.CassandraStoragePlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CassandraRootDrillSchemaFactory extends AbstractSchemaFactory { + private static final Logger logger = LoggerFactory.getLogger(CassandraRootDrillSchemaFactory.class); private final CassandraStoragePlugin plugin; private final SchemaFactory calciteSchemaFactory; @@ -38,8 +43,18 @@ public class CassandraRootDrillSchemaFactory extends AbstractSchemaFactory { @Override public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) { - Schema schema = new CassandraRootDrillSchema(getName(), plugin, - calciteSchemaFactory, parent, getName(), plugin.getConfig().toConfigMap()); + Schema schema; + if 
(plugin.getConfig().getAuthMode() == AuthMode.SHARED_USER) { + schema = new CassandraRootDrillSchema(getName(), plugin, + calciteSchemaFactory, parent, getName(), plugin.getConfig().toConfigMap()); + } else if (plugin.getConfig().getAuthMode() == AuthMode.USER_TRANSLATION) { + schema = new CassandraRootDrillSchema(getName(), plugin, + calciteSchemaFactory, parent, getName(), plugin.getConfig().toConfigMap(schemaConfig.getUserName())); + } else { + throw UserException.internalError() + .message("Cassandra only supports SHARED_USER and USER_TRANSLATION authentication.") + .build(logger); + } parent.add(getName(), schema); } } diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java index faae8470bc..0fdfdc7620 100644 --- a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java +++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/BaseCassandraTest.java @@ -17,12 +17,20 @@ */ package org.apache.drill.exec.store.cassandra; -import org.apache.drill.test.ClusterFixture; +import org.apache.drill.common.logical.StoragePluginConfig.AuthMode; +import org.apache.drill.common.logical.security.PlainCredentialsProvider; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.test.ClusterFixtureBuilder; import org.apache.drill.test.ClusterTest; import org.junit.AfterClass; import org.junit.BeforeClass; import org.testcontainers.containers.CassandraContainer; +import java.util.HashMap; + +import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1; +import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2; + public class BaseCassandraTest extends ClusterTest { @BeforeClass @@ -32,16 +40,36 @@ public class BaseCassandraTest extends ClusterTest { } private 
static void initCassandraPlugin(CassandraContainer<?> cassandra) throws Exception { - startCluster(ClusterFixture.builder(dirTestWatcher)); + ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher) + .configProperty(ExecConstants.HTTP_ENABLE, true) + .configProperty(ExecConstants.HTTP_PORT_HUNT, true) + .configProperty(ExecConstants.IMPERSONATION_ENABLED, true); + startCluster(builder); CassandraStorageConfig config = new CassandraStorageConfig( cassandra.getHost(), cassandra.getMappedPort(CassandraContainer.CQL_PORT), cassandra.getUsername(), cassandra.getPassword(), + AuthMode.SHARED_USER.name(), null); config.setEnabled(true); cluster.defineStoragePlugin("cassandra", config); + + PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(new HashMap<>()); + // Add authorized user + credentialsProvider.setUserCredentials(cassandra.getUsername(), cassandra.getPassword(), TEST_USER_1); + // Add unauthorized user + credentialsProvider.setUserCredentials("nope", "no way dude", TEST_USER_2); + + CassandraStorageConfig ut_config = new CassandraStorageConfig( + cassandra.getHost(), + cassandra.getMappedPort(CassandraContainer.CQL_PORT), + null, null, + AuthMode.USER_TRANSLATION.name(), + credentialsProvider); + ut_config.setEnabled(true); + cluster.defineStoragePlugin("ut_cassandra", ut_config); } @AfterClass diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraUserTranslationTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraUserTranslationTest.java new file mode 100644 index 0000000000..f52fa54d87 --- /dev/null +++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/CassandraUserTranslationTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.cassandra; + +import org.apache.drill.categories.SlowTest; +import org.apache.drill.common.config.DrillProperties; +import org.apache.drill.common.exceptions.UserRemoteException; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.test.ClientFixture; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER; +import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER_PASSWORD; +import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1; +import static org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1_PASSWORD; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +@Category({SlowTest.class}) +public class CassandraUserTranslationTest extends BaseCassandraTest { + @Test + public void testInfoSchemaQueryWithMissingCredentials() throws Exception { + // This test validates that the correct credentials are sent down to Cassandra. 
+ // This user should not see the ut_cassandra because they do not have valid credentials. + ClientFixture client = cluster + .clientBuilder() + .property(DrillProperties.USER, ADMIN_USER) + .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD) + .build(); + + String sql = "SHOW DATABASES WHERE schema_name LIKE '%cassandra%'"; + + RowSet results = client.queryBuilder().sql(sql).rowSet(); + assertEquals(1, results.rowCount()); + results.clear(); + } + + @Test + public void testInfoSchemaQueryWithValidCredentials() throws Exception { + // This test validates that the cassandra connection with user translation appears when the user is + // authenticated. + ClientFixture client = cluster + .clientBuilder() + .property(DrillProperties.USER, TEST_USER_1) + .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD) + .build(); + + String sql = "SHOW DATABASES WHERE schema_name LIKE '%cassandra%'"; + + RowSet results = client.queryBuilder().sql(sql).rowSet(); + assertEquals(2, results.rowCount()); + results.clear(); + } + + @Test + public void testSplunkQueryWithUserTranslation() throws Exception { + ClientFixture client = cluster + .clientBuilder() + .property(DrillProperties.USER, TEST_USER_1) + .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD) + .build(); + + String sql = "select * from ut_cassandra.test_keyspace.`employee` order by employee_id"; + RowSet results = client.queryBuilder().sql(sql).rowSet(); + assertEquals(10, results.rowCount()); + results.clear(); + } + + @Test + public void testSplunkQueryWithUserTranslationAndInvalidCredentials() throws Exception { + ClientFixture client = cluster + .clientBuilder() + .property(DrillProperties.USER, ADMIN_USER) + .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD) + .build(); + + String sql = "select * from ut_cassandra.test_keyspace.`employee` order by employee_id"; + try { + client.queryBuilder().sql(sql).rowSet(); + fail(); + } catch (UserRemoteException e) { + 
assertTrue(e.getMessage().contains("Schema [[ut_cassandra, test_keyspace]] is not valid")); + } + } +} diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java index 98ead70cec..1adc023b80 100644 --- a/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java +++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/exec/store/cassandra/TestCassandraSuite.java @@ -31,7 +31,7 @@ import org.testcontainers.containers.CassandraContainer; @Category(SlowTest.class) @RunWith(Suite.class) -@Suite.SuiteClasses({CassandraComplexTypesTest.class, CassandraPlanTest.class, CassandraQueryTest.class}) +@Suite.SuiteClasses({CassandraComplexTypesTest.class, CassandraPlanTest.class, CassandraQueryTest.class, CassandraUserTranslationTest.class}) public class TestCassandraSuite extends BaseTest { protected static CassandraContainer<?> cassandra; diff --git a/contrib/storage-splunk/README.md b/contrib/storage-splunk/README.md index 0fe032aa01..29e1b42673 100644 --- a/contrib/storage-splunk/README.md +++ b/contrib/storage-splunk/README.md @@ -42,6 +42,10 @@ Sometimes Splunk has issue in connection to it: https://github.com/splunk/splunk-sdk-java/issues/62 <br> To bypass it by Drill please specify "reconnectRetries": 3. It allows you to retry the connection several times. +### User Translation +The Splunk plugin supports user translation. Simply set the `authMode` parameter to `USER_TRANSLATION` and use either the plain or vault credential provider for credentials. + + ## Understanding Splunk's Data Model Splunk's primary use case is analyzing event logs with a timestamp. As such, data is indexed by the timestamp, with the most recent data being indexed first. By default, Splunk will sort the data in reverse chronological order. 
Large Splunk installations will put older data into buckets of hot, warm and cold storage with the "cold" storage on the