mikewalch closed pull request #361: ACCUMULO-4784 - Create builder for Connector URL: https://github.com/apache/accumulo/pull/361
This is a PR merged from a forked repository. As GitHub hides the original diff of a foreign pull request (one from a fork) on merge, the diff is displayed below for the sake of provenance: diff --git a/assemble/conf/client.conf b/assemble/conf/client.conf deleted file mode 100644 index 5256b13d12..0000000000 --- a/assemble/conf/client.conf +++ /dev/null @@ -1,20 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# instance.zookeeper.host=localhost:2181 -# instance.rpc.ssl.enabled=false - -# instance.rcp.sasl.enabled=false -# rpc.sasl.qop=auth diff --git a/assemble/pom.xml b/assemble/pom.xml index 4167de3d83..9a9d94dc2d 100644 --- a/assemble/pom.xml +++ b/assemble/pom.xml @@ -384,6 +384,21 @@ <executable>${basedir}/src/main/scripts/generate-versions-listing.sh</executable> </configuration> </execution> + <execution> + <id>client-props-file</id> + <goals> + <goal>java</goal> + </goals> + <phase>prepare-package</phase> + <configuration> + <mainClass>org.apache.accumulo.core.conf.ClientConfigGenerate</mainClass> + <classpathScope>test</classpathScope> + <arguments> + <argument>--generate-config</argument> + <argument>${project.build.directory}/accumulo-client.properties</argument> + </arguments> + </configuration> + </execution> </executions> </plugin> <plugin> diff --git a/assemble/src/main/assemblies/component.xml b/assemble/src/main/assemblies/component.xml index 405b7b3449..98e3b9c583 100644 --- a/assemble/src/main/assemblies/component.xml +++ b/assemble/src/main/assemblies/component.xml @@ -117,6 +117,14 @@ <directoryMode>0755</directoryMode> <fileMode>0644</fileMode> </fileSet> + <fileSet> + <directory>target</directory> + <outputDirectory>conf</outputDirectory> + <fileMode>0644</fileMode> + <includes> + <include>accumulo-client.properties</include> + </includes> + </fileSet> <!-- Lift generated thrift proxy code into its own directory --> <fileSet> <directory>../proxy/target</directory> diff --git a/core/pom.xml b/core/pom.xml index 16fa16f3b9..fe89b6b5c0 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -291,6 +291,21 @@ </arguments> </configuration> </execution> + <execution> + <id>client-props-markdown</id> + <goals> + <goal>java</goal> + </goals> + <phase>package</phase> + <configuration> + <mainClass>org.apache.accumulo.core.conf.ClientConfigGenerate</mainClass> + <classpathScope>test</classpathScope> + <arguments> + <argument>--generate-markdown</argument> + 
<argument>${project.build.directory}/generated-docs/client-properties.md</argument> + </arguments> + </configuration> + </execution> </executions> </plugin> </plugins> diff --git a/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java b/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java index 521e0ce776..3da6459c27 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java +++ b/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java @@ -50,6 +50,7 @@ private Integer maxWriteThreads = null; private Durability durability = Durability.DEFAULT; + private boolean isDurabilitySet = false; /** * Sets the maximum memory to batch before writing. The smaller this value, the more frequently the {@link BatchWriter} will write.<br> @@ -190,6 +191,7 @@ public Durability getDurability() { */ public BatchWriterConfig setDurability(Durability durability) { this.durability = durability; + isDurabilitySet = true; return this; } @@ -309,6 +311,34 @@ public boolean equals(Object o) { return false; } + private static <T> T merge(T o1, T o2) { + if (o1 != null) + return o1; + return o2; + } + + /** + * Merge this BatchWriterConfig with another. If config is set in both, preference will be given to this config. 
+ * + * @param other + * Another BatchWriterConfig + * @return Merged BatchWriterConfig + * @since 2.0.0 + */ + public BatchWriterConfig merge(BatchWriterConfig other) { + BatchWriterConfig result = new BatchWriterConfig(); + result.maxMemory = merge(this.maxMemory, other.maxMemory); + result.maxLatency = merge(this.maxLatency, other.maxLatency); + result.timeout = merge(this.timeout, other.timeout); + result.maxWriteThreads = merge(this.maxWriteThreads, other.maxWriteThreads); + if (this.isDurabilitySet) { + result.durability = this.durability; + } else if (other.isDurabilitySet) { + result.durability = other.durability; + } + return result; + } + @Override public int hashCode() { HashCodeBuilder hcb = new HashCodeBuilder(); diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java index 6f0274ca07..a269229659 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java @@ -44,6 +44,7 @@ * Contains a list of property keys recognized by the Accumulo client and convenience methods for setting them. * * @since 1.6.0 + * @deprecated since 2.0.0, replaced {@link Connector#builder()} */ public class ClientConfiguration { private static final Logger log = LoggerFactory.getLogger(ClientConfiguration.class); diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConnectionInfo.java b/core/src/main/java/org/apache/accumulo/core/client/ConnectionInfo.java new file mode 100644 index 0000000000..9a43073a67 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/ConnectionInfo.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client; + +import java.util.Properties; + +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; + +/** + * Accumulo client connection information. Can be built using {@link Connector#builder()} + * + * @since 2.0.0 + */ +public interface ConnectionInfo { + + /** + * @return Accumulo instance name + */ + String getInstanceName(); + + /** + * @return Zookeeper connection information for Accumulo instance + */ + String getZookeepers(); + + /** + * @return Accumulo principal/username + */ + String getPrincipal(); + + /** + * @return {@link AuthenticationToken} used for this connection + */ + AuthenticationToken getAuthenticationToken(); + + /** + * @return All Accumulo client properties set for this connection + */ + Properties getProperties(); +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java b/core/src/main/java/org/apache/accumulo/core/client/Connector.java index e36cc82674..9ac7d713f5 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java +++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java @@ -16,11 +16,15 @@ */ package org.apache.accumulo.core.client; +import java.util.Properties; + import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.NamespaceOperations; import 
org.apache.accumulo.core.client.admin.ReplicationOperations; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.impl.ConnectorImpl; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.security.Authorizations; /** @@ -77,6 +81,7 @@ public abstract BatchDeleter createBatchDeleter(String tableName, Authorizations int maxWriteThreads) throws TableNotFoundException; /** + * Factory method to create BatchDeleter * * @param tableName * the name of the table to query and delete from @@ -87,7 +92,8 @@ public abstract BatchDeleter createBatchDeleter(String tableName, Authorizations * @param numQueryThreads * the number of concurrent threads to spawn for querying * @param config - * configuration used to create batch writer + * configuration used to create batch writer. This config takes precedence. Any unset values will be merged with config set when the Connector was + * created. If no config was set during Connector creation, BatchWriterConfig defaults will be used. * @return BatchDeleter object for configuring and deleting * @since 1.5.0 */ @@ -95,6 +101,24 @@ public abstract BatchDeleter createBatchDeleter(String tableName, Authorizations public abstract BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, BatchWriterConfig config) throws TableNotFoundException; + /** + * Factory method to create BatchDeleter. This method uses BatchWriterConfig set when Connector was created. If none was set, BatchWriterConfig defaults will + * be used. + * + * @param tableName + * the name of the table to query and delete from + * @param authorizations + * A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in + * must be a subset of the accumulo user's set of authorizations. 
If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are + * passed, then an exception will be thrown. + * @param numQueryThreads + * the number of concurrent threads to spawn for querying + * @return BatchDeleter object + * @throws TableNotFoundException + * if table not found + */ + public abstract BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException; + /** * Factory method to create a BatchWriter connected to Accumulo. * @@ -121,13 +145,27 @@ public abstract BatchDeleter createBatchDeleter(String tableName, Authorizations * @param tableName * the name of the table to insert data into * @param config - * configuration used to create batch writer + * configuration used to create batch writer. This config will take precedence. Any unset values will merged with config set when the Connector was + * created. If no config was set during Connector creation, BatchWriterConfig defaults will be used. * @return BatchWriter object for configuring and writing data to * @since 1.5.0 */ public abstract BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException; + /** + * Factory method to create a BatchWriter. This method uses BatchWriterConfig set when Connector was created. If none was set, BatchWriterConfig defaults will + * be used. + * + * @param tableName + * the name of the table to insert data into + * @return BatchWriter object + * @throws TableNotFoundException + * if table not found + * @since 2.0.0 + */ + public abstract BatchWriter createBatchWriter(String tableName) throws TableNotFoundException; + /** * Factory method to create a Multi-Table BatchWriter connected to Accumulo. 
Multi-table batch writers can queue data for multiple tables, which is good for * ingesting data into multiple tables from the same source @@ -150,13 +188,22 @@ public abstract BatchDeleter createBatchDeleter(String tableName, Authorizations * multiple tables can be sent to a server in a single batch. Its an efficient way to ingest data into multiple tables from a single process. * * @param config - * configuration used to create multi-table batch writer + * configuration used to create multi-table batch writer. This config will take precedence. Any unset values will merged with config set when the + * Connector was created. If no config was set during Connector creation, BatchWriterConfig defaults will be used. * @return MultiTableBatchWriter object for configuring and writing data to * @since 1.5.0 */ - public abstract MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config); + /** + * Factory method to create a Multi-Table BatchWriter. This method uses BatchWriterConfig set when Connector was created. If none was set, BatchWriterConfig + * defaults will be used. + * + * @return MultiTableBatchWriter object + * @since 2.0.0 + */ + public abstract MultiTableBatchWriter createMultiTableBatchWriter(); + /** * Factory method to create a Scanner connected to Accumulo. 
* @@ -237,4 +284,272 @@ public abstract BatchDeleter createBatchDeleter(String tableName, Authorizations * @since 1.7.0 */ public abstract ReplicationOperations replicationOperations(); + + /** + * Builds ConnectionInfo after all options have been specified + * + * @since 2.0.0 + */ + public interface ConnInfoFactory { + + /** + * Builds ConnectionInfo after all options have been specified + * + * @return ConnectionInfo + */ + ConnectionInfo info(); + } + + /** + * Builds Connector + * + * @since 2.0.0 + */ + public interface ConnectorFactory extends ConnInfoFactory { + + /** + * Builds Connector after all options have been specified + * + * @return Connector + */ + Connector build() throws AccumuloException, AccumuloSecurityException; + + } + + /** + * Builder method for setting Accumulo instance and zookeepers + * + * @since 2.0.0 + */ + public interface InstanceArgs { + AuthenticationArgs forInstance(String instanceName, String zookeepers); + } + + /** + * Builder methods for creating Connector using properties + * + * @since 2.0.0 + */ + public interface PropertyOptions extends InstanceArgs { + + /** + * Build using properties file. An example properties file can be found at conf/accumulo-client.properties in the Accumulo tarball distribution. + * + * @param propertiesFile + * Path to properties file + * @return this builder + */ + ConnectorFactory usingProperties(String propertiesFile); + + /** + * Build using Java properties object. 
A list of available properties can be found in the documentation on the project website (http://accumulo.apache.org) + * under 'Development' -> 'Client Properties' + * + * @param properties + * Properties object + * @return this builder + */ + ConnectorFactory usingProperties(Properties properties); + } + + public interface ConnectionInfoOptions extends PropertyOptions { + + /** + * Build using connection information + * + * @param connectionInfo + * ConnectionInfo object + * @return this builder + */ + ConnectorFactory usingConnectionInfo(ConnectionInfo connectionInfo); + } + + /** + * Build methods for authentication + * + * @since 2.0.0 + */ + public interface AuthenticationArgs { + + /** + * Build using password-based credentials + * + * @param username + * User name + * @param password + * Password + * @return this builder + */ + ConnectionOptions usingPassword(String username, CharSequence password); + + /** + * Build using Kerberos credentials + * + * @param principal + * Principal + * @param keyTabFile + * Path to keytab file + * @return this builder + */ + ConnectionOptions usingKerberos(String principal, String keyTabFile); + + /** + * Build using credentials from a CredentialProvider + * + * @param username + * Accumulo user name + * @param name + * Alias to extract Accumulo user password from CredentialProvider + * @param providerUrls + * Comma seperated list of URLs defining CredentialProvider(s) + * @return this builder + */ + ConnectionOptions usingProvider(String username, String name, String providerUrls); + + /** + * Build using specified credentials + * + * @param principal + * Principal/username + * @param token + * Authentication token + * @return this builder + */ + ConnectionOptions usingToken(String principal, AuthenticationToken token); + } + + /** + * Build methods for SSL/TLS + * + * @since 2.0.0 + */ + public interface SslOptions extends ConnectorFactory { + + /** + * Build with SSL trust store + * + * @param path + * Path to trust store 
+ * @return this builder + */ + SslOptions withTruststore(String path); + + /** + * Build with SSL trust store + * + * @param path + * Path to trust store + * @param password + * Password used to encrypt trust store + * @param type + * Trust store type + * @return this builder + */ + SslOptions withTruststore(String path, String password, String type); + + /** + * Build with SSL key store + * + * @param path + * Path to SSL key store + * @return this builder + */ + SslOptions withKeystore(String path); + + /** + * Build with SSL key store + * + * @param path + * Path to keystore + * @param password + * Password used to encyrpt key store + * @param type + * Key store type + * @return this builder + */ + SslOptions withKeystore(String path, String password, String type); + + /** + * Use JSSE system properties to configure SSL + * + * @return this builder + */ + SslOptions useJsse(); + } + + /** + * Build methods for SASL + * + * @since 2.0.0 + */ + public interface SaslOptions extends ConnectorFactory { + + /** + * Build with Kerberos Server Primary + * + * @param kerberosServerPrimary + * Kerberos server primary + * @return this builder + */ + SaslOptions withPrimary(String kerberosServerPrimary); + + /** + * Build with SASL quality of protection + * + * @param qualityOfProtection + * Quality of protection + * @return this builder + */ + SaslOptions withQop(String qualityOfProtection); + } + + /** + * Build methods for connection options + * + * @since 2.0.0 + */ + public interface ConnectionOptions extends ConnectorFactory { + + /** + * Build using Zookeeper timeout + * + * @param timeout + * Zookeeper timeout + * @return this builder + */ + ConnectionOptions withZkTimeout(int timeout); + + /** + * Build with SSL/TLS options + * + * @return this builder + */ + SslOptions withSsl(); + + /** + * Build with SASL options + * + * @return this builder + */ + SaslOptions withSasl(); + + /** + * Build with BatchWriterConfig defaults for BatchWriter, MultiTableBatchWriter & 
BatchDeleter + * + * @param batchWriterConfig + * BatchWriterConfig + * @return this builder + */ + ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig); + } + + /** + * Creates builder for Connector + * + * @return this builder + * @since 2.0.0 + */ + public static PropertyOptions builder() { + return new ConnectorImpl.ConnectorBuilderImpl(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java index 831779c424..d6fd7d67c9 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java @@ -26,8 +26,8 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; @@ -54,15 +54,18 @@ protected final Instance inst; private Credentials creds; private ClientConfiguration clientConf; + private BatchWriterConfig batchWriterConfig = new BatchWriterConfig(); private final AccumuloConfiguration rpcConf; protected Connector conn; - /** - * Instantiate a client context - */ public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf) { + this(instance, credentials, clientConf, new BatchWriterConfig()); + } + + public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf, BatchWriterConfig batchWriterConfig) { this(instance, credentials, convertClientConfig(requireNonNull(clientConf, "clientConf is null"))); this.clientConf = clientConf; + 
this.batchWriterConfig = batchWriterConfig; } /** @@ -153,6 +156,10 @@ public Connector getConnector() throws AccumuloException, AccumuloSecurityExcept return conn; } + public BatchWriterConfig getBatchWriterConfig() { + return batchWriterConfig; + } + /** * Serialize the credentials just before initiating the RPC call */ @@ -199,9 +206,9 @@ public String get(Property property) { else { // Reconstitute the server kerberos property from the client config if (Property.GENERAL_KERBEROS_PRINCIPAL == property) { - if (config.containsKey(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) { + if (config.containsKey(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) { // Avoid providing a realm since we don't know what it is... - return config.getString(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey()) + "/_HOST@" + SaslConnectionParams.getDefaultRealm(); + return config.getString(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY.getKey()) + "/_HOST@" + SaslConnectionParams.getDefaultRealm(); } } return defaults.get(property); @@ -221,8 +228,8 @@ public void getProperties(Map<String,String> props, Predicate<String> filter) { // Two client props that don't exist on the server config. Client doesn't need to know about the Kerberos instance from the principle, but servers do // Automatically reconstruct the server property when converting a client config. - if (props.containsKey(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) { - final String serverPrimary = props.remove(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey()); + if (props.containsKey(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) { + final String serverPrimary = props.remove(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY.getKey()); if (filter.test(Property.GENERAL_KERBEROS_PRINCIPAL.getKey())) { // Use the _HOST expansion. It should be unnecessary in "client land". 
props.put(Property.GENERAL_KERBEROS_PRINCIPAL.getKey(), serverPrimary + "/_HOST@" + SaslConnectionParams.getDefaultRealm()); diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoFactory.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoFactory.java new file mode 100644 index 0000000000..1aafc06ed8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoFactory.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.client.impl; + +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Durability; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.conf.ClientProperty; + +/** + * Creates internal objects using {@link ConnectionInfo} + */ +public class ConnectionInfoFactory { + + public static String getString(ConnectionInfo info, ClientProperty property) { + return property.getValue(info.getProperties()); + } + + public static Long getLong(ConnectionInfo info, ClientProperty property) { + return property.getLong(info.getProperties()); + } + + public static Connector getConnector(ConnectionInfo info) throws AccumuloSecurityException, AccumuloException { + return new ConnectorImpl(getClientContext(info)); + } + + public static ClientContext getClientContext(ConnectionInfo info) { + return new ClientContext(getInstance(info), getCredentials(info), getClientConfiguration(info), getBatchWriterConfig(info)); + } + + public static Instance getInstance(ConnectionInfo info) { + return new ZooKeeperInstance(getClientConfiguration(info)); + } + + public static Credentials getCredentials(ConnectionInfo info) { + return new Credentials(info.getPrincipal(), info.getAuthenticationToken()); + } + + public static BatchWriterConfig getBatchWriterConfig(ConnectionInfo info) { + BatchWriterConfig batchWriterConfig = new BatchWriterConfig(); + Long maxMemory = getLong(info, ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES); + if (maxMemory != null) { + batchWriterConfig.setMaxMemory(maxMemory); + } + 
Long maxLatency = getLong(info, ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC); + if (maxLatency != null) { + batchWriterConfig.setMaxLatency(maxLatency, TimeUnit.SECONDS); + } + Long timeout = getLong(info, ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC); + if (timeout != null) { + batchWriterConfig.setTimeout(timeout, TimeUnit.SECONDS); + } + String durability = getString(info, ClientProperty.BATCH_WRITER_DURABILITY); + if (!durability.isEmpty()) { + batchWriterConfig.setDurability(Durability.valueOf(durability.toUpperCase())); + } + return batchWriterConfig; + } + + public static ClientConfiguration getClientConfiguration(ConnectionInfo info) { + ClientConfiguration config = ClientConfiguration.create(); + for (Object keyObj : info.getProperties().keySet()) { + String key = (String) keyObj; + String val = info.getProperties().getProperty(key); + if (key.equals(ClientProperty.INSTANCE_ZOOKEEPERS.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_ZK_HOST, val); + } else if (key.equals(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT_SEC.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_ZK_TIMEOUT, val); + } else if (key.equals(ClientProperty.SSL_ENABLED.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_RPC_SSL_ENABLED, val); + } else if (key.equals(ClientProperty.SSL_KEYSTORE_PATH.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_KEYSTORE_PATH, val); + config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_RPC_SSL_CLIENT_AUTH, "true"); + } else if (key.equals(ClientProperty.SSL_KEYSTORE_TYPE.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_KEYSTORE_TYPE, val); + } else if (key.equals(ClientProperty.SSL_KEYSTORE_PASSWORD.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_KEYSTORE_PASSWORD, val); + } else if (key.equals(ClientProperty.SSL_TRUSTSTORE_PATH.getKey())) { + 
config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_TRUSTSTORE_PATH, val); + } else if (key.equals(ClientProperty.SSL_TRUSTSTORE_TYPE.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_TRUSTSTORE_PATH, val); + } else if (key.equals(ClientProperty.SSL_TRUSTSTORE_PASSWORD.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_TRUSTSTORE_PATH, val); + } else if (key.equals(ClientProperty.SSL_USE_JSSE.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_USE_JSSE, val); + } else if (key.equals(ClientProperty.SASL_ENABLED.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_RPC_SSL_ENABLED, val); + } else if (key.equals(ClientProperty.SASL_QOP.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SASL_QOP, val); + } else if (key.equals(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY, val); + } else { + config.setProperty(key, val); + } + } + return config; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoImpl.java new file mode 100644 index 0000000000..916625c930 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoImpl.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.impl; + +import java.util.Properties; + +import org.apache.accumulo.core.client.ConnectionInfo; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.conf.ClientProperty; + +public class ConnectionInfoImpl implements ConnectionInfo { + + private Properties properties; + private AuthenticationToken token; + + ConnectionInfoImpl(Properties properties, AuthenticationToken token) { + this.properties = properties; + this.token = token; + } + + @Override + public String getInstanceName() { + return getString(ClientProperty.INSTANCE_NAME); + } + + @Override + public String getZookeepers() { + return getString(ClientProperty.INSTANCE_ZOOKEEPERS); + } + + @Override + public String getPrincipal() { + return getString(ClientProperty.AUTH_USERNAME); + } + + @Override + public Properties getProperties() { + return properties; + } + + @Override + public AuthenticationToken getAuthenticationToken() { + return token; + } + + private String getString(ClientProperty property) { + return property.getValue(properties); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java index f49e4dc6d6..03c719c272 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java @@ -18,6 +18,12 @@ import static com.google.common.base.Preconditions.checkArgument; 
+import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloException; @@ -28,6 +34,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.ConditionalWriterConfig; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; @@ -41,6 +48,11 @@ import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.impl.thrift.ClientService; import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.CredentialProviderToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.trace.Tracer; @@ -54,7 +66,7 @@ private InstanceOperations instanceops = null; private ReplicationOperations replicationops = null; - public ConnectorImpl(final ClientContext context) throws AccumuloException, AccumuloSecurityException { + public ConnectorImpl(final ClientContext context) throws AccumuloSecurityException, AccumuloException { checkArgument(context != null, "Context is null"); checkArgument(context.getCredentials() != null, "Credentials are null"); checkArgument(context.getCredentials().getToken() != null, "Authentication token is null"); @@ -113,7 +125,12 @@ public 
BatchDeleter createBatchDeleter(String tableName, Authorizations authoriz throws TableNotFoundException { checkArgument(tableName != null, "tableName is null"); checkArgument(authorizations != null, "authorizations is null"); - return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations, numQueryThreads, config); + return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations, numQueryThreads, config.merge(context.getBatchWriterConfig())); + } + + @Override + public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException { + return createBatchDeleter(tableName, authorizations, numQueryThreads, new BatchWriterConfig()); } @Deprecated @@ -127,7 +144,16 @@ public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxL @Override public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException { checkArgument(tableName != null, "tableName is null"); - return new BatchWriterImpl(context, getTableId(tableName), config); + // we used to allow null inputs for bw config + if (config == null) { + config = new BatchWriterConfig(); + } + return new BatchWriterImpl(context, getTableId(tableName), config.merge(context.getBatchWriterConfig())); + } + + @Override + public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException { + return createBatchWriter(tableName, new BatchWriterConfig()); } @Deprecated @@ -139,7 +165,12 @@ public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long ma @Override public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) { - return new MultiTableBatchWriterImpl(context, config); + return new MultiTableBatchWriterImpl(context, config.merge(context.getBatchWriterConfig())); + } + + @Override + public MultiTableBatchWriter createMultiTableBatchWriter() { + return createMultiTableBatchWriter(new 
BatchWriterConfig()); } @Override @@ -193,4 +224,222 @@ public synchronized ReplicationOperations replicationOperations() { return replicationops; } + + public static class ConnectorBuilderImpl implements InstanceArgs, PropertyOptions, ConnectionInfoOptions, AuthenticationArgs, ConnectionOptions, SslOptions, + SaslOptions, ConnectorFactory { + + private Properties properties = new Properties(); + private AuthenticationToken token = null; + + @Override + public Connector build() throws AccumuloException, AccumuloSecurityException { + return ConnectionInfoFactory.getConnector(new ConnectionInfoImpl(properties, token)); + } + + @Override + public ConnectionInfo info() { + return new ConnectionInfoImpl(properties, token); + } + + @Override + public AuthenticationArgs forInstance(String instanceName, String zookeepers) { + setProperty(ClientProperty.INSTANCE_NAME, instanceName); + setProperty(ClientProperty.INSTANCE_ZOOKEEPERS, zookeepers); + return this; + } + + @Override + public SslOptions withTruststore(String path) { + setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path); + return this; + } + + @Override + public SslOptions withTruststore(String path, String password, String type) { + setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path); + setProperty(ClientProperty.SSL_TRUSTSTORE_PASSWORD, password); + setProperty(ClientProperty.SSL_TRUSTSTORE_TYPE, type); + return this; + } + + @Override + public SslOptions withKeystore(String path) { + setProperty(ClientProperty.SSL_KEYSTORE_PATH, path); + return this; + } + + @Override + public SslOptions withKeystore(String path, String password, String type) { + setProperty(ClientProperty.SSL_KEYSTORE_PATH, path); + setProperty(ClientProperty.SSL_KEYSTORE_PASSWORD, password); + setProperty(ClientProperty.SSL_KEYSTORE_TYPE, type); + return this; + } + + @Override + public SslOptions useJsse() { + setProperty(ClientProperty.SSL_USE_JSSE, "true"); + return this; + } + + @Override + public ConnectionOptions withZkTimeout(int 
timeout) { + setProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT_SEC, Integer.toString(timeout)); + return this; + } + + @Override + public SslOptions withSsl() { + setProperty(ClientProperty.SSL_ENABLED, "true"); + return this; + } + + @Override + public SaslOptions withSasl() { + setProperty(ClientProperty.SASL_ENABLED, "true"); + return this; + } + + @Override + public ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig) { + setProperty(ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES, batchWriterConfig.getMaxMemory()); + setProperty(ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC, batchWriterConfig.getMaxLatency(TimeUnit.SECONDS)); + setProperty(ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC, batchWriterConfig.getTimeout(TimeUnit.SECONDS)); + setProperty(ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS, batchWriterConfig.getMaxWriteThreads()); + setProperty(ClientProperty.BATCH_WRITER_DURABILITY, batchWriterConfig.getDurability().toString()); + return this; + } + + @Override + public SaslOptions withPrimary(String kerberosServerPrimary) { + setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY, kerberosServerPrimary); + return this; + } + + @Override + public SaslOptions withQop(String qualityOfProtection) { + setProperty(ClientProperty.SASL_QOP, qualityOfProtection); + return this; + } + + @Override + public ConnectorFactory usingProperties(String configFile) { + Properties properties = new Properties(); + try (InputStream is = new FileInputStream(configFile)) { + properties.load(is); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + return usingProperties(properties); + } + + @Override + public ConnectorFactory usingProperties(Properties properties) { + this.properties = properties; + String authMethod = ClientProperty.AUTH_METHOD.getValue(properties).toLowerCase(); + switch (authMethod) { + case "password": + String password = ClientProperty.AUTH_PASSWORD.getValue(properties); + Objects.nonNull(password); + this.token 
= new PasswordToken(password); + this.properties.remove(ClientProperty.AUTH_PASSWORD); + break; + case "kerberos": + String principal = ClientProperty.AUTH_USERNAME.getValue(properties); + String keytabPath = ClientProperty.AUTH_KERBEROS_KEYTAB_PATH.getValue(properties); + Objects.nonNull(principal); + Objects.nonNull(keytabPath); + try { + this.token = new KerberosToken(principal, new File(keytabPath)); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + case "provider": + String name = ClientProperty.AUTH_PROVIDER_NAME.getValue(properties); + String providerUrls = ClientProperty.AUTH_PROVIDER_URLS.getValue(properties); + try { + this.token = new CredentialProviderToken(name, providerUrls); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + default: + throw new IllegalArgumentException("An authentication method (password, kerberos, etc) must be set"); + } + return this; + } + + @Override + public ConnectionOptions usingPassword(String username, CharSequence password) { + setProperty(ClientProperty.AUTH_METHOD, "password"); + setProperty(ClientProperty.AUTH_USERNAME, username); + this.token = new PasswordToken(password); + return this; + } + + @Override + public ConnectionOptions usingKerberos(String principal, String keyTabFile) { + setProperty(ClientProperty.AUTH_METHOD, "kerberos"); + setProperty(ClientProperty.AUTH_USERNAME, principal); + setProperty(ClientProperty.AUTH_KERBEROS_KEYTAB_PATH, keyTabFile); + try { + this.token = new KerberosToken(principal, new File(keyTabFile)); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + return this; + } + + @Override + public ConnectionOptions usingProvider(String username, String name, String providerUrls) { + setProperty(ClientProperty.AUTH_METHOD, "provider"); + setProperty(ClientProperty.AUTH_USERNAME, username); + setProperty(ClientProperty.AUTH_PROVIDER_NAME, name); + setProperty(ClientProperty.AUTH_PROVIDER_URLS, providerUrls); + try { + 
this.token = new CredentialProviderToken(name, providerUrls); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + return this; + } + + @Override + public ConnectionOptions usingToken(String principal, AuthenticationToken token) { + this.token = token; + setProperty(ClientProperty.AUTH_USERNAME, principal); + if (token instanceof CredentialProviderToken) { + setProperty(ClientProperty.AUTH_METHOD, "provider"); + CredentialProviderToken cpt = (CredentialProviderToken) token; + setProperty(ClientProperty.AUTH_PROVIDER_NAME, cpt.getName()); + setProperty(ClientProperty.AUTH_PROVIDER_URLS, cpt.getCredentialProviders()); + } else if (token instanceof PasswordToken) { + setProperty(ClientProperty.AUTH_METHOD, "password"); + } else if (token instanceof KerberosToken) { + setProperty(ClientProperty.AUTH_METHOD, "kerberos"); + setProperty(ClientProperty.AUTH_KERBEROS_KEYTAB_PATH, ((KerberosToken) token).getKeytab().getAbsolutePath()); + } else { + setProperty(ClientProperty.AUTH_METHOD, "unknown"); + } + return this; + } + + @Override + public ConnectorFactory usingConnectionInfo(ConnectionInfo connectionInfo) { + this.properties = connectionInfo.getProperties(); + this.token = connectionInfo.getAuthenticationToken(); + return this; + } + + public void setProperty(ClientProperty property, String value) { + properties.setProperty(property.getKey(), value); + } + + public void setProperty(ClientProperty property, Long value) { + setProperty(property, Long.toString(value)); + } + + public void setProperty(ClientProperty property, Integer value) { + setProperty(property, Integer.toString(value)); + } + } } diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java index b2f0ed7fd7..a1faa0008e 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java +++ 
b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.ClientSideIteratorScanner; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IsolatedScanner; @@ -48,6 +49,7 @@ import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.ConnectionInfoFactory; import org.apache.accumulo.core.client.impl.Credentials; import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.impl.OfflineScanner; @@ -115,6 +117,20 @@ public static String getClassLoaderContext(JobConf job) { return InputConfigurator.getClassLoaderContext(CLASS, job); } + /** + * Sets connection information needed to communicate with Accumulo for this job + * + * @param job + * Hadoop job instance to be configured + * @param info + * Connection information for Accumulo + * @since 2.0.0 + */ + public static void setConnectionInfo(JobConf job, ConnectionInfo info) throws AccumuloSecurityException { + setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken()); + setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info)); + } + /** * Sets the connector information needed to communicate with Accumulo in this job. 
* @@ -131,7 +147,9 @@ public static String getClassLoaderContext(JobConf job) { * @param token * the user's password * @since 1.5.0 + * @deprecated since 2.0.0, use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead */ + @Deprecated public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException { if (token instanceof KerberosToken) { log.info("Received KerberosToken, attempting to fetch DelegationToken"); @@ -172,7 +190,9 @@ public static void setConnectorInfo(JobConf job, String principal, Authenticatio * @param tokenFile * the path to the token file * @since 1.6.0 + * @deprecated since 2.0.0, use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead */ + @Deprecated public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException { InputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile); } @@ -228,7 +248,7 @@ protected static AuthenticationToken getAuthenticationToken(JobConf job) { * @param zooKeepers * a comma-separated list of zookeeper servers * @since 1.5.0 - * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(JobConf, ClientConfiguration)} instead. + * @deprecated since 1.6.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead. */ @Deprecated public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) { @@ -243,7 +263,9 @@ public static void setZooKeeperInstance(JobConf job, String instanceName, String * @param clientConfig * client configuration containing connection options * @since 1.6.0 + * @deprecated since 2.0.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead. 
*/ + @Deprecated public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) { InputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java index 5f00ec3a73..affb535f6a 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java @@ -19,8 +19,7 @@ import java.io.IOException; import java.util.Map.Entry; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; @@ -39,10 +38,9 @@ * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)} - * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)} + * <li>{@link AccumuloInputFormat#setConnectionInfo(JobConf, ConnectionInfo)} + * <li>{@link AccumuloInputFormat#setInputTableName(JobConf, String)}</li> * <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, Authorizations)} - * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} * </ul> * * Other static methods are optional. 
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java index 3a2e3fa9b0..3c0b4b693a 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java @@ -20,6 +20,7 @@ import java.util.Map; import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.mapreduce.InputTableConfig; import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; import org.apache.accumulo.core.data.Key; @@ -37,7 +38,7 @@ * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, org.apache.accumulo.core.client.security.tokens.AuthenticationToken)} + * <li>{@link AccumuloInputFormat#setConnectionInfo(JobConf, ConnectionInfo)} * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)} * <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, org.apache.accumulo.core.security.Authorizations)} * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java index 6f07ce7e64..426a4d6329 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ClientConfiguration; +import 
org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; @@ -37,6 +38,7 @@ import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; +import org.apache.accumulo.core.client.impl.ConnectionInfoFactory; import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; @@ -68,9 +70,7 @@ * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)} - * <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, String)} - * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} + * <li>{@link AccumuloOutputFormat#setConnectionInfo(JobConf, ConnectionInfo)} * </ul> * * Other static methods are optional. @@ -80,6 +80,20 @@ private static final Class<?> CLASS = AccumuloOutputFormat.class; protected static final Logger log = Logger.getLogger(CLASS); + /** + * Set the connection information needed to communicate with Accumulo in this job. + * + * @param job + * Hadoop job to be configured + * @param info + * Accumulo connection information + * @since 2.0.0 + */ + public static void setConnectionInfo(JobConf job, ConnectionInfo info) throws AccumuloSecurityException { + setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken()); + setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info)); + } + /** * Sets the connector information needed to communicate with Accumulo in this job. 
* @@ -96,7 +110,9 @@ * @param token * the user's password * @since 1.5.0 + * @deprecated since 2.0.0, use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead. */ + @Deprecated public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException { if (token instanceof KerberosToken) { log.info("Received KerberosToken, attempting to fetch DelegationToken"); @@ -137,7 +153,9 @@ public static void setConnectorInfo(JobConf job, String principal, Authenticatio * @param tokenFile * the path to the password file * @since 1.6.0 + * @deprecated since 2.0.0, use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead */ + @Deprecated public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException { OutputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile); } @@ -215,9 +233,8 @@ protected static AuthenticationToken getAuthenticationToken(JobConf job) { * @param zooKeepers * a comma-separated list of zookeeper servers * @since 1.5.0 - * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(JobConf, ClientConfiguration)} instead. + * @deprecated since 1.6.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead. */ - @Deprecated public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) { setZooKeeperInstance(job, ClientConfiguration.create().withInstance(instanceName).withZkHosts(zooKeepers)); @@ -232,7 +249,9 @@ public static void setZooKeeperInstance(JobConf job, String instanceName, String * @param clientConfig * client configuration for specifying connection timeouts, SSL connection options, etc. * @since 1.6.0 + * @deprecated since 2.0.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead. 
*/ + @Deprecated public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) { OutputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java index 5049ef71cf..62b949a230 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java @@ -19,9 +19,8 @@ import java.io.IOException; import java.util.Map.Entry; -import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.RowIterator; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; @@ -40,10 +39,9 @@ * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloRowInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)} + * <li>{@link AccumuloRowInputFormat#setConnectionInfo(JobConf, ConnectionInfo)} * <li>{@link AccumuloRowInputFormat#setInputTableName(JobConf, String)} * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(JobConf, Authorizations)} - * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} * </ul> * * Other static methods are optional. 
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java index 72a9c924d5..965a4553c8 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.ClientSideIteratorScanner; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IsolatedScanner; @@ -48,6 +49,7 @@ import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.ConnectionInfoFactory; import org.apache.accumulo.core.client.impl.Credentials; import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.impl.OfflineScanner; @@ -118,6 +120,20 @@ public static String getClassLoaderContext(JobContext job) { return InputConfigurator.getClassLoaderContext(CLASS, job.getConfiguration()); } + /** + * Sets connection information needed to communicate with Accumulo for this job + * + * @param job + * Hadoop job instance to be configured + * @param info + * Connection information for Accumulo + * @since 2.0.0 + */ + public static void setConnectionInfo(Job job, ConnectionInfo info) throws AccumuloSecurityException { + setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken()); + setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info)); + } + /** * Sets the connector information needed to communicate 
with Accumulo in this job. * @@ -134,7 +150,9 @@ public static String getClassLoaderContext(JobContext job) { * @param token * the user's password * @since 1.5.0 + * @deprecated since 2.0.0; use {@link #setConnectionInfo(Job, ConnectionInfo)} instead. */ + @Deprecated public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException { if (token instanceof KerberosToken) { log.info("Received KerberosToken, attempting to fetch DelegationToken"); @@ -253,7 +271,7 @@ protected static AuthenticationToken getAuthenticationToken(JobContext context) * @param zooKeepers * a comma-separated list of zookeeper servers * @since 1.5.0 - * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(Job, ClientConfiguration)} instead. + * @deprecated since 1.6.0; Use {@link #setConnectionInfo(Job, ConnectionInfo)} instead. */ @Deprecated public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) { @@ -269,7 +287,9 @@ public static void setZooKeeperInstance(Job job, String instanceName, String zoo * @param clientConfig * client configuration containing connection options * @since 1.6.0 + * @deprecated since 2.0.0; Use {@link #setConnectionInfo(Job, ConnectionInfo)} instead. 
*/ + @Deprecated public static void setZooKeeperInstance(Job job, ClientConfiguration clientConfig) { InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java index 837b3fe8d7..441ac33472 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java @@ -19,8 +19,7 @@ import java.io.IOException; import java.util.Map.Entry; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; @@ -39,9 +38,9 @@ * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, AuthenticationToken)} + * <li>{@link AccumuloInputFormat#setConnectionInfo(Job, ConnectionInfo)} + * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)} * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)} - * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, ClientConfiguration)} * </ul> * * Other static methods are optional. 
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java index e821b5d1c0..af69e5a2db 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; @@ -37,6 +38,7 @@ import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; +import org.apache.accumulo.core.client.impl.ConnectionInfoFactory; import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; @@ -69,9 +71,7 @@ * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, AuthenticationToken)} - * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, String)} - * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, ClientConfiguration)} + * <li>{@link AccumuloOutputFormat#setConnectionInfo(Job, ConnectionInfo)} * </ul> * * Other static methods are optional. 
@@ -81,6 +81,20 @@ private static final Class<?> CLASS = AccumuloOutputFormat.class; protected static final Logger log = Logger.getLogger(CLASS); + /** + * Set the connection information needed to communicate with Accumulo in this job. + * + * @param job + * Hadoop job to be configured + * @param info + * Accumulo connection information + * @since 2.0.0 + */ + public static void setConnectionInfo(Job job, ConnectionInfo info) throws AccumuloSecurityException { + setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken()); + setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info)); + } + /** * Sets the connector information needed to communicate with Accumulo in this job. * @@ -97,7 +111,9 @@ * @param token * the user's password * @since 1.5.0 + * @deprecated since 2.0.0, replaced by {@link #setConnectionInfo(Job, ConnectionInfo)} */ + @Deprecated public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException { if (token instanceof KerberosToken) { log.info("Received KerberosToken, attempting to fetch DelegationToken"); @@ -138,7 +154,9 @@ public static void setConnectorInfo(Job job, String principal, AuthenticationTok * @param tokenFile * the path to the token file * @since 1.6.0 + * @deprecated since 2.0.0, replaced by {@link #setConnectionInfo(Job, ConnectionInfo)} */ + @Deprecated public static void setConnectorInfo(Job job, String principal, String tokenFile) throws AccumuloSecurityException { OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile); } @@ -232,7 +250,9 @@ public static void setZooKeeperInstance(Job job, String instanceName, String zoo * @param clientConfig * client configuration for specifying connection timeouts, SSL connection options, etc. * @since 1.6.0 + * @deprecated since 2.0.0; Use {@link #setConnectionInfo(Job, ConnectionInfo)} instead. 
*/ + @Deprecated public static void setZooKeeperInstance(Job job, ClientConfiguration clientConfig) { OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java index 043f88af8d..4f6b2bd9a9 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java @@ -19,9 +19,8 @@ import java.io.IOException; import java.util.Map.Entry; -import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.RowIterator; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; @@ -40,10 +39,9 @@ * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, String, AuthenticationToken)} + * <li>{@link AccumuloRowInputFormat#setConnectionInfo(Job, ConnectionInfo)} * <li>{@link AccumuloRowInputFormat#setInputTableName(Job, String)} * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)} - * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(Job, ClientConfiguration)} * </ul> * * Other static methods are optional. 
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java index 9b5601bf81..491812509c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java @@ -85,6 +85,11 @@ public BatchDeleter createBatchDeleter(String tableName, Authorizations authoriz config.getMaxWriteThreads()); } + @Override + public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException { + return createBatchDeleter(tableName, authorizations, numQueryThreads, new BatchWriterConfig()); + } + @Deprecated @Override public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException { @@ -98,6 +103,11 @@ public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) return createBatchWriter(tableName, config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads()); } + @Override + public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException { + return createBatchWriter(tableName, new BatchWriterConfig()); + } + @Deprecated @Override public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads) { @@ -109,6 +119,11 @@ public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig confi return createMultiTableBatchWriter(config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads()); } + @Override + public MultiTableBatchWriter createMultiTableBatchWriter() { + return createMultiTableBatchWriter(new BatchWriterConfig()); + } + @Override public Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException { MockTable table = 
acu.tables.get(tableName); diff --git a/core/src/main/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderToken.java b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderToken.java index 5ac6f02ffe..b39afe2081 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderToken.java +++ b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderToken.java @@ -33,6 +33,9 @@ public class CredentialProviderToken extends PasswordToken { public static final String NAME_PROPERTY = "name", CREDENTIAL_PROVIDERS_PROPERTY = "credentialProviders"; + private String name; + private String credentialProviders; + public CredentialProviderToken() { super(); } @@ -40,11 +43,12 @@ public CredentialProviderToken() { public CredentialProviderToken(String name, String credentialProviders) throws IOException { requireNonNull(name); requireNonNull(credentialProviders); - setWithCredentialProviders(name, credentialProviders); } protected void setWithCredentialProviders(String name, String credentialProviders) throws IOException { + this.name = name; + this.credentialProviders = credentialProviders; final Configuration conf = new Configuration(CachedConfiguration.getInstance()); conf.set(CredentialProviderFactoryShim.CREDENTIAL_PROVIDER_PATH, credentialProviders); @@ -57,6 +61,20 @@ protected void setWithCredentialProviders(String name, String credentialProvider setPassword(CharBuffer.wrap(password)); } + /** + * @return Name used to extract Accumulo user password from CredentialProvider + */ + public String getName() { + return name; + } + + /** + * @return CSV list of CredentialProvider(s) + */ + public String getCredentialProviders() { + return credentialProviders; + } + @Override public void init(Properties properties) { char[] nameCharArray = properties.get(NAME_PROPERTY), credentialProvidersCharArray = properties.get(CREDENTIAL_PROVIDERS_PROPERTY); diff --git 
a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java new file mode 100644 index 0000000000..82188231a5 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.conf; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.FileNotFoundException; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; + +import com.google.common.collect.Sets; + +/** + * Generates client-properties.md for documentation on Accumulo website and accumulo-client.properties for Accumulo distribution tarball + */ +class ClientConfigGenerate { + + private abstract class Format { + + abstract void beginSection(String section); + + abstract void pageHeader(); + + abstract void property(ClientProperty prop); + + void generate() { + pageHeader(); + + generateSection("Instance", "instance."); + generateSection("Authentication", "auth.", "auth.method", "auth.username"); + generateSection("Batch Writer", "batch.writer."); + generateSection("SSL", "ssl."); + generateSection("SASL", "sasl."); + generateSection("Tracing", "trace."); + + doc.close(); + } + + void generateSection(String section, String prefix, String... prefixProps) { + beginSection(section); + for (String prop : prefixProps) { + ClientProperty cp = sortedProps.get(prop); + if (cp != null) { + property(cp); + } + } + Set<String> prefixSet = Sets.newHashSet(prefixProps); + for (ClientProperty prop : sortedProps.values()) { + if (prop.getKey().startsWith(prefix) && !prefixSet.contains(prop.getKey())) { + property(prop); + } + } + } + + void generateSection(String section, String prefix) { + generateSection(section, prefix, ""); + } + } + + private class Markdown extends Format { + + @Override + void beginSection(String section) {} + + @Override + void pageHeader() { + doc.println("---"); + doc.println("title: Client Properties"); + doc.println("category: development"); + doc.println("order: 9"); + doc.println("---\n"); + doc.println("<!-- WARNING: Do not edit this file. 
It is a generated file that is copied from Accumulo build (from core/target/generated-docs) -->"); + doc.println("<!-- Generated by : " + getClass().getName() + " -->\n"); + doc.println("Below are properties set in `accumulo-client.properties` that configure Accumulo clients. All properties have been part of the API since 2.0.0 (unless otherwise specified):\n"); + doc.println("| Property | Default value | Since | Description |"); + doc.println("|----------|---------------|-------|-------------|"); + } + + @Override + void property(ClientProperty prop) { + Objects.nonNull(prop); + doc.print("| <a name=\"" + prop.getKey().replace(".", "_") + "\" class=\"prop\"></a> " + prop.getKey() + " | "); + String defaultValue = sanitize(prop.getDefaultValue()).trim(); + if (defaultValue.length() == 0) { + defaultValue = "*empty*"; + } + doc.println(defaultValue + " | " + prop.getSince() + " | " + sanitize(prop.getDescription() + " |")); + } + + String sanitize(String str) { + return str.replace("\n", "<br>"); + } + } + + private class ConfigFile extends Format { + + @Override + void beginSection(String section) { + doc.println("\n## " + section + " properties"); + doc.println("## --------------"); + } + + @Override + void pageHeader() { + doc.println("# Licensed to the Apache Software Foundation (ASF) under one or more"); + doc.println("# contributor license agreements. See the NOTICE file distributed with"); + doc.println("# this work for additional information regarding copyright ownership."); + doc.println("# The ASF licenses this file to You under the Apache License, Version 2.0"); + doc.println("# (the \"License\"); you may not use this file except in compliance with"); + doc.println("# the License. 
You may obtain a copy of the License at"); + doc.println("#"); + doc.println("# http://www.apache.org/licenses/LICENSE-2.0"); + doc.println("#"); + doc.println("# Unless required by applicable law or agreed to in writing, software"); + doc.println("# distributed under the License is distributed on an \"AS IS\" BASIS,"); + doc.println("# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied."); + doc.println("# See the License for the specific language governing permissions and"); + doc.println("# limitations under the License.\n"); + doc.println("################################"); + doc.println("## Accumulo client configuration"); + doc.println("################################\n"); + doc.println("## NOTE - All properties that have a default are set with it. Properties that"); + doc.println("## are uncommented must be set by the user."); + } + + @Override + void property(ClientProperty prop) { + doc.println("## " + prop.getDescription()); + if (!prop.isRequired()) { + doc.print("#"); + } + doc.println(prop.getKey() + "=" + prop.getDefaultValue() + "\n"); + } + } + + private PrintStream doc; + private final TreeMap<String,ClientProperty> sortedProps = new TreeMap<>(); + + private ClientConfigGenerate(PrintStream doc) { + Objects.nonNull(doc); + this.doc = doc; + for (ClientProperty prop : ClientProperty.values()) { + this.sortedProps.put(prop.getKey(), prop); + } + } + + private void generateMarkdown() { + new Markdown().generate(); + } + + private void generateConfigFile() { + new ConfigFile().generate(); + } + + /** + * Generates markdown and config files for Accumulo client properties. 
Arguments are: "--generate-markdown filename" or "--generate-config filename" + * + * @param args + * command-line arguments + * @throws IllegalArgumentException + * if args is invalid + */ + public static void main(String[] args) throws FileNotFoundException, UnsupportedEncodingException { + if (args.length == 2) { + ClientConfigGenerate clientConfigGenerate = new ClientConfigGenerate(new PrintStream(args[1], UTF_8.name())); + if (args[0].equals("--generate-markdown")) { + clientConfigGenerate.generateMarkdown(); + return; + } else if (args[0].equals("--generate-config")) { + clientConfigGenerate.generateConfigFile(); + return; + } + } + throw new IllegalArgumentException("Usage: " + ClientConfigGenerate.class.getName() + " [--generate-markdown|--generate-config] <filename>"); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java new file mode 100644 index 0000000000..b645b10d48 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.conf; + +import java.util.Objects; +import java.util.Properties; + +import org.apache.accumulo.core.Constants; + +public enum ClientProperty { + + // Instance + INSTANCE_NAME("instance.name", "", "Name of Accumulo instance to connect to", "", true), + INSTANCE_ZOOKEEPERS("instance.zookeepers", "localhost:2181", "Zookeeper connection information for Accumulo instance", "", true), + INSTANCE_ZOOKEEPERS_TIMEOUT_SEC("instance.zookeepers.timeout.sec", "30", "Zookeeper session timeout (in seconds)"), + + // Authentication + AUTH_METHOD("auth.method", "password", "Authentication method (i.e password, kerberos, provider). Set more properties for chosen method below.", "", true), + AUTH_USERNAME("auth.username", "", "Accumulo username/principal for chosen authentication method", "", true), + AUTH_PASSWORD("auth.password", "", "Accumulo user password", "", true), + AUTH_KERBEROS_KEYTAB_PATH("auth.kerberos.keytab.path", "", "Path to Kerberos keytab"), + AUTH_PROVIDER_NAME("auth.provider.name", "", "Alias used to extract Accumulo user password from CredentialProvider"), + AUTH_PROVIDER_URLS("auth.provider.urls", "", "Comma separated list of URLs defining CredentialProvider(s)"), + + // BatchWriter + BATCH_WRITER_MAX_MEMORY_BYTES("batch.writer.max.memory.bytes", "52428800", "Max memory (in bytes) to batch before writing"), + BATCH_WRITER_MAX_LATENCY_SEC("batch.writer.max.latency.sec", "120", "Max amount of time (in seconds) to hold data in memory before flushing it"), + BATCH_WRITER_MAX_TIMEOUT_SEC("batch.writer.max.timeout.sec", "0", + "Max amount of time (in seconds) an unresponsive server will be re-tried. An exception is thrown when this timeout is exceeded. 
Set to zero for no timeout."), + BATCH_WRITER_MAX_WRITE_THREADS("batch.writer.max.write.threads", "3", "Maximum number of threads to use for writing data to tablet servers."), + BATCH_WRITER_DURABILITY("batch.writer.durability", "default", + "Change the durability for the BatchWriter session. To use the table's durability setting. use \"default\" which is the table's durability setting."), + + // SSL + SSL_ENABLED("ssl.enabled", "false", "Enable SSL for client RPC"), + SSL_KEYSTORE_PASSWORD("ssl.keystore.password", "", "Password used to encrypt keystore"), + SSL_KEYSTORE_PATH("ssl.keystore.path", "", "Path to SSL keystore file"), + SSL_KEYSTORE_TYPE("ssl.keystore.type", "jks", "Type of SSL keystore"), + SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password", "", "Password used to encrypt truststore"), + SSL_TRUSTSTORE_PATH("ssl.truststore.path", "", "Path to SSL truststore file"), + SSL_TRUSTSTORE_TYPE("ssl.truststore.type", "jks", "Type of SSL truststore"), + SSL_USE_JSSE("ssl.use.jsse", "false", "Use JSSE system properties to configure SSL"), + + // SASL + SASL_ENABLED("sasl.enabled", "false", "Enable SASL for client RPC"), + SASL_QOP("sasl.qop", "auth", "SASL quality of protection. 
Valid values are 'auth', 'auth-int', and 'auth-conf'"), + SASL_KERBEROS_SERVER_PRIMARY("sasl.kerberos.server.primary", "accumulo", "Kerberos principal/primary that Accumulo servers use to login"), + + // Trace + TRACE_SPAN_RECEIVERS("trace.span.receivers", "org.apache.accumulo.tracer.ZooTraceClient", "A list of span receiver classes to send trace spans"), + TRACE_ZOOKEEPER_PATH("trace.zookeeper.path", Constants.ZTRACERS, "The zookeeper node where tracers are registered"); + + private String key; + private String defaultValue; + private String description; + private String since; + private boolean required; + + ClientProperty(String key, String defaultValue, String description, String since, boolean required) { + Objects.requireNonNull(key); + Objects.requireNonNull(defaultValue); + Objects.requireNonNull(description); + Objects.requireNonNull(since); + this.key = key; + this.defaultValue = defaultValue; + this.description = description; + this.since = since; + this.required = required; + } + + ClientProperty(String key, String defaultValue, String description, String since) { + this(key, defaultValue, description, since, false); + } + + ClientProperty(String key, String defaultValue, String description) { + this(key, defaultValue, description, ""); + } + + public String getKey() { + return key; + } + + public String getDefaultValue() { + return defaultValue; + } + + public String getDescription() { + return description; + } + + public String getSince() { + return since; + } + + public boolean isRequired() { + return required; + } + + public String getValue(Properties properties) { + Objects.requireNonNull(properties); + String value = properties.getProperty(getKey()); + if (value == null || value.isEmpty()) { + value = getDefaultValue(); + } + Objects.requireNonNull(value); + if (isRequired() && value.isEmpty()) { + throw new IllegalArgumentException(getKey() + " must be set!"); + } + return value; + } + + public Long getLong(Properties properties) { + String value 
= getValue(properties); + if (value.isEmpty()) { + return null; + } + return Long.parseLong(value); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationDocGen.java b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationDocGen.java index 61245f87d4..91df2dc18b 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationDocGen.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationDocGen.java @@ -80,11 +80,12 @@ void beginTable(String name) { @Override void pageHeader() { doc.println("---"); - doc.println("title: Configuration Properties"); + doc.println("title: Server Properties"); doc.println("category: administration"); doc.println("order: 3"); doc.println("---\n"); doc.println("<!-- WARNING: Do not edit this file. It is a generated file that is copied from Accumulo build (from core/target/generated-docs) -->\n"); + doc.println("Below are properties set in `accumulo-site.xml` or the Accumulo shell that configure Accumulo servers (i.e tablet server, master, etc):\n"); } @Override diff --git a/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java b/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java index 9a32c26687..786a7bd93f 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; +import org.junit.Assert; import org.junit.Test; /** @@ -201,6 +202,24 @@ public void testManualEquality() { assertEquals(cfg1.hashCode(), cfg2.hashCode()); } + @Test + public void testMerge() { + BatchWriterConfig cfg1 = new BatchWriterConfig(), cfg2 = new BatchWriterConfig(); + cfg1.setMaxMemory(1234); + cfg2.setMaxMemory(5858); + cfg2.setDurability(Durability.LOG); + cfg2.setMaxLatency(456, TimeUnit.MILLISECONDS); + + Assert.assertEquals(Durability.DEFAULT, 
cfg1.getDurability()); + + BatchWriterConfig merged = cfg1.merge(cfg2); + + Assert.assertEquals(1234, merged.getMaxMemory()); + Assert.assertEquals(Durability.LOG, merged.getDurability()); + Assert.assertEquals(456, merged.getMaxLatency(TimeUnit.MILLISECONDS)); + Assert.assertEquals(3, merged.getMaxWriteThreads()); + } + private byte[] createBytes(BatchWriterConfig bwConfig) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); bwConfig.write(new DataOutputStream(baos)); diff --git a/core/src/test/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderTokenTest.java b/core/src/test/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderTokenTest.java index d6c7025151..eb562d9e75 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderTokenTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderTokenTest.java @@ -56,6 +56,8 @@ public void testPasswordsFromCredentialProvider() throws Exception { } CredentialProviderToken token = new CredentialProviderToken("root.password", keystorePath); + Assert.assertEquals("root.password", token.getName()); + Assert.assertEquals(keystorePath, token.getCredentialProviders()); Assert.assertArrayEquals("password".getBytes(UTF_8), token.getPassword()); token = new CredentialProviderToken("bob.password", keystorePath); diff --git a/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java b/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java index e278fceb6a..fdb8cfadb4 100644 --- a/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java +++ b/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java @@ -28,6 +28,7 @@ import org.apache.accumulo.cluster.ClusterUsers; import org.apache.accumulo.cluster.standalone.StandaloneAccumuloCluster; import org.apache.accumulo.core.client.ClientConfiguration; +import 
org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.admin.TableOperations; @@ -251,6 +252,11 @@ public static String getAdminPrincipal() { return clusterConf.getAdminPrincipal(); } + public static ConnectionInfo getConnectionInfo() { + return Connector.builder().forInstance(getCluster().getInstanceName(), getCluster().getZooKeepers()).usingToken(getAdminPrincipal(), getAdminToken()) + .info(); + } + public static AuthenticationToken getAdminToken() { checkState(initialized); return clusterConf.getAdminToken(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java index 999c1e93fd..c243f76ea6 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java @@ -28,6 +28,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; @@ -172,6 +173,10 @@ protected Connector getConnector() throws AccumuloException, AccumuloSecurityExc return getCluster().getConnector("root", new PasswordToken(ROOT_PASSWORD)); } + protected ConnectionInfo getConnectionInfo() { + return Connector.builder().forInstance(getCluster().getInstanceName(), getCluster().getZooKeepers()).usingPassword("root", ROOT_PASSWORD).info(); + } + protected Process exec(Class<?> clazz, String... 
args) throws IOException { return getCluster().exec(clazz, args); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConnectorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ConnectorIT.java new file mode 100644 index 0000000000..5846ec0f85 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/ConnectorIT.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.functional; + +import java.util.Properties; + +import org.apache.accumulo.core.client.ConnectionInfo; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.ClientProperty; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.junit.Assert; +import org.junit.Test; + +public class ConnectorIT extends AccumuloClusterHarness { + + @Test + public void testConnectorBuilder() throws Exception { + Connector c = getConnector(); + String instanceName = c.getInstance().getInstanceName(); + String zookeepers = c.getInstance().getZooKeepers(); + final String user = "testuser"; + final String password = "testpassword"; + c.securityOperations().createLocalUser(user, new PasswordToken(password)); + + Connector conn = Connector.builder().forInstance(instanceName, zookeepers).usingPassword(user, password).build(); + + Assert.assertEquals(instanceName, conn.getInstance().getInstanceName()); + Assert.assertEquals(zookeepers, conn.getInstance().getZooKeepers()); + Assert.assertEquals(user, conn.whoami()); + + ConnectionInfo info = Connector.builder().forInstance(instanceName, zookeepers).usingPassword(user, password).info(); + Assert.assertEquals(instanceName, info.getInstanceName()); + Assert.assertEquals(zookeepers, info.getZookeepers()); + Assert.assertEquals(user, info.getPrincipal()); + Assert.assertTrue(info.getAuthenticationToken() instanceof PasswordToken); + + Properties props = new Properties(); + props.put(ClientProperty.INSTANCE_NAME.getKey(), instanceName); + props.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), zookeepers); + props.put(ClientProperty.AUTH_USERNAME.getKey(), user); + props.put(ClientProperty.AUTH_PASSWORD.getKey(), password); + conn = Connector.builder().usingProperties(props).build(); + + Assert.assertEquals(instanceName, conn.getInstance().getInstanceName()); + Assert.assertEquals(zookeepers, 
conn.getInstance().getZooKeepers()); + Assert.assertEquals(user, conn.whoami()); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java index cf002dd97c..c3047ed409 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java @@ -122,9 +122,8 @@ public int run(String[] args) throws Exception { job.setInputFormat(AccumuloInputFormat.class); - AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken()); + AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloInputFormat.setInputTableName(job, table); - AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig()); AccumuloInputFormat.setBatchScan(job, batchScan); if (sample) { AccumuloInputFormat.setSamplerConfiguration(job, SAMPLER_CONFIG); @@ -215,10 +214,9 @@ public void testCorrectRangeInputSplits() throws Exception { Connector connector = getConnector(); connector.tableOperations().create(table); - AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken()); + AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloInputFormat.setInputTableName(job, table); AccumuloInputFormat.setScanAuthorizations(job, auths); - AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig()); AccumuloInputFormat.setScanIsolation(job, isolated); AccumuloInputFormat.setLocalIterators(job, localIters); AccumuloInputFormat.fetchColumns(job, fetchColumns); diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java index eb12f1cf9e..a2f3918d3a 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java +++ 
b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java @@ -31,13 +31,12 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.mapred.AccumuloInputFormat; import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -83,9 +82,8 @@ public void testMapred() throws Exception { // set the max memory so that we ensure we don't flush on the write. batchConfig.setMaxMemory(Long.MAX_VALUE); AccumuloOutputFormat outputFormat = new AccumuloOutputFormat(); + AccumuloOutputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloOutputFormat.setBatchWriterOptions(job, batchConfig); - AccumuloOutputFormat.setZooKeeperInstance(job, cluster.getClientConfig()); - AccumuloOutputFormat.setConnectorInfo(job, "root", new PasswordToken(ROOT_PASSWORD)); RecordWriter<Text,Mutation> writer = outputFormat.getRecordWriter(null, job, "Test", null); try { @@ -122,7 +120,7 @@ public void testMapred() throws Exception { OutputCollector<Text,Mutation> finalOutput; @Override - public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) throws IOException { + public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) { finalOutput = output; try { if (key != null) @@ -167,11 +165,10 @@ public int run(String[] args) throws Exception { job.setInputFormat(AccumuloInputFormat.class); - ClientConfiguration clientConfig = 
ClientConfiguration.create().withInstance(instanceName).withZkHosts(zooKeepers); + ConnectionInfo info = Connector.builder().forInstance(instanceName, zooKeepers).usingPassword(user, pass).info(); - AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); + AccumuloInputFormat.setConnectionInfo(job, info); AccumuloInputFormat.setInputTableName(job, table1); - AccumuloInputFormat.setZooKeeperInstance(job, clientConfig); job.setMapperClass(TestMapper.class); job.setMapOutputKeyClass(Key.class); @@ -180,10 +177,9 @@ public int run(String[] args) throws Exception { job.setOutputKeyClass(Text.class); job.setOutputValueClass(Mutation.class); - AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); + AccumuloOutputFormat.setConnectionInfo(job, info); AccumuloOutputFormat.setCreateTables(job, false); AccumuloOutputFormat.setDefaultTableName(job, table2); - AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig); job.setNumReduceTasks(0); diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java index 7d4483338d..2dad20ec08 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java @@ -298,8 +298,7 @@ public int run(String[] args) throws Exception { job.setInputFormatClass(inputFormatClass); - AccumuloInputFormat.setZooKeeperInstance(job, cluster.getClientConfig()); - AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken()); + AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloInputFormat.setInputTableName(job, table); AccumuloInputFormat.setBatchScan(job, batchScan); if (sample) { @@ -409,9 +408,7 @@ public void testCorrectRangeInputSplits() throws Exception { Connector connector = getConnector(); connector.tableOperations().create(table); - 
AccumuloInputFormat.setZooKeeperInstance(job, cluster.getClientConfig()); - AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken()); - + AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloInputFormat.setInputTableName(job, table); AccumuloInputFormat.setScanAuthorizations(job, auths); AccumuloInputFormat.setScanIsolation(job, isolated); diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java index ff57722e3d..31dd458a31 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java @@ -94,9 +94,8 @@ public int run(String[] args) throws Exception { job.setInputFormatClass(AccumuloInputFormat.class); - AccumuloInputFormat.setConnectorInfo(job, user, pass); + AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloInputFormat.setInputTableName(job, table1); - AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig()); job.setMapperClass(TestMapper.class); job.setMapOutputKeyClass(Key.class); @@ -105,10 +104,9 @@ public int run(String[] args) throws Exception { job.setOutputKeyClass(Text.class); job.setOutputValueClass(Mutation.class); - AccumuloOutputFormat.setConnectorInfo(job, user, pass); + AccumuloOutputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloOutputFormat.setCreateTables(job, false); AccumuloOutputFormat.setDefaultTableName(job, table2); - AccumuloOutputFormat.setZooKeeperInstance(job, getCluster().getClientConfig()); job.setNumReduceTasks(0); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services