mikewalch closed pull request #548: Added additional client properties for Scanner and BatchScanner
URL: https://github.com/apache/accumulo/pull/548
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
index 2a7ee36e79..c85da315a9 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
@@ -56,6 +56,27 @@
   public abstract BatchScanner createBatchScanner(String tableName, Authorizations authorizations,
       int numQueryThreads) throws TableNotFoundException;
 
+  /**
+   * Factory method to create a BatchScanner connected to Accumulo. This method uses the number of
+   * query threads configured when Connector was created. If none were configured, defaults will be
+   * used.
+   *
+   * @param tableName
+   *          the name of the table to query
+   * @param authorizations
+   *          A set of authorization labels that will be checked against the column visibility of
+   *          each key in order to filter data. The authorizations passed in must be a subset of the
+   *          accumulo user's set of authorizations. If the accumulo user has authorizations (A1,
+   *          A2) and authorizations (A2, A3) are passed, then an exception will be thrown.
+   *
+   * @return BatchScanner object for configuring and querying
+   * @throws TableNotFoundException
+   *           when the specified table doesn't exist
+   * @since 2.0.0
+   */
+  public abstract BatchScanner createBatchScanner(String tableName, Authorizations authorizations)
+      throws TableNotFoundException;
+
   /**
    * Factory method to create a BatchDeleter connected to Accumulo.
    *
@@ -165,7 +186,6 @@ public abstract BatchWriter createBatchWriter(String tableName, long maxMemory,
    * @return BatchWriter object for configuring and writing data to
    * @since 1.5.0
    */
-
   public abstract BatchWriter createBatchWriter(String tableName, BatchWriterConfig config)
       throws TableNotFoundException;
 
@@ -585,6 +605,16 @@ public abstract Connector changeUser(String principal, AuthenticationToken token
      * @return this builder
      */
     ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig);
+
+    /**
+     * Build with default number of query threads for BatchScanner
+     */
+    ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads);
+
+    /**
+     * Build with default batch size for Scanner
+     */
+    ConnectionOptions withScannerBatchSize(int batchSize);
   }
 
   public interface FromOptions extends ConnectionOptions, PropertyOptions, AuthenticationArgs {
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index 244f764092..eac7532652 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -21,6 +21,7 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -108,6 +109,15 @@ public BatchScanner createBatchScanner(String tableName, Authorizations authoriz
         numQueryThreads);
   }
 
+  @Override
+  public BatchScanner createBatchScanner(String tableName, Authorizations authorizations)
+      throws TableNotFoundException {
+    Integer numQueryThreads = ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS
+        .getInteger(context.getClientInfo().getProperties());
+    Objects.requireNonNull(numQueryThreads);
+    return createBatchScanner(tableName, authorizations, numQueryThreads);
+  }
+
   @Deprecated
   @Override
   public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
@@ -191,7 +201,13 @@ public Scanner createScanner(String tableName, Authorizations authorizations)
       throws TableNotFoundException {
     checkArgument(tableName != null, "tableName is null");
     checkArgument(authorizations != null, "authorizations is null");
-    return new ScannerImpl(context, getTableId(tableName), authorizations);
+    Scanner scanner = new ScannerImpl(context, getTableId(tableName), authorizations);
+    Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE
+        .getInteger(context.getClientInfo().getProperties());
+    if (batchSize != null) {
+      scanner.setBatchSize(batchSize);
+    }
+    return scanner;
   }
 
   @Override
@@ -347,6 +363,18 @@ public ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConf
       return this;
     }
 
+    @Override
+    public ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads) {
+      setProperty(ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS, numQueryThreads);
+      return this;
+    }
+
+    @Override
+    public ConnectionOptions withScannerBatchSize(int batchSize) {
+      setProperty(ClientProperty.SCANNER_BATCH_SIZE, batchSize);
+      return this;
+    }
+
     @Override
     public SaslOptions withPrimary(String kerberosServerPrimary) {
       setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY, kerberosServerPrimary);
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
index 7b95ba22a5..691c66061f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
@@ -47,6 +47,8 @@ void generate() {
       generateSection("Instance", "instance.");
       generateSection("Authentication", "auth.", "auth.type", "auth.principal");
       generateSection("Batch Writer", "batch.writer.");
+      generateSection("Batch Scanner", "batch.scanner.");
+      generateSection("Scanner", "scanner.");
       generateSection("SSL", "ssl.");
       generateSection("SASL", "sasl.");
       generateSection("Tracing", "trace.");
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
index 1ec160976c..0b47e7fa47 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
@@ -61,6 +61,14 @@
       "Change the" + " durability for the BatchWriter session. To use the table's durability"
           + " setting. use \"default\" which is the table's durability setting."),
 
+  // Scanner
+  SCANNER_BATCH_SIZE("scanner.batch.size", "1000",
+      "Number of key/value pairs that will be fetched at time from tablet server"),
+
+  // BatchScanner
+  BATCH_SCANNER_NUM_QUERY_THREADS("batch.scanner.num.query.threads", "3",
+      "Number of concurrent query threads to spawn for querying"),
+
   // Bulk load
   BULK_LOAD_THREADS("bulk.threads", "8C",
       "The number of threads used to inspect bulk load files to determine where files go.  "


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to