Updated Branches:
  refs/heads/ACCUMULO-1566 [created] ff95c7147

ACCUMULO-1566 Pass down the readaheadThreshold parameter from the client to the
server so that the same limit is adhered to by the server in regards to
pipelining.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ff95c714
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ff95c714
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ff95c714

Branch: refs/heads/ACCUMULO-1566
Commit: ff95c7147d210171fef3824eae399b7384cdaff9
Parents: e70a40d
Author: Josh Elser <els...@apache.org>
Authored: Tue Oct 8 18:54:30 2013 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Tue Oct 8 18:54:30 2013 -0400

----------------------------------------------------------------------
 .../core/client/impl/ThriftScanner.java         |  15 ++-
 .../thrift/TabletClientService.java             | 124 +++++++++++++++++--
 core/src/main/thrift/tabletserver.thrift        |   3 +-
 .../server/tabletserver/TabletServer.java       |  10 +-
 .../test/performance/thrift/NullTserver.java    |   2 +-
 5 files changed, 132 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index efdd142..efb31e8 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
@@ -97,7 +98,7 @@ public class ThriftScanner {
         boolean waitForWrites = 
!serversWaitedForWrites.get(ttype).contains(server);
         InitialScan isr = client.startScan(tinfo, 
scanState.credentials.toThrift(instance), extent.toThrift(), 
scanState.range.toThrift(),
             Translator.translate(scanState.columns, Translator.CT), 
scanState.size, scanState.serverSideIteratorList, 
scanState.serverSideIteratorOptions,
-            scanState.authorizations.getAuthorizationsBB(), waitForWrites, 
scanState.isolated);
+            scanState.authorizations.getAuthorizationsBB(), waitForWrites, 
scanState.isolated, scanState.readaheadThreshold);
         if (waitForWrites)
           serversWaitedForWrites.get(ttype).add(server);
         
@@ -132,6 +133,7 @@ public class ThriftScanner {
     Text tableId;
     Text startRow;
     boolean skipStartRow;
+    long readaheadThreshold;
     
     Range range;
     
@@ -150,9 +152,15 @@ public class ThriftScanner {
     List<IterInfo> serverSideIteratorList;
     
     Map<String,Map<String,String>> serverSideIteratorOptions;
-    
+
     public ScanState(Instance instance, Credentials credentials, Text tableId, 
Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns,
         int size, List<IterInfo> serverSideIteratorList, 
Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated) {
+      this(instance, credentials, tableId, authorizations, range, 
fetchedColumns, size, serverSideIteratorList, serverSideIteratorOptions, 
isolated,
+          Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD);
+    }
+
+    public ScanState(Instance instance, Credentials credentials, Text tableId, 
Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns,
+        int size, List<IterInfo> serverSideIteratorList, 
Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated, 
long readaheadThreshold) {
       this.instance = instance;
       this.credentials = credentials;
       this.authorizations = authorizations;
@@ -179,6 +187,7 @@ public class ThriftScanner {
       this.serverSideIteratorOptions = serverSideIteratorOptions;
       
       this.isolated = isolated;
+      this.readaheadThreshold = readaheadThreshold;
       
     }
   }
@@ -389,7 +398,7 @@ public class ThriftScanner {
         boolean waitForWrites = 
!serversWaitedForWrites.get(ttype).contains(loc.tablet_location);
         InitialScan is = client.startScan(tinfo, 
scanState.credentials.toThrift(scanState.instance), 
loc.tablet_extent.toThrift(), scanState.range.toThrift(),
             Translator.translate(scanState.columns, Translator.CT), 
scanState.size, scanState.serverSideIteratorList, 
scanState.serverSideIteratorOptions,
-            scanState.authorizations.getAuthorizationsBB(), waitForWrites, 
scanState.isolated);
+            scanState.authorizations.getAuthorizationsBB(), waitForWrites, 
scanState.isolated, scanState.readaheadThreshold);
         if (waitForWrites)
           serversWaitedForWrites.get(ttype).add(loc.tablet_location);
         

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
 
b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
index bd6578d..d02b5da 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
 
   public interface Iface extends 
org.apache.accumulo.core.client.impl.thrift.ClientService.Iface {
 
-    public org.apache.accumulo.core.data.thrift.InitialScan 
startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, 
org.apache.accumulo.core.security.thrift.TCredentials credentials, 
org.apache.accumulo.core.data.thrift.TKeyExtent extent, 
org.apache.accumulo.core.data.thrift.TRange range, 
List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean 
waitForWrites, boolean isolated) throws 
org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, 
NotServingTabletException, TooManyFilesException, org.apache.thrift.TException;
+    public org.apache.accumulo.core.data.thrift.InitialScan 
startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, 
org.apache.accumulo.core.security.thrift.TCredentials credentials, 
org.apache.accumulo.core.data.thrift.TKeyExtent extent, 
org.apache.accumulo.core.data.thrift.TRange range, 
List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean 
waitForWrites, boolean isolated, long readaheadThreshold) throws 
org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, 
NotServingTabletException, TooManyFilesException, org.apache.thrift.TException;
 
     public org.apache.accumulo.core.data.thrift.ScanResult 
continueScan(org.apache.accumulo.trace.thrift.TInfo tinfo, long scanID) throws 
NoSuchScanIDException, NotServingTabletException, TooManyFilesException, 
org.apache.thrift.TException;
 
@@ -114,7 +114,7 @@ import org.slf4j.LoggerFactory;
 
   public interface AsyncIface extends 
org.apache.accumulo.core.client.impl.thrift.ClientService .AsyncIface {
 
-    public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, 
org.apache.accumulo.core.security.thrift.TCredentials credentials, 
org.apache.accumulo.core.data.thrift.TKeyExtent extent, 
org.apache.accumulo.core.data.thrift.TRange range, 
List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean 
waitForWrites, boolean isolated, 
org.apache.thrift.async.AsyncMethodCallback<AsyncClient.startScan_call> 
resultHandler) throws org.apache.thrift.TException;
+    public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, 
org.apache.accumulo.core.security.thrift.TCredentials credentials, 
org.apache.accumulo.core.data.thrift.TKeyExtent extent, 
org.apache.accumulo.core.data.thrift.TRange range, 
List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean 
waitForWrites, boolean isolated, long readaheadThreshold, 
org.apache.thrift.async.AsyncMethodCallback<AsyncClient.startScan_call> 
resultHandler) throws org.apache.thrift.TException;
 
     public void continueScan(org.apache.accumulo.trace.thrift.TInfo tinfo, 
long scanID, 
org.apache.thrift.async.AsyncMethodCallback<AsyncClient.continueScan_call> 
resultHandler) throws org.apache.thrift.TException;
 
@@ -196,13 +196,13 @@ import org.slf4j.LoggerFactory;
       super(iprot, oprot);
     }
 
-    public org.apache.accumulo.core.data.thrift.InitialScan 
startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, 
org.apache.accumulo.core.security.thrift.TCredentials credentials, 
org.apache.accumulo.core.data.thrift.TKeyExtent extent, 
org.apache.accumulo.core.data.thrift.TRange range, 
List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean 
waitForWrites, boolean isolated) throws 
org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, 
NotServingTabletException, TooManyFilesException, org.apache.thrift.TException
+    public org.apache.accumulo.core.data.thrift.InitialScan 
startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, 
org.apache.accumulo.core.security.thrift.TCredentials credentials, 
org.apache.accumulo.core.data.thrift.TKeyExtent extent, 
org.apache.accumulo.core.data.thrift.TRange range, 
List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean 
waitForWrites, boolean isolated, long readaheadThreshold) throws 
org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, 
NotServingTabletException, TooManyFilesException, org.apache.thrift.TException
     {
-      send_startScan(tinfo, credentials, extent, range, columns, batchSize, 
ssiList, ssio, authorizations, waitForWrites, isolated);
+      send_startScan(tinfo, credentials, extent, range, columns, batchSize, 
ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold);
       return recv_startScan();
     }
 
-    public void send_startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, 
org.apache.accumulo.core.security.thrift.TCredentials credentials, 
org.apache.accumulo.core.data.thrift.TKeyExtent extent, 
org.apache.accumulo.core.data.thrift.TRange range, 
List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean 
waitForWrites, boolean isolated) throws org.apache.thrift.TException
+    public void send_startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, 
org.apache.accumulo.core.security.thrift.TCredentials credentials, 
org.apache.accumulo.core.data.thrift.TKeyExtent extent, 
org.apache.accumulo.core.data.thrift.TRange range, 
List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean 
waitForWrites, boolean isolated, long readaheadThreshold) throws 
org.apache.thrift.TException
     {
       startScan_args args = new startScan_args();
       args.setTinfo(tinfo);
@@ -216,6 +216,7 @@ import org.slf4j.LoggerFactory;
       args.setAuthorizations(authorizations);
       args.setWaitForWrites(waitForWrites);
       args.setIsolated(isolated);
+      args.setReadaheadThreshold(readaheadThreshold);
       sendBase("startScan", args);
     }
 
@@ -922,9 +923,9 @@ import org.slf4j.LoggerFactory;
       super(protocolFactory, clientManager, transport);
     }
 
-    public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, 
org.apache.accumulo.core.security.thrift.TCredentials credentials, 
org.apache.accumulo.core.data.thrift.TKeyExtent extent, 
org.apache.accumulo.core.data.thrift.TRange range, 
List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean 
waitForWrites, boolean isolated, 
org.apache.thrift.async.AsyncMethodCallback<startScan_call> resultHandler) 
throws org.apache.thrift.TException {
+    public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, 
org.apache.accumulo.core.security.thrift.TCredentials credentials, 
org.apache.accumulo.core.data.thrift.TKeyExtent extent, 
org.apache.accumulo.core.data.thrift.TRange range, 
List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean 
waitForWrites, boolean isolated, long readaheadThreshold, 
org.apache.thrift.async.AsyncMethodCallback<startScan_call> resultHandler) 
throws org.apache.thrift.TException {
       checkReady();
-      startScan_call method_call = new startScan_call(tinfo, credentials, 
extent, range, columns, batchSize, ssiList, ssio, authorizations, 
waitForWrites, isolated, resultHandler, this, ___protocolFactory, ___transport);
+      startScan_call method_call = new startScan_call(tinfo, credentials, 
extent, range, columns, batchSize, ssiList, ssio, authorizations, 
waitForWrites, isolated, readaheadThreshold, resultHandler, this, 
___protocolFactory, ___transport);
       this.___currentMethod = method_call;
       ___manager.call(method_call);
     }
@@ -941,7 +942,8 @@ import org.slf4j.LoggerFactory;
       private List<ByteBuffer> authorizations;
       private boolean waitForWrites;
       private boolean isolated;
-      public startScan_call(org.apache.accumulo.trace.thrift.TInfo tinfo, 
org.apache.accumulo.core.security.thrift.TCredentials credentials, 
org.apache.accumulo.core.data.thrift.TKeyExtent extent, 
org.apache.accumulo.core.data.thrift.TRange range, 
List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean 
waitForWrites, boolean isolated, 
org.apache.thrift.async.AsyncMethodCallback<startScan_call> resultHandler, 
org.apache.thrift.async.TAsyncClient client, 
org.apache.thrift.protocol.TProtocolFactory protocolFactory, 
org.apache.thrift.transport.TNonblockingTransport transport) throws 
org.apache.thrift.TException {
+      private long readaheadThreshold;
+      public startScan_call(org.apache.accumulo.trace.thrift.TInfo tinfo, 
org.apache.accumulo.core.security.thrift.TCredentials credentials, 
org.apache.accumulo.core.data.thrift.TKeyExtent extent, 
org.apache.accumulo.core.data.thrift.TRange range, 
List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, 
List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, 
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean 
waitForWrites, boolean isolated, long readaheadThreshold, 
org.apache.thrift.async.AsyncMethodCallback<startScan_call> resultHandler, 
org.apache.thrift.async.TAsyncClient client, 
org.apache.thrift.protocol.TProtocolFactory protocolFactory, 
org.apache.thrift.transport.TNonblockingTransport transport) throws 
org.apache.thrift.TException {
         super(client, protocolFactory, transport, resultHandler, false);
         this.tinfo = tinfo;
         this.credentials = credentials;
@@ -954,6 +956,7 @@ import org.slf4j.LoggerFactory;
         this.authorizations = authorizations;
         this.waitForWrites = waitForWrites;
         this.isolated = isolated;
+        this.readaheadThreshold = readaheadThreshold;
       }
 
       public void write_args(org.apache.thrift.protocol.TProtocol prot) throws 
org.apache.thrift.TException {
@@ -970,6 +973,7 @@ import org.slf4j.LoggerFactory;
         args.setAuthorizations(authorizations);
         args.setWaitForWrites(waitForWrites);
         args.setIsolated(isolated);
+        args.setReadaheadThreshold(readaheadThreshold);
         args.write(prot);
         prot.writeMessageEnd();
       }
@@ -2170,7 +2174,7 @@ import org.slf4j.LoggerFactory;
       public startScan_result getResult(I iface, startScan_args args) throws 
org.apache.thrift.TException {
         startScan_result result = new startScan_result();
         try {
-          result.success = iface.startScan(args.tinfo, args.credentials, 
args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, 
args.authorizations, args.waitForWrites, args.isolated);
+          result.success = iface.startScan(args.tinfo, args.credentials, 
args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, 
args.authorizations, args.waitForWrites, args.isolated, 
args.readaheadThreshold);
         } catch 
(org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException sec) {
           result.sec = sec;
         } catch (NotServingTabletException nste) {
@@ -2846,6 +2850,7 @@ import org.slf4j.LoggerFactory;
     private static final org.apache.thrift.protocol.TField 
AUTHORIZATIONS_FIELD_DESC = new 
org.apache.thrift.protocol.TField("authorizations", 
org.apache.thrift.protocol.TType.LIST, (short)8);
     private static final org.apache.thrift.protocol.TField 
WAIT_FOR_WRITES_FIELD_DESC = new 
org.apache.thrift.protocol.TField("waitForWrites", 
org.apache.thrift.protocol.TType.BOOL, (short)9);
     private static final org.apache.thrift.protocol.TField ISOLATED_FIELD_DESC 
= new org.apache.thrift.protocol.TField("isolated", 
org.apache.thrift.protocol.TType.BOOL, (short)10);
+    private static final org.apache.thrift.protocol.TField 
READAHEAD_THRESHOLD_FIELD_DESC = new 
org.apache.thrift.protocol.TField("readaheadThreshold", 
org.apache.thrift.protocol.TType.I64, (short)12);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes 
= new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
@@ -2864,6 +2869,7 @@ import org.slf4j.LoggerFactory;
     public List<ByteBuffer> authorizations; // required
     public boolean waitForWrites; // required
     public boolean isolated; // required
+    public long readaheadThreshold; // required
 
     /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
     @SuppressWarnings("all") public enum _Fields implements 
org.apache.thrift.TFieldIdEnum {
@@ -2877,7 +2883,8 @@ import org.slf4j.LoggerFactory;
       SSIO((short)7, "ssio"),
       AUTHORIZATIONS((short)8, "authorizations"),
       WAIT_FOR_WRITES((short)9, "waitForWrites"),
-      ISOLATED((short)10, "isolated");
+      ISOLATED((short)10, "isolated"),
+      READAHEAD_THRESHOLD((short)12, "readaheadThreshold");
 
       private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
 
@@ -2914,6 +2921,8 @@ import org.slf4j.LoggerFactory;
             return WAIT_FOR_WRITES;
           case 10: // ISOLATED
             return ISOLATED;
+          case 12: // READAHEAD_THRESHOLD
+            return READAHEAD_THRESHOLD;
           default:
             return null;
         }
@@ -2957,6 +2966,7 @@ import org.slf4j.LoggerFactory;
     private static final int __BATCHSIZE_ISSET_ID = 0;
     private static final int __WAITFORWRITES_ISSET_ID = 1;
     private static final int __ISOLATED_ISSET_ID = 2;
+    private static final int __READAHEADTHRESHOLD_ISSET_ID = 3;
     private byte __isset_bitfield = 0;
     public static final Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
     static {
@@ -2990,6 +3000,8 @@ import org.slf4j.LoggerFactory;
           new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
       tmpMap.put(_Fields.ISOLATED, new 
org.apache.thrift.meta_data.FieldMetaData("isolated", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+      tmpMap.put(_Fields.READAHEAD_THRESHOLD, new 
org.apache.thrift.meta_data.FieldMetaData("readaheadThreshold", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
       
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(startScan_args.class,
 metaDataMap);
     }
@@ -3008,7 +3020,8 @@ import org.slf4j.LoggerFactory;
       Map<String,Map<String,String>> ssio,
       List<ByteBuffer> authorizations,
       boolean waitForWrites,
-      boolean isolated)
+      boolean isolated,
+      long readaheadThreshold)
     {
       this();
       this.tinfo = tinfo;
@@ -3025,6 +3038,8 @@ import org.slf4j.LoggerFactory;
       setWaitForWritesIsSet(true);
       this.isolated = isolated;
       setIsolatedIsSet(true);
+      this.readaheadThreshold = readaheadThreshold;
+      setReadaheadThresholdIsSet(true);
     }
 
     /**
@@ -3096,6 +3111,7 @@ import org.slf4j.LoggerFactory;
       }
       this.waitForWrites = other.waitForWrites;
       this.isolated = other.isolated;
+      this.readaheadThreshold = other.readaheadThreshold;
     }
 
     public startScan_args deepCopy() {
@@ -3118,6 +3134,8 @@ import org.slf4j.LoggerFactory;
       this.waitForWrites = false;
       setIsolatedIsSet(false);
       this.isolated = false;
+      setReadaheadThresholdIsSet(false);
+      this.readaheadThreshold = 0;
     }
 
     public org.apache.accumulo.trace.thrift.TInfo getTinfo() {
@@ -3437,6 +3455,29 @@ import org.slf4j.LoggerFactory;
       __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, 
__ISOLATED_ISSET_ID, value);
     }
 
+    public long getReadaheadThreshold() {
+      return this.readaheadThreshold;
+    }
+
+    public startScan_args setReadaheadThreshold(long readaheadThreshold) {
+      this.readaheadThreshold = readaheadThreshold;
+      setReadaheadThresholdIsSet(true);
+      return this;
+    }
+
+    public void unsetReadaheadThreshold() {
+      __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, 
__READAHEADTHRESHOLD_ISSET_ID);
+    }
+
+    /** Returns true if field readaheadThreshold is set (has been assigned a 
value) and false otherwise */
+    public boolean isSetReadaheadThreshold() {
+      return EncodingUtils.testBit(__isset_bitfield, 
__READAHEADTHRESHOLD_ISSET_ID);
+    }
+
+    public void setReadaheadThresholdIsSet(boolean value) {
+      __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, 
__READAHEADTHRESHOLD_ISSET_ID, value);
+    }
+
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
       case TINFO:
@@ -3527,6 +3568,14 @@ import org.slf4j.LoggerFactory;
         }
         break;
 
+      case READAHEAD_THRESHOLD:
+        if (value == null) {
+          unsetReadaheadThreshold();
+        } else {
+          setReadaheadThreshold((Long)value);
+        }
+        break;
+
       }
     }
 
@@ -3565,6 +3614,9 @@ import org.slf4j.LoggerFactory;
       case ISOLATED:
         return Boolean.valueOf(isIsolated());
 
+      case READAHEAD_THRESHOLD:
+        return Long.valueOf(getReadaheadThreshold());
+
       }
       throw new IllegalStateException();
     }
@@ -3598,6 +3650,8 @@ import org.slf4j.LoggerFactory;
         return isSetWaitForWrites();
       case ISOLATED:
         return isSetIsolated();
+      case READAHEAD_THRESHOLD:
+        return isSetReadaheadThreshold();
       }
       throw new IllegalStateException();
     }
@@ -3714,6 +3768,15 @@ import org.slf4j.LoggerFactory;
           return false;
       }
 
+      boolean this_present_readaheadThreshold = true;
+      boolean that_present_readaheadThreshold = true;
+      if (this_present_readaheadThreshold || that_present_readaheadThreshold) {
+        if (!(this_present_readaheadThreshold && 
that_present_readaheadThreshold))
+          return false;
+        if (this.readaheadThreshold != that.readaheadThreshold)
+          return false;
+      }
+
       return true;
     }
 
@@ -3840,6 +3903,16 @@ import org.slf4j.LoggerFactory;
           return lastComparison;
         }
       }
+      lastComparison = 
Boolean.valueOf(isSetReadaheadThreshold()).compareTo(typedOther.isSetReadaheadThreshold());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetReadaheadThreshold()) {
+        lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.readaheadThreshold, 
typedOther.readaheadThreshold);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -3935,6 +4008,10 @@ import org.slf4j.LoggerFactory;
       sb.append("isolated:");
       sb.append(this.isolated);
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("readaheadThreshold:");
+      sb.append(this.readaheadThreshold);
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -4140,6 +4217,14 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
               }
               break;
+            case 12: // READAHEAD_THRESHOLD
+              if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+                struct.readaheadThreshold = iprot.readI64();
+                struct.setReadaheadThresholdIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+              }
+              break;
             default:
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
           }
@@ -4241,6 +4326,9 @@ import org.slf4j.LoggerFactory;
           struct.tinfo.write(oprot);
           oprot.writeFieldEnd();
         }
+        oprot.writeFieldBegin(READAHEAD_THRESHOLD_FIELD_DESC);
+        oprot.writeI64(struct.readaheadThreshold);
+        oprot.writeFieldEnd();
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
@@ -4292,7 +4380,10 @@ import org.slf4j.LoggerFactory;
         if (struct.isSetIsolated()) {
           optionals.set(10);
         }
-        oprot.writeBitSet(optionals, 11);
+        if (struct.isSetReadaheadThreshold()) {
+          optionals.set(11);
+        }
+        oprot.writeBitSet(optionals, 12);
         if (struct.isSetTinfo()) {
           struct.tinfo.write(oprot);
         }
@@ -4358,12 +4449,15 @@ import org.slf4j.LoggerFactory;
         if (struct.isSetIsolated()) {
           oprot.writeBool(struct.isolated);
         }
+        if (struct.isSetReadaheadThreshold()) {
+          oprot.writeI64(struct.readaheadThreshold);
+        }
       }
 
       @Override
       public void read(org.apache.thrift.protocol.TProtocol prot, 
startScan_args struct) throws org.apache.thrift.TException {
         TTupleProtocol iprot = (TTupleProtocol) prot;
-        BitSet incoming = iprot.readBitSet(11);
+        BitSet incoming = iprot.readBitSet(12);
         if (incoming.get(0)) {
           struct.tinfo = new org.apache.accumulo.trace.thrift.TInfo();
           struct.tinfo.read(iprot);
@@ -4463,6 +4557,10 @@ import org.slf4j.LoggerFactory;
           struct.isolated = iprot.readBool();
           struct.setIsolatedIsSet(true);
         }
+        if (incoming.get(11)) {
+          struct.readaheadThreshold = iprot.readI64();
+          struct.setReadaheadThresholdIsSet(true);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift 
b/core/src/main/thrift/tabletserver.thrift
index 4f9f13a..25e0b10 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -139,7 +139,8 @@ service TabletClientService extends client.ClientService {
                              7:map<string, map<string, string>> ssio,
                              8:list<binary> authorizations
                              9:bool waitForWrites,
-                             10:bool isolated)  throws 
(1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 
3:TooManyFilesException tmfe),
+                             10:bool isolated,
+                             12:i64 readaheadThreshold)  throws 
(1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 
3:TooManyFilesException tmfe),
                              
   data.ScanResult continueScan(2:trace.TInfo tinfo, 1:data.ScanID scanID)  
throws (1:NoSuchScanIDException nssi, 2:NotServingTabletException nste, 
3:TooManyFilesException tmfe),
   oneway void closeScan(2:trace.TInfo tinfo, 1:data.ScanID scanID),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index abb8750..56f03af 100644
--- 
a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -795,6 +795,7 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
     public volatile ScanTask<ScanBatch> nextBatchTask;
     public AtomicBoolean interruptFlag;
     public Scanner scanner;
+    public long readaheadThreshold = 
Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
     
     @Override
     public void cleanup() {
@@ -1156,9 +1157,9 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
     
     @Override
     public InitialScan startScan(TInfo tinfo, TCredentials credentials, 
TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
-        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, 
List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated)
-        throws NotServingTabletException, ThriftSecurityException, 
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
-      
+        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, 
List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated,
+        long readaheadThreshold) throws NotServingTabletException, 
ThriftSecurityException, 
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+
       Authorizations userauths = null;
       if (!security.canScan(credentials, new String(textent.getTable()), 
range, columns, ssiList, ssio, authorizations))
         throw new ThriftSecurityException(credentials.getPrincipal(), 
SecurityErrorCode.PERMISSION_DENIED);
@@ -1195,6 +1196,7 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
       scanSession.ssio = ssio;
       scanSession.auths = new Authorizations(authorizations);
       scanSession.interruptFlag = new AtomicBoolean();
+      scanSession.readaheadThreshold = readaheadThreshold;
       
       for (TColumn tcolumn : columns) {
         scanSession.columnSet.add(new Column(tcolumn));
@@ -1277,7 +1279,7 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
       
       scanSession.batchCount++;
       
-      if (scanResult.more && scanSession.batchCount > 3) {
+      if (scanResult.more && scanSession.batchCount > 
scanSession.readaheadThreshold) {
         // start reading next batch while current batch is transmitted
         // to client
         scanSession.nextBatchTask = new NextBatchTask(scanID, 
scanSession.interruptFlag);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
 
b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 9bb7604..f4eb234 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -138,7 +138,7 @@ public class NullTserver {
     
     @Override
     public InitialScan startScan(TInfo tinfo, TCredentials credentials, 
TKeyExtent extent, TRange range, List<TColumn> columns, int batchSize,
-        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, 
List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) {
+        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, 
List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long 
readaheadThreshold) {
       return null;
     }
     

Reply via email to