mikewalch closed pull request #548: Added additional client properties for
Scanner and BatchScanner
URL: https://github.com/apache/accumulo/pull/548
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
index 2a7ee36e79..c85da315a9 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
@@ -56,6 +56,27 @@
public abstract BatchScanner createBatchScanner(String tableName,
Authorizations authorizations,
int numQueryThreads) throws TableNotFoundException;
+ /**
+ * Factory method to create a BatchScanner connected to Accumulo. This
method uses the number of
+ * query threads configured when Connector was created. If none were
configured, defaults will be
+ * used.
+ *
+ * @param tableName
+ * the name of the table to query
+ * @param authorizations
+ * A set of authorization labels that will be checked against the
column visibility of
+ * each key in order to filter data. The authorizations passed in
must be a subset of the
+ * accumulo user's set of authorizations. If the accumulo user has
authorizations (A1,
+ * A2) and authorizations (A2, A3) are passed, then an exception
will be thrown.
+ *
+ * @return BatchScanner object for configuring and querying
+ * @throws TableNotFoundException
+ * when the specified table doesn't exist
+ * @since 2.0.0
+ */
+ public abstract BatchScanner createBatchScanner(String tableName,
Authorizations authorizations)
+ throws TableNotFoundException;
+
/**
* Factory method to create a BatchDeleter connected to Accumulo.
*
@@ -165,7 +186,6 @@ public abstract BatchWriter createBatchWriter(String
tableName, long maxMemory,
* @return BatchWriter object for configuring and writing data to
* @since 1.5.0
*/
-
public abstract BatchWriter createBatchWriter(String tableName,
BatchWriterConfig config)
throws TableNotFoundException;
@@ -585,6 +605,16 @@ public abstract Connector changeUser(String principal,
AuthenticationToken token
* @return this builder
*/
ConnectionOptions withBatchWriterConfig(BatchWriterConfig
batchWriterConfig);
+
+ /**
+ * Build with default number of query threads for BatchScanner
+ */
+ ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads);
+
+ /**
+ * Build with default batch size for Scanner
+ */
+ ConnectionOptions withScannerBatchSize(int batchSize);
}
public interface FromOptions extends ConnectionOptions, PropertyOptions,
AuthenticationArgs {
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index 244f764092..eac7532652 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -21,6 +21,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -108,6 +109,15 @@ public BatchScanner createBatchScanner(String tableName,
Authorizations authoriz
numQueryThreads);
}
+ @Override
+ public BatchScanner createBatchScanner(String tableName, Authorizations
authorizations)
+ throws TableNotFoundException {
+ Integer numQueryThreads = ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS
+ .getInteger(context.getClientInfo().getProperties());
+ Objects.requireNonNull(numQueryThreads);
+ return createBatchScanner(tableName, authorizations, numQueryThreads);
+ }
+
@Deprecated
@Override
public BatchDeleter createBatchDeleter(String tableName, Authorizations
authorizations,
@@ -191,7 +201,13 @@ public Scanner createScanner(String tableName,
Authorizations authorizations)
throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
checkArgument(authorizations != null, "authorizations is null");
- return new ScannerImpl(context, getTableId(tableName), authorizations);
+ Scanner scanner = new ScannerImpl(context, getTableId(tableName),
authorizations);
+ Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE
+ .getInteger(context.getClientInfo().getProperties());
+ if (batchSize != null) {
+ scanner.setBatchSize(batchSize);
+ }
+ return scanner;
}
@Override
@@ -347,6 +363,18 @@ public ConnectionOptions
withBatchWriterConfig(BatchWriterConfig batchWriterConf
return this;
}
+ @Override
+ public ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads)
{
+ setProperty(ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS,
numQueryThreads);
+ return this;
+ }
+
+ @Override
+ public ConnectionOptions withScannerBatchSize(int batchSize) {
+ setProperty(ClientProperty.SCANNER_BATCH_SIZE, batchSize);
+ return this;
+ }
+
@Override
public SaslOptions withPrimary(String kerberosServerPrimary) {
setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY,
kerberosServerPrimary);
diff --git
a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
index 7b95ba22a5..691c66061f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
@@ -47,6 +47,8 @@ void generate() {
generateSection("Instance", "instance.");
generateSection("Authentication", "auth.", "auth.type",
"auth.principal");
generateSection("Batch Writer", "batch.writer.");
+ generateSection("Batch Scanner", "batch.scanner.");
+ generateSection("Scanner", "scanner.");
generateSection("SSL", "ssl.");
generateSection("SASL", "sasl.");
generateSection("Tracing", "trace.");
diff --git
a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
index 1ec160976c..0b47e7fa47 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
@@ -61,6 +61,14 @@
"Change the" + " durability for the BatchWriter session. To use the
table's durability"
+ " setting. use \"default\" which is the table's durability
setting."),
+ // Scanner
+ SCANNER_BATCH_SIZE("scanner.batch.size", "1000",
+ "Number of key/value pairs that will be fetched at time from tablet
server"),
+
+ // BatchScanner
+ BATCH_SCANNER_NUM_QUERY_THREADS("batch.scanner.num.query.threads", "3",
+ "Number of concurrent query threads to spawn for querying"),
+
// Bulk load
BULK_LOAD_THREADS("bulk.threads", "8C",
"The number of threads used to inspect bulk load files to determine
where files go. "
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services