[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-25 Thread Ben-Zvi
Github user Ben-Zvi closed the pull request at:

https://github.com/apache/drill/pull/938


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-20 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r140098512
  
--- Diff: 
common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
@@ -536,6 +542,33 @@ public Builder pushContext(final String name, final 
double value) {
  * @return user exception
  */
 public UserException build(final Logger logger) {
+
+  // To allow for debugging:
+  // A spinner code to make the execution stop here while the file 
'/tmp/drillspin' exists
--- End diff --

Done


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-20 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r140098546
  
--- Diff: 
common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
@@ -536,6 +542,33 @@ public Builder pushContext(final String name, final 
double value) {
  * @return user exception
  */
 public UserException build(final Logger logger) {
+
+  // To allow for debugging:
+  // A spinner code to make the execution stop here while the file 
'/tmp/drillspin' exists
+  // Can be used to attach a debugger, use jstack, etc
+  // The processID of the spinning thread should be in a file like 
/tmp/spin4148663301172491613.tmp
--- End diff --

Done 


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-20 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r140093627
  
--- Diff: 
common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
@@ -536,6 +542,33 @@ public Builder pushContext(final String name, final 
double value) {
  * @return user exception
  */
 public UserException build(final Logger logger) {
+
+  // To allow for debugging:
+  // A spinner code to make the execution stop here while the file 
'/tmp/drillspin' exists
+  // Can be used to attach a debugger, use jstack, etc
+  // The processID of the spinning thread should be in a file like 
/tmp/spin4148663301172491613.tmp
+  // along with the error message.
+  File spinFile = new File("/tmp/drillspin");
+  if ( spinFile.exists() ) {
+File tmpDir = new File("/tmp");
+File outErr = null;
+try {
+  outErr = File.createTempFile("spin", ".tmp", tmpDir);
+  BufferedWriter bw = new BufferedWriter(new FileWriter(outErr));
+  bw.write("Spinning process: " + 
ManagementFactory.getRuntimeMXBean().getName()
+  /* After upgrading to JDK 9 - replace with: 
ProcessHandle.current().getPid() */);
+  bw.write("\nError cause: " +
+(errorType == DrillPBError.ErrorType.SYSTEM ? ("SYSTEM ERROR: 
" + ErrorHelper.getRootMessage(cause)) : message));
+  bw.close();
+} catch (Exception ex) {
+  logger.warn("Failed creating a spinner tmp message file: {}", 
ex);
+}
+while (spinFile.exists()) {
+  try { sleep(1_000); } catch (Exception ex) { /* ignore 
interruptions */ }
--- End diff --

Yes. If some non-blocked part tries to kill the query, the spinning parts 
would still be blocked. That may be by design, since debugging is still going on 
(until a user issues "clush -a rm /tmp/drill/spin").
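
The blocking behavior described here can be illustrated with a minimal, 
self-contained sketch. It is not the code from the diff: `FlagFileSpinner`, 
`spinWhileExists` and `demo` are hypothetical names, and the flag file is a 
temporary file rather than `/tmp/drillspin`. As in the diff, interruptions are 
swallowed, so only deleting the file releases the thread.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper for illustration; not the code in UserException.build(). */
final class FlagFileSpinner {

  /**
   * Blocks while flagFile exists, polling every pollMillis milliseconds.
   * Returns the number of polls that found the file present.
   */
  static int spinWhileExists(Path flagFile, long pollMillis) {
    int polls = 0;
    while (Files.exists(flagFile)) {
      polls++;
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException ex) {
        // Like the diff, ignore interruptions: only removing the file frees the thread.
      }
    }
    return polls;
  }

  /** Creates a temporary flag file, removes it from another thread, and reports the polls. */
  static int demo() {
    try {
      Path flag = Files.createTempFile("spin-demo", ".flag");
      Thread remover = new Thread(() -> {
        try {
          Thread.sleep(200);           // stands in for the user running "rm" later
          Files.deleteIfExists(flag);
        } catch (IOException | InterruptedException ignored) {
          // best effort in a demo
        }
      });
      remover.start();
      int polls = spinWhileExists(flag, 20);
      remover.join();
      return polls;
    } catch (IOException | InterruptedException ex) {
      throw new IllegalStateException(ex);
    }
  }

  public static void main(String[] args) {
    System.out.println("released after " + demo() + " polls");
  }
}
```

Because the loop never looks at the interrupt status, a kill request delivered 
as a thread interrupt (an assumption about how cancellation might arrive) would 
have no effect until the file is gone.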



---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-20 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r140062933
  
--- Diff: 
common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
@@ -536,6 +542,33 @@ public Builder pushContext(final String name, final 
double value) {
  * @return user exception
  */
 public UserException build(final Logger logger) {
+
+  // To allow for debugging:
+  // A spinner code to make the execution stop here while the file 
'/tmp/drillspin' exists
+  // Can be used to attach a debugger, use jstack, etc
+  // The processID of the spinning thread should be in a file like 
/tmp/spin4148663301172491613.tmp
+  // along with the error message.
+  File spinFile = new File("/tmp/drillspin");
+  if ( spinFile.exists() ) {
+File tmpDir = new File("/tmp");
+File outErr = null;
+try {
+  outErr = File.createTempFile("spin", ".tmp", tmpDir);
+  BufferedWriter bw = new BufferedWriter(new FileWriter(outErr));
+  bw.write("Spinning process: " + 
ManagementFactory.getRuntimeMXBean().getName()
+  /* After upgrading to JDK 9 - replace with: 
ProcessHandle.current().getPid() */);
+  bw.write("\nError cause: " +
+(errorType == DrillPBError.ErrorType.SYSTEM ? ("SYSTEM ERROR: 
" + ErrorHelper.getRootMessage(cause)) : message));
+  bw.close();
+} catch (Exception ex) {
+  logger.warn("Failed creating a spinner tmp message file: {}", 
ex);
+}
+while (spinFile.exists()) {
+  try { sleep(1_000); } catch (Exception ex) { /* ignore 
interruptions */ }
--- End diff --

Does killing a query cause a user exception?



---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-20 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r140062742
  
--- Diff: 
common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
@@ -536,6 +542,33 @@ public Builder pushContext(final String name, final 
double value) {
  * @return user exception
  */
 public UserException build(final Logger logger) {
+
+  // To allow for debugging:
+  // A spinner code to make the execution stop here while the file 
'/tmp/drillspin' exists
+  // Can be used to attach a debugger, use jstack, etc
+  // The processID of the spinning thread should be in a file like 
/tmp/spin4148663301172491613.tmp
+  // along with the error message.
+  File spinFile = new File("/tmp/drillspin");
--- End diff --

Using a "flag file" instead of a config setting gives more flexibility: 
there is no need to restart in order to turn this feature on or off, errors 
can be caught on only a few selected nodes, and the looping thread can be 
freed by deleting the "flag file". 
I also plan to post an announcement on the dev list about this new 
"feature" and see if there is any feedback. 



---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-19 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r139877947
  
--- Diff: 
common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
@@ -536,6 +542,33 @@ public Builder pushContext(final String name, final 
double value) {
  * @return user exception
  */
 public UserException build(final Logger logger) {
+
+  // To allow for debugging:
+  // A spinner code to make the execution stop here while the file 
'/tmp/drillspin' exists
+  // Can be used to attach a debugger, use jstack, etc
+  // The processID of the spinning thread should be in a file like 
/tmp/spin4148663301172491613.tmp
+  // along with the error message.
+  File spinFile = new File("/tmp/drillspin");
+  if ( spinFile.exists() ) {
+File tmpDir = new File("/tmp");
+File outErr = null;
+try {
+  outErr = File.createTempFile("spin", ".tmp", tmpDir);
+  BufferedWriter bw = new BufferedWriter(new FileWriter(outErr));
+  bw.write("Spinning process: " + 
ManagementFactory.getRuntimeMXBean().getName()
+  /* After upgrading to JDK 9 - replace with: 
ProcessHandle.current().getPid() */);
+  bw.write("\nError cause: " +
+(errorType == DrillPBError.ErrorType.SYSTEM ? ("SYSTEM ERROR: 
" + ErrorHelper.getRootMessage(cause)) : message));
+  bw.close();
+} catch (Exception ex) {
+  logger.warn("Failed creating a spinner tmp message file: {}", 
ex);
+}
+while (spinFile.exists()) {
+  try { sleep(1_000); } catch (Exception ex) { /* ignore 
interruptions */ }
--- End diff --

What happens if the fragment executor tries to kill the query? Do we want 
the spinner to ignore that request here?
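
For comparison, here is a hedged sketch of a spin loop that does honor a kill 
request, assuming the request arrives as a thread interrupt (which may not be 
how the fragment executor actually cancels work). `InterruptibleSpin` and its 
`spin` method are hypothetical names, and the flag check is passed in as a 
`BooleanSupplier` so the example needs no real file.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical alternative to the spinner in the diff; for discussion only. */
final class InterruptibleSpin {

  /**
   * Spins while flagPresent reports true.
   * Returns true if released because the flag cleared, false if interrupted.
   */
  static boolean spin(BooleanSupplier flagPresent, long pollMillis) {
    while (flagPresent.getAsBoolean()) {
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException ex) {
        // Restore the interrupt status so the caller can see the kill request.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Thread.currentThread().interrupt();            // simulate a pending kill request
    boolean released = spin(() -> true, 1_000);    // flag never clears
    System.out.println("released by flag: " + released);
  }
}
```

The trade-off is the one raised in the question: exiting on interrupt lets the 
query die promptly, but it also ends the debugging window early.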


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-19 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r139877079
  
--- Diff: 
common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
@@ -536,6 +542,33 @@ public Builder pushContext(final String name, final 
double value) {
  * @return user exception
  */
 public UserException build(final Logger logger) {
+
+  // To allow for debugging:
+  // A spinner code to make the execution stop here while the file 
'/tmp/drillspin' exists
+  // Can be used to attach a debugger, use jstack, etc
+  // The processID of the spinning thread should be in a file like 
/tmp/spin4148663301172491613.tmp
--- End diff --

Would also recommend `/tmp/drill/spin...`.


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-19 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r139877726
  
--- Diff: 
common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
@@ -536,6 +542,33 @@ public Builder pushContext(final String name, final 
double value) {
  * @return user exception
  */
 public UserException build(final Logger logger) {
+
+  // To allow for debugging:
+  // A spinner code to make the execution stop here while the file 
'/tmp/drillspin' exists
+  // Can be used to attach a debugger, use jstack, etc
+  // The processID of the spinning thread should be in a file like 
/tmp/spin4148663301172491613.tmp
+  // along with the error message.
+  File spinFile = new File("/tmp/drillspin");
--- End diff --

Should this be a config setting? Probably the config is not visible here, 
but can we set a static variable at start-up time? And, since this code will 
check the file system on every exception, should we have a config variable to 
turn on the check?

Feel free to tell me I'm being overly paranoid...
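
A minimal sketch of the static-variable idea, under the assumption that some 
start-up code can call a setter once. `SpinCheckGate`, `setSpinCheckEnabled` 
and `shouldSpin` are hypothetical names, not existing Drill APIs.

```java
import java.io.File;

/** Hypothetical gate: the file system is consulted only when the switch is on. */
final class SpinCheckGate {

  // Assumed to be set once at start-up, e.g. from a boot-time option.
  private static volatile boolean spinCheckEnabled = false;

  static void setSpinCheckEnabled(boolean enabled) {
    spinCheckEnabled = enabled;
  }

  /** Short-circuits before File.exists() when the check is disabled. */
  static boolean shouldSpin(File flagFile) {
    return spinCheckEnabled && flagFile.exists();
  }

  public static void main(String[] args) {
    File tmpDir = new File(System.getProperty("java.io.tmpdir"));
    System.out.println("disabled: " + shouldSpin(tmpDir));
    setSpinCheckEnabled(true);
    System.out.println("enabled:  " + shouldSpin(tmpDir));
  }
}
```

With the switch off, building an exception costs one volatile read instead of 
a file-system call. The price is that turning the feature on would need 
whatever mechanism sets the variable.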


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-19 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r139877011
  
--- Diff: 
common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
@@ -536,6 +542,33 @@ public Builder pushContext(final String name, final 
double value) {
  * @return user exception
  */
 public UserException build(final Logger logger) {
+
+  // To allow for debugging:
+  // A spinner code to make the execution stop here while the file 
'/tmp/drillspin' exists
--- End diff --

Would recommend `/tmp/drill/spin`. We already use `/tmp/drill` for other 
items, so this keeps things tidy.


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-14 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r139045903
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
 ---
@@ -47,10 +47,7 @@
   // OK - batch returned, NONE - end of data, RESTART - call again
   public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
 
-  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig 
htConfig, FragmentContext context,
- OperatorStats stats, OperatorContext 
oContext, RecordBatch incoming, HashAggBatch outgoing,
- LogicalExpression[] valueExprs, 
List valueFieldIds, TypedFieldId[] keyFieldIds,
- VectorContainer outContainer) throws 
SchemaChangeException, IOException, ClassTransformationException;
+  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig 
htConfig, FragmentContext context, OperatorStats stats, OperatorContext 
oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] 
valueExprs, List valueFieldIds, TypedFieldId[] keyFieldIds, 
VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, 
IOException, ClassTransformationException;
--- End diff --

That was one of the IDE's ideas. 
Simplification could be done as part of future cleanup work (like 
DRILL-5779).


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-14 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r139045744
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -1335,7 +1470,7 @@ private void updateStats(HashTable[] htables) {
 }
 if ( rowsReturnedEarly > 0 ) {
   stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB 
returned early
-  (int) Math.round( rowsReturnedEarly * estRowWidth / 1024.0D / 
1024.0));
+  (int) Math.round( rowsReturnedEarly * estOutputRowWidth / 
1024.0D / 1024.0));
--- End diff --

This work will be done later as part of DRILL-5779.


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-14 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r139045329
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -545,16 +584,19 @@ public AggOutcome doWork() {
   if (EXTRA_DEBUG_1) {
 logger.debug("Starting outer loop of doWork()...");
   }
-  for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
+  while (underlyingIndex < currentBatchRecordCount) {
 if (EXTRA_DEBUG_2) {
   logger.debug("Doing loop with values underlying {}, current {}", 
underlyingIndex, currentIndex);
 }
 checkGroupAndAggrValues(currentIndex);
+
+if ( retrySameIndex ) { retrySameIndex = false; }  // need to 
retry this row (e.g. we had an OOM)
--- End diff --

So why does "or before" have spaces ? :-)  


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-14 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r139045072
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -109,14 +107,21 @@
 
   private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
   private boolean is2ndPhase = false;
-  private boolean canSpill = true; // make it false in case can not spill
+  private boolean is1stPhase = false;
+  private boolean canSpill = true; // make it false in case can not 
spill/return-early
   private ChainedHashTable baseHashTable;
   private boolean earlyOutput = false; // when 1st phase returns a 
partition due to no memory
   private int earlyPartition = 0; // which partition to return early
-
-  private long memoryLimit; // max memory to be used by this oerator
-  private long estMaxBatchSize = 0; // used for adjusting #partitions
-  private long estRowWidth = 0;
+  private boolean retrySameIndex = false; // in case put failed during 1st 
phase - need to output early, then retry
+  private boolean useMemoryPrediction = false; // whether to use memory 
prediction to decide when to spill
+  private long estMaxBatchSize = 0; // used for adjusting #partitions and 
deciding when to spill
+  private long estRowWidth = 0; // the size of the internal "row" (keys + 
values + extra columns)
+  private long estValuesRowWidth = 0; // the size of the internal values ( 
values + extra )
+  private long estOutputRowWidth = 0; // the size of the output "row" (no 
extra columns)
+  private long estValuesBatchSize = 0; // used for "reserving" memory for 
the Values batch to overcome an OOM
+  private long estOutgoingAllocSize = 0; // used for "reserving" memory 
for the Outgoing Output Values to overcome an OOM
+  private long reserveValueBatchMemory; // keep "reserve memory" for 
Values Batch
+  private long reserveOutgoingMemory; // keep "reserve memory" for the 
Outgoing (Values only) output
--- End diff --

Will wait for some future cleanup opportunity.


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-12 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138495914
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
 ---
@@ -158,19 +158,17 @@ public BatchHolder(int idx) {
   } finally {
 if (!success) {
   htContainer.clear();
-  if (links != null) {
-links.clear();
-  }
+  if (links != null) { links.clear();}
 }
   }
 }
 
 private void init(IntVector links, IntVector hashValues, int size) {
   for (int i = 0; i < size; i++) {
-links.getMutator().setSafe(i, EMPTY_SLOT);
+links.getMutator().set(i, EMPTY_SLOT);
--- End diff --

This init() method is not used; it looks like leftover old code.


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-12 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138495296
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
 ---
@@ -58,7 +59,7 @@
 
   public int getHashCode(int incomingRowIdx) throws SchemaChangeException;
 
-  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int 
hashCode) throws SchemaChangeException;
+  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int 
hashCode) throws SchemaChangeException, RetryAfterSpillException;
--- End diff --

Done.


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-12 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138495164
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -1178,20 +1273,38 @@ private void checkGroupAndAggrValues(int 
incomingRowIdx) {
 hashCode >>>= bitsInMask;
 HashTable.PutStatus putStatus = null;
 long allocatedBeforeHTput = allocator.getAllocatedMemory();
-
 // ==
 // Insert the key columns into the hash table
 // ==
+boolean noReserveMem = reserveValueBatchMemory == 0;
 try {
+  if ( noReserveMem && canSpill ) { throw new 
RetryAfterSpillException();} // proactive spill, skip put()
+
   putStatus = htables[currentPartition].put(incomingRowIdx, 
htIdxHolder, hashCode);
+
+} catch (RetryAfterSpillException re) {
+  if ( ! canSpill ) { throw new 
OutOfMemoryException(getOOMErrorMsg("Can not spill")); }
--- End diff --

The method getOOMErrorMsg() does all this explanation ...


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-12 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138494777
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -1178,20 +1273,38 @@ private void checkGroupAndAggrValues(int 
incomingRowIdx) {
 hashCode >>>= bitsInMask;
 HashTable.PutStatus putStatus = null;
 long allocatedBeforeHTput = allocator.getAllocatedMemory();
-
 // ==
 // Insert the key columns into the hash table
 // ==
+boolean noReserveMem = reserveValueBatchMemory == 0;
 try {
+  if ( noReserveMem && canSpill ) { throw new 
RetryAfterSpillException();} // proactive spill, skip put()
+
   putStatus = htables[currentPartition].put(incomingRowIdx, 
htIdxHolder, hashCode);
+
+} catch (RetryAfterSpillException re) {
--- End diff --

Done. 


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-12 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138492663
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -646,6 +687,46 @@ public AggOutcome doWork() {
   }
 
   /**
+   *   Use reserved values memory (if available) to try and preemp an OOM
+   */
+  private void useReservedValuesMemory() {
+// try to preempt an OOM by using the reserved memory
+long reservedMemory = reserveValueBatchMemory;
+if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + 
reservedMemory); }
+
+reserveValueBatchMemory = 0;
+  }
+  /**
+   *   Use reserved outgoing output memory (if available) to try and 
preemp an OOM
+   */
+  private void useReservedOutgoingMemory() {
+// try to preempt an OOM by using the reserved memory
+long reservedMemory = reserveOutgoingMemory;
+if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + 
reservedMemory); }
+
+reserveOutgoingMemory = 0;
+  }
+  /**
+   *  Restore the reserve memory (both)
+   *
+   */
+  private void restoreReservedMemory() {
+if ( 0 == reserveOutgoingMemory ) { // always restore OutputValues 
first (needed for spilling)
+  long memAvail = allocator.getLimit() - 
allocator.getAllocatedMemory();
+  if ( memAvail > estOutgoingAllocSize) {
+allocator.setLimit(allocator.getLimit() - estOutgoingAllocSize);
--- End diff --

It can never add twice, as the code only adds to an empty ( == 0 ) reserve.
And these are not "relative deltas", but the actual expected batch size (so 
that the following allocation would not OOM).



---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-12 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138437442
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -646,6 +687,46 @@ public AggOutcome doWork() {
   }
 
   /**
+   *   Use reserved values memory (if available) to try and preemp an OOM
+   */
+  private void useReservedValuesMemory() {
+// try to preempt an OOM by using the reserved memory
+long reservedMemory = reserveValueBatchMemory;
+if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + 
reservedMemory); }
+
+reserveValueBatchMemory = 0;
+  }
+  /**
+   *   Use reserved outgoing output memory (if available) to try and 
preemp an OOM
+   */
+  private void useReservedOutgoingMemory() {
+// try to preempt an OOM by using the reserved memory
+long reservedMemory = reserveOutgoingMemory;
+if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + 
reservedMemory); }
--- End diff --

   Because the first uncontrolled memory allocation happens when inserting 
into the hash table (i.e. put()). Given this "uncontrollability", it is better 
to OOM there (which we can handle by spilling and retrying). If all the memory 
were "given" in the limit, the put() might not OOM, but could leave too little 
available memory to continue (i.e. to create a values batch or an outgoing 
batch), and those situations we cannot handle.
   By subtracting from the limit a "reserve" for these two batches, we may 
force a put() OOM early (but that's OK), and we also ensure that the following 
two batches can be allocated. In a way this is similar to having multiple 
dedicated allocators, only simpler.
   Adding or subtracting is just an operation on a local field, with no 
performance effect. Also, by using a single allocator we can handle 
cases like a "bump" in a batch size (which may exceed the pre-allocation in a 
dedicated separate allocator).
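
The reserve idea can be sketched with a toy allocator that tracks only a limit 
and an allocated byte count. This is a simplification under stated 
assumptions: it keeps a single reserve instead of the two in the diff, and 
`ReserveDemo` and `ToyAllocator` are hypothetical stand-ins, not Drill's 
allocator.

```java
/** Toy model of "subtract a reserve from the limit, give it back when needed". */
final class ReserveDemo {

  /** Stand-in for an allocator: only a limit and the bytes currently allocated. */
  static final class ToyAllocator {
    long limit;
    long allocated;

    ToyAllocator(long limit) { this.limit = limit; }

    /** Returns false where a real allocator would raise an OOM. */
    boolean allocate(long bytes) {
      if (allocated + bytes > limit) { return false; }
      allocated += bytes;
      return true;
    }

    void free(long bytes) { allocated -= bytes; }
  }

  final ToyAllocator allocator;
  final long estBatchSize;   // expected size of the batch the reserve protects
  long reserve;

  ReserveDemo(long limit, long estBatchSize) {
    // Lower the limit up front so the reserved bytes cannot be consumed by put().
    this.allocator = new ToyAllocator(limit - estBatchSize);
    this.estBatchSize = estBatchSize;
    this.reserve = estBatchSize;
  }

  /** Raise the limit by the reserve; a second call is a no-op because reserve is 0. */
  void useReserve() {
    if (reserve > 0) { allocator.limit += reserve; }
    reserve = 0;
  }

  /** Re-create the reserve only if it is empty and enough memory is free. */
  void restoreReserve() {
    long memAvail = allocator.limit - allocator.allocated;
    if (reserve == 0 && memAvail > estBatchSize) {
      allocator.limit -= estBatchSize;
      reserve = estBatchSize;
    }
  }

  public static void main(String[] args) {
    ReserveDemo d = new ReserveDemo(1000, 200);
    d.allocator.allocate(700);
    System.out.println("without reserve: " + d.allocator.allocate(200));
    d.useReserve();
    System.out.println("with reserve:    " + d.allocator.allocate(200));
  }
}
```

Because the reserve is restored only when it is zero, and always by the full 
estimated batch size, the limit cannot drift from repeated calls.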




---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-12 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138435187
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -382,19 +390,25 @@ private void delayedSetup() {
 final boolean fallbackEnabled = 
context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
 
 // Set the number of partitions from the configuration (raise to a 
power of two, if needed)
-numPartitions = 
context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
-if ( numPartitions == 1 ) {
+numPartitions = 
(int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
+if ( numPartitions == 1 && is2ndPhase  ) { // 1st phase can still do 
early return with 1 partition
   canSpill = false;
   logger.warn("Spilling is disabled due to configuration setting of 
num_partitions to 1");
 }
 numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in 
case not a power of 2
 
-if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an 
empty batch
+if ( schema == null ) { estValuesBatchSize = estOutgoingAllocSize = 
estMaxBatchSize = 0; } // incoming was an empty batch
 else {
   // Estimate the max batch size; should use actual data (e.g. lengths 
of varchars)
   updateEstMaxBatchSize(incoming);
 }
-long memAvail = memoryLimit - allocator.getAllocatedMemory();
+// create "reserved memory" and adjust the memory limit down
+reserveValueBatchMemory = reserveOutgoingMemory = estValuesBatchSize ;
+long newMemoryLimit = allocator.getLimit() - reserveValueBatchMemory - 
reserveOutgoingMemory ;
+long memAvail = newMemoryLimit - allocator.getAllocatedMemory();
+if ( memAvail <= 0 ) { throw new OutOfMemoryException("Too little 
memory available"); }
+allocator.setLimit(newMemoryLimit);
+
--- End diff --

Yes indeed. The only attempt to force a code path is the new testing option 
*use_memory_prediction*, which can disable the estimate-based prediction (of 
when to spill), hence forcing the code path that relies on an OOM (for hash 
table put() ) to take place (one unit test was added for that).
Getting full code coverage would be ideal, but hard.
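
The two code paths mentioned here (spill decided by prediction versus spill 
triggered by a failed put()) can be sketched with a toy table. Everything in 
this example is hypothetical: `SpillRetryDemo`, the nested `RetryAfterSpill` 
exception, and the fixed row capacity that stands in for running out of 
memory.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of spill-and-retry; capacity in rows stands in for a memory limit. */
final class SpillRetryDemo {

  /** Hypothetical stand-in for RetryAfterSpillException. */
  static final class RetryAfterSpill extends Exception {}

  private final int capacity;
  private final List<Integer> table = new ArrayList<>();
  int spills = 0;       // how many times the table was emptied
  int oomRetries = 0;   // how many puts failed and were retried

  SpillRetryDemo(int capacity) {
    if (capacity < 1) { throw new IllegalArgumentException("capacity must be positive"); }
    this.capacity = capacity;
  }

  private void put(int row) throws RetryAfterSpill {
    if (table.size() >= capacity) { throw new RetryAfterSpill(); } // simulated OOM
    table.add(row);
  }

  private void spill() {
    table.clear();
    spills++;
  }

  /** Inserts rows 0..rowCount-1; a failed put spills and retries the same row. */
  int insertAll(int rowCount, boolean usePrediction) {
    int row = 0;
    while (row < rowCount) {
      // Prediction path: spill before put() when the table is expected to be full.
      if (usePrediction && table.size() >= capacity) { spill(); }
      try {
        put(row);
        row++;                 // advance only after a successful put
      } catch (RetryAfterSpill e) {
        oomRetries++;          // OOM path: free memory, then loop on the same row
        spill();
      }
    }
    return row;
  }

  public static void main(String[] args) {
    SpillRetryDemo byOom = new SpillRetryDemo(3);
    byOom.insertAll(10, false);
    System.out.println("spills=" + byOom.spills + " oomRetries=" + byOom.oomRetries);
  }
}
```

Turning prediction off changes nothing in the result, only the route taken to 
it, which is why such a switch is useful for exercising the OOM path in a test.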


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-12 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138433967
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -500,22 +516,45 @@ private void initializeSetup(RecordBatch newIncoming) 
throws SchemaChangeExcepti
*/
   private void updateEstMaxBatchSize(RecordBatch incoming) {
 if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or 
varchar) change
+// Use the sizer to get the input row width and the length of the 
longest varchar column
 RecordBatchSizer sizer = new RecordBatchSizer(incoming);
 logger.trace("Incoming sizer: {}",sizer);
 // An empty batch only has the schema, can not tell actual length of 
varchars
 // else use the actual varchars length, each capped at 50 (to match 
the space allocation)
-estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : 
sizer.netRowWidthCap50();
-estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
+long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : 
sizer.netRowWidthCap50();
 
 // Get approx max (varchar) column width to get better memory 
allocation
-maxColumnWidth = Math.max(sizer.maxSize(), 
VARIABLE_MIN_WIDTH_VALUE_SIZE);
+maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), 
VARIABLE_MIN_WIDTH_VALUE_SIZE);
 maxColumnWidth = Math.min(maxColumnWidth, 
VARIABLE_MAX_WIDTH_VALUE_SIZE);
 
-logger.trace("{} phase. Estimated row width: {}  batch size: {}  
memory limit: {}  max column width: {}",
-
isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
+//
+// Calculate the estimated max (internal) batch (i.e. Keys batch + 
Values batch) size
+// (which is used to decide when to spill)
+// Also calculate the values batch size (used as a reserve to overcome 
an OOM)
+//
+Iterator outgoingIter = outContainer.iterator();
+int fieldId = 0;
+while (outgoingIter.hasNext()) {
+  ValueVector vv = outgoingIter.next().getValueVector();
+  MaterializedField mr = vv.getField();
+  int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth :
+  TypeHelper.getSize(mr.getType());
+  estRowWidth += fieldSize;
+  estOutputRowWidth += fieldSize;
+  if ( fieldId < numGroupByOutFields ) { fieldId++; }
+  else { estValuesRowWidth += fieldSize; }
+}
+// multiply by the max number of rows in a batch to get the final 
estimated max size
+estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * 
MAX_BATCH_SIZE;
--- End diff --

Most of these estimates are for the internal "worst case". Only the "outgoing" 
one is for the outgoing batch (which is also used for spilling, which is 
internal).
   In any case, none of these estimates has anything to do with _throttling_ 
the outgoing batch size; that logic was not changed from the original code 
(likely up to MAX_BATCH_SIZE). 
  Making such a change should be a separate project. 
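
As a rough illustration of the worst-case arithmetic in the diff above, the 
sketch below sums per-field widths and multiplies by a maximum row count. It 
is a simplification: the field widths and the row count are passed in as plain 
numbers, and `BatchSizeEstimate` and its method names are hypothetical.

```java
/** Hypothetical helper mirroring the shape of the estimate, not the actual code. */
final class BatchSizeEstimate {

  /** Sum of all output field widths, in bytes. */
  static long rowWidth(int[] fieldWidths) {
    long sum = 0;
    for (int w : fieldWidths) { sum += w; }
    return sum;
  }

  /** Width of the value (non group-by) fields only; the first fields are the keys. */
  static long valuesRowWidth(int[] fieldWidths, int numGroupByOutFields) {
    long sum = 0;
    for (int i = numGroupByOutFields; i < fieldWidths.length; i++) { sum += fieldWidths[i]; }
    return sum;
  }

  /** Worst case: the wider of the internal row and the input row, times rows per batch. */
  static long estMaxBatchSize(int[] fieldWidths, long estInputRowWidth, int maxRowsPerBatch) {
    return Math.max(rowWidth(fieldWidths), estInputRowWidth) * maxRowsPerBatch;
  }

  public static void main(String[] args) {
    int[] widths = {4, 50, 8};   // e.g. an int key, a capped varchar key, a bigint value
    System.out.println("values row width: " + valuesRowWidth(widths, 2));
    System.out.println("est max batch:    " + estMaxBatchSize(widths, 40, 1000));
  }
}
```

The result is an upper bound used for deciding when to spill and how much to 
reserve; it says nothing about how many rows an outgoing batch will actually 
carry.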



---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-11 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138240616
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -297,10 +302,7 @@ public void outputRecordValues(@Named("htRowIdx") int 
htRowIdx, @Named("outRowId
   }
 
   @Override
-  public void setup(HashAggregate hashAggrConfig, HashTableConfig 
htConfig, FragmentContext context,
-OperatorStats stats, OperatorContext oContext, 
RecordBatch incoming, HashAggBatch outgoing,
-LogicalExpression[] valueExprs, List 
valueFieldIds, TypedFieldId[] groupByOutFieldIds,
-VectorContainer outContainer) throws 
SchemaChangeException, IOException {
+  public void setup(HashAggregate hashAggrConfig, HashTableConfig 
htConfig, FragmentContext context, OperatorStats stats, OperatorContext 
oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] 
valueExprs, List valueFieldIds, TypedFieldId[] 
groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws 
SchemaChangeException, IOException {
--- End diff --

Removed one argument, "stats"; it can be taken from the "oContext".


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-11 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138236706
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -109,14 +107,21 @@
 
   private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
   private boolean is2ndPhase = false;
-  private boolean canSpill = true; // make it false in case can not spill
+  private boolean is1stPhase = false;
+  private boolean canSpill = true; // make it false in case can not 
spill/return-early
   private ChainedHashTable baseHashTable;
   private boolean earlyOutput = false; // when 1st phase returns a 
partition due to no memory
   private int earlyPartition = 0; // which partition to return early
-
-  private long memoryLimit; // max memory to be used by this oerator
-  private long estMaxBatchSize = 0; // used for adjusting #partitions
-  private long estRowWidth = 0;
+  private boolean retrySameIndex = false; // in case put failed during 1st 
phase - need to output early, then retry
--- End diff --

This is more for code readability -- "by default, this flag was chosen to 
be false".   


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-11 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138236560
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
 ---
@@ -293,7 +299,7 @@ private HashAggregator createAggregatorInternal() 
throws SchemaChangeException,
 aggrExprs,
 cgInner.getWorkspaceTypes(),
 groupByOutFieldIds,
-this.container);
+this.container, extraNonNullColumns * 8 /* sizeof(BigInt) */);
--- End diff --

Not sure; it seemed to work OK in some (limited) testing.


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-11 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138236176
  
--- Diff: 
common/src/main/java/org/apache/drill/common/exceptions/RetryAfterSpillException.java
 ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.exceptions;
+
+import org.apache.drill.exec.proto.UserBitShared;
+
+/**
+ *  A special exception to be caught by caller, who is supposed to free 
memory by spilling and try again
+ *
+ */
+public class RetryAfterSpillException extends UserException {
--- End diff --

Done


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-11 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r138236250
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
@@ -92,18 +92,20 @@
 
   // Hash Aggregate Options
 
-  String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions";
   String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
   LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new 
RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling
-  String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit";
   String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit";
   LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new 
RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE);
   // min batches is used for tuning (each partition needs so many batches 
when planning the number of partitions,
   // or reserve this number when calculating whether the remaining 
available memory is too small and requires a spill.)
   // Low value may OOM (e.g., when incoming rows become wider), higher 
values use fewer partitions but are safer
-  String HASHAGG_MIN_BATCHES_PER_PARTITION = 
"drill.exec.hashagg.min_batches_per_partition";
-  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = 
"drill.exec.hashagg.min_batches_per_partition";
-  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new 
RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5);
+  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = 
"exec.hashagg.min_batches_per_partition";
+  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new 
RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 1, 5);
+  // Can be turns off mainly for testing. Memory prediction is used to 
decide on when to spill to disk; with this option off,
--- End diff --

Done


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939203
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -109,14 +107,21 @@
 
   private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
   private boolean is2ndPhase = false;
-  private boolean canSpill = true; // make it false in case can not spill
+  private boolean is1stPhase = false;
+  private boolean canSpill = true; // make it false in case can not 
spill/return-early
   private ChainedHashTable baseHashTable;
   private boolean earlyOutput = false; // when 1st phase returns a 
partition due to no memory
   private int earlyPartition = 0; // which partition to return early
-
-  private long memoryLimit; // max memory to be used by this oerator
-  private long estMaxBatchSize = 0; // used for adjusting #partitions
-  private long estRowWidth = 0;
+  private boolean retrySameIndex = false; // in case put failed during 1st 
phase - need to output early, then retry
+  private boolean useMemoryPrediction = false; // whether to use memory 
prediction to decide when to spill
+  private long estMaxBatchSize = 0; // used for adjusting #partitions and 
deciding when to spill
+  private long estRowWidth = 0; // the size of the internal "row" (keys + 
values + extra columns)
+  private long estValuesRowWidth = 0; // the size of the internal values ( 
values + extra )
+  private long estOutputRowWidth = 0; // the size of the output "row" (no 
extra columns)
+  private long estValuesBatchSize = 0; // used for "reserving" memory for 
the Values batch to overcome an OOM
+  private long estOutgoingAllocSize = 0; // used for "reserving" memory 
for the Outgoing Output Values to overcome an OOM
+  private long reserveValueBatchMemory; // keep "reserve memory" for 
Values Batch
+  private long reserveOutgoingMemory; // keep "reserve memory" for the 
Outgoing (Values only) output
--- End diff --

Long lists of member variables are generally frowned upon. Can't unit test 
them. Too many states to keep in mind. Can these be grouped into a read-only 
config class (set up front, then never changed) vs. running estimates?
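
A rough sketch of what such a split could look like. The class names (`HashAggSetupConfig`, `HashAggRunningEstimates`) and the choice of which fields go where are hypothetical and not taken from the PR:

```java
// Hypothetical sketch: names below are illustrative, not from the PR.

// Values decided once during setup and never changed afterwards.
final class HashAggSetupConfig {
  final boolean isTwoPhase;
  final boolean is2ndPhase;
  final boolean useMemoryPrediction;

  HashAggSetupConfig(boolean isTwoPhase, boolean is2ndPhase, boolean useMemoryPrediction) {
    this.isTwoPhase = isTwoPhase;
    this.is2ndPhase = is2ndPhase;
    this.useMemoryPrediction = useMemoryPrediction;
  }
}

// Estimates that are revised while the operator runs.
final class HashAggRunningEstimates {
  private long estRowWidth;
  private long estMaxBatchSize;

  // Recompute the batch estimate from a row width and a row-count cap.
  void update(long rowWidth, int maxBatchRows) {
    estRowWidth = rowWidth;
    estMaxBatchSize = rowWidth * maxBatchRows;
  }

  long rowWidth() { return estRowWidth; }
  long maxBatchSize() { return estMaxBatchSize; }
}
```

With this shape the estimates object can be unit tested on its own, without generating the template class.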


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939168
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
 ---
@@ -293,7 +299,7 @@ private HashAggregator createAggregatorInternal() 
throws SchemaChangeException,
 aggrExprs,
 cgInner.getWorkspaceTypes(),
 groupByOutFieldIds,
-this.container);
+this.container, extraNonNullColumns * 8 /* sizeof(BigInt) */);
--- End diff --

If the `BigInt` column is used to indicate nulls, then each value is of 
size 9. Also, on average, each vector has 25% internal fragmentation. To 
account for this, perhaps assume that the average size is 12 or 13 bytes.
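
The arithmetic, under one reading of the 25% figure (a quarter of each vector is unused on average, so the charged width is the raw width divided by 0.75). The helper names are hypothetical:

```java
// Hypothetical helper, only to make the arithmetic explicit.
final class BigIntColumnEstimate {
  // 8 data bytes, plus 1 byte when a null indicator is kept per value.
  static int rawWidth(boolean nullable) {
    return nullable ? 9 : 8;
  }

  // If `fragmentation` of each vector is unused on average, the memory
  // charged per value is rawWidth / (1 - fragmentation), rounded up.
  static int paddedWidth(int rawWidth, double fragmentation) {
    return (int) Math.ceil(rawWidth / (1.0 - fragmentation));
  }
}
```

For a nullable column, `paddedWidth(rawWidth(true), 0.25)` gives 12; for a non-nullable one the same formula gives 11.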


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939287
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -500,22 +516,45 @@ private void initializeSetup(RecordBatch newIncoming) 
throws SchemaChangeExcepti
*/
   private void updateEstMaxBatchSize(RecordBatch incoming) {
 if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or 
varchar) change
+// Use the sizer to get the input row width and the length of the 
longest varchar column
 RecordBatchSizer sizer = new RecordBatchSizer(incoming);
 logger.trace("Incoming sizer: {}",sizer);
 // An empty batch only has the schema, can not tell actual length of 
varchars
 // else use the actual varchars length, each capped at 50 (to match 
the space allocation)
-estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : 
sizer.netRowWidthCap50();
-estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
+long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : 
sizer.netRowWidthCap50();
 
 // Get approx max (varchar) column width to get better memory 
allocation
-maxColumnWidth = Math.max(sizer.maxSize(), 
VARIABLE_MIN_WIDTH_VALUE_SIZE);
+maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), 
VARIABLE_MIN_WIDTH_VALUE_SIZE);
 maxColumnWidth = Math.min(maxColumnWidth, 
VARIABLE_MAX_WIDTH_VALUE_SIZE);
 
-logger.trace("{} phase. Estimated row width: {}  batch size: {}  
memory limit: {}  max column width: {}",
-
isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
+//
+// Calculate the estimated max (internal) batch (i.e. Keys batch + 
Values batch) size
+// (which is used to decide when to spill)
+// Also calculate the values batch size (used as a reserve to overcome 
an OOM)
+//
+Iterator outgoingIter = outContainer.iterator();
+int fieldId = 0;
+while (outgoingIter.hasNext()) {
+  ValueVector vv = outgoingIter.next().getValueVector();
+  MaterializedField mr = vv.getField();
+  int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth :
+  TypeHelper.getSize(mr.getType());
+  estRowWidth += fieldSize;
+  estOutputRowWidth += fieldSize;
+  if ( fieldId < numGroupByOutFields ) { fieldId++; }
+  else { estValuesRowWidth += fieldSize; }
+}
+// multiply by the max number of rows in a batch to get the final 
estimated max size
+estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * 
MAX_BATCH_SIZE;
--- End diff --

Here, the output batch size is fixed based on the number of rows. Suppose 
we had a sort as the output of this operator, and the sort has a memory ceiling 
of x MB. Can the code here create batches larger than x/2 MB, meaning that that 
sort is forced to consume batches so large that it can't buffer two and spill?

In other words, is there an attempt here to control overall output batch 
memory use instead of just assuming that we always output `MAX_BATCH_SIZE` rows 
regardless of memory use?
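
As an illustration of what controlling by memory could mean, here is a sketch that caps the row count by a byte budget. `OutputBatchSizer` and its parameters are hypothetical; the budget would have to come from somewhere the PR does not currently define:

```java
// Hypothetical sketch: pick the output row count from a byte budget
// instead of always emitting the maximum number of rows.
final class OutputBatchSizer {
  static int rowsPerBatch(long memoryBudgetBytes, long estOutputRowWidth, int maxBatchRows) {
    if (estOutputRowWidth <= 0) {
      return maxBatchRows;               // no estimate yet: fall back to the row cap
    }
    long rowsThatFit = memoryBudgetBytes / estOutputRowWidth;
    // Always emit at least one row, never more than the row cap.
    return (int) Math.max(1, Math.min(maxBatchRows, rowsThatFit));
  }
}
```

With 100-byte rows and a 1,000,000-byte budget this yields 10,000 rows per batch rather than the full row cap.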


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939296
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -545,16 +584,19 @@ public AggOutcome doWork() {
   if (EXTRA_DEBUG_1) {
 logger.debug("Starting outer loop of doWork()...");
   }
-  for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
+  while (underlyingIndex < currentBatchRecordCount) {
 if (EXTRA_DEBUG_2) {
   logger.debug("Doing loop with values underlying {}, current {}", 
underlyingIndex, currentIndex);
 }
 checkGroupAndAggrValues(currentIndex);
+
+if ( retrySameIndex ) { retrySameIndex = false; }  // need to 
retry this row (e.g. we had an OOM)
--- End diff --

I think Drill's coding style guidelines say no spaces after ( or before ).


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939122
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
@@ -92,18 +92,20 @@
 
   // Hash Aggregate Options
 
-  String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions";
   String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
   LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new 
RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling
-  String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit";
   String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit";
   LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new 
RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE);
   // min batches is used for tuning (each partition needs so many batches 
when planning the number of partitions,
   // or reserve this number when calculating whether the remaining 
available memory is too small and requires a spill.)
   // Low value may OOM (e.g., when incoming rows become wider), higher 
values use fewer partitions but are safer
-  String HASHAGG_MIN_BATCHES_PER_PARTITION = 
"drill.exec.hashagg.min_batches_per_partition";
-  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = 
"drill.exec.hashagg.min_batches_per_partition";
-  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new 
RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5);
+  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = 
"exec.hashagg.min_batches_per_partition";
+  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new 
RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 1, 5);
+  // Can be turns off mainly for testing. Memory prediction is used to 
decide on when to spill to disk; with this option off,
--- End diff --

turns --> turned


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939184
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -109,14 +107,21 @@
 
   private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
   private boolean is2ndPhase = false;
-  private boolean canSpill = true; // make it false in case can not spill
+  private boolean is1stPhase = false;
+  private boolean canSpill = true; // make it false in case can not 
spill/return-early
   private ChainedHashTable baseHashTable;
   private boolean earlyOutput = false; // when 1st phase returns a 
partition due to no memory
   private int earlyPartition = 0; // which partition to return early
-
-  private long memoryLimit; // max memory to be used by this oerator
-  private long estMaxBatchSize = 0; // used for adjusting #partitions
-  private long estRowWidth = 0;
+  private boolean retrySameIndex = false; // in case put failed during 1st 
phase - need to output early, then retry
--- End diff --

As it turns out, unlike C++, Java is pretty good at initializing booleans 
to false and longs to 0. We only need to explicitly initialize values when the 
value should be other than 0/false/null.
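
A small illustration (the class is made up for this note). This applies to fields only; local variables still need explicit initialization:

```java
// Illustrative class: fields without initializers get the language defaults.
final class DefaultsDemo {
  boolean retrySameIndex;   // implicitly false
  long estRowWidth;         // implicitly 0L
  Object baseHashTable;     // implicitly null
  boolean canSpill = true;  // non-default value, so the initializer is needed
}
```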


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939496
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
 ---
@@ -58,7 +59,7 @@
 
   public int getHashCode(int incomingRowIdx) throws SchemaChangeException;
 
-  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int 
hashCode) throws SchemaChangeException;
+  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int 
hashCode) throws SchemaChangeException, RetryAfterSpillException;
--- End diff --

At present, `RetryAfterSpillException` is unchecked, so it is not necessary 
to declare. But, change `RetryAfterSpillException` to extend `Exception` 
(checked) and this declaration then becomes useful.


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939361
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -1335,7 +1470,7 @@ private void updateStats(HashTable[] htables) {
 }
 if ( rowsReturnedEarly > 0 ) {
   stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB 
returned early
-  (int) Math.round( rowsReturnedEarly * estRowWidth / 1024.0D / 
1024.0));
+  (int) Math.round( rowsReturnedEarly * estOutputRowWidth / 
1024.0D / 1024.0));
--- End diff --

This file is a template. This means, we copy *all* this code each time we 
generate a new class. How is doing so helping stability, customer value or 
performance? Should all this code be in a template that is copied on every 
query? Or, should it be refactored into a driver class, with only a very light 
wrapper appearing in the copied template?

As this code gets ever more complex, it puts a strain on the Java code that 
must walk through this code and do method fixup, scalar replacements, etc. That 
work takes time. What value accrues to the user from doing this fixup on code 
that never changes from one query to the next?

Filed [DRILL-5779](https://issues.apache.org/jira/browse/DRILL-5779) for 
this issue.


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939459
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -1178,20 +1273,38 @@ private void checkGroupAndAggrValues(int 
incomingRowIdx) {
 hashCode >>>= bitsInMask;
 HashTable.PutStatus putStatus = null;
 long allocatedBeforeHTput = allocator.getAllocatedMemory();
-
 // ==
 // Insert the key columns into the hash table
 // ==
+boolean noReserveMem = reserveValueBatchMemory == 0;
 try {
+  if ( noReserveMem && canSpill ) { throw new 
RetryAfterSpillException();} // proactive spill, skip put()
+
   putStatus = htables[currentPartition].put(incomingRowIdx, 
htIdxHolder, hashCode);
+
+} catch (RetryAfterSpillException re) {
+  if ( ! canSpill ) { throw new 
OutOfMemoryException(getOOMErrorMsg("Can not spill")); }
--- End diff --

This is the message sent to the log and user. Should we explain why we 
can't spill? And, what to do? Something like:

"Incoming batch too large and no in-memory partitions to spill. Increase 
memory assigned to the Hash Agg."

Replace the above wording with the actual reasons and fixes.


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939319
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -646,6 +687,46 @@ public AggOutcome doWork() {
   }
 
   /**
+   *   Use reserved values memory (if available) to try and preemp an OOM
+   */
+  private void useReservedValuesMemory() {
+// try to preempt an OOM by using the reserved memory
+long reservedMemory = reserveValueBatchMemory;
+if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + 
reservedMemory); }
+
+reserveValueBatchMemory = 0;
+  }
+  /**
+   *   Use reserved outgoing output memory (if available) to try and 
preemp an OOM
+   */
+  private void useReservedOutgoingMemory() {
+// try to preempt an OOM by using the reserved memory
+long reservedMemory = reserveOutgoingMemory;
+if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + 
reservedMemory); }
--- End diff --

Why is it necessary to change the allocator limit? The allocator limit 
should be fixed: it is the amount of memory given to this operator. Shouldn't 
the code use its own, internal, limits to make decisions? That is, if allocated 
memory + some expected use > a defined internal size, then spill?
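
Roughly, the idea is something like the following sketch. `InternalMemoryBudget` is a hypothetical name, and the reserve is treated here as a fixed amount held back from the operator's limit:

```java
// Hypothetical sketch: keep the allocator limit fixed and compare against
// an internal threshold when deciding whether to spill.
final class InternalMemoryBudget {
  private final long operatorLimit;  // memory given to the operator; never changed
  private final long reserve;        // amount held back for values/outgoing batches

  InternalMemoryBudget(long operatorLimit, long reserve) {
    this.operatorLimit = operatorLimit;
    this.reserve = reserve;
  }

  // Spill when current allocation plus the expected next use would cross
  // the internal threshold (limit minus reserve).
  boolean shouldSpill(long allocated, long expectedUse) {
    return allocated + expectedUse > operatorLimit - reserve;
  }
}
```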


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939481
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
 ---
@@ -47,10 +47,7 @@
   // OK - batch returned, NONE - end of data, RESTART - call again
   public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
 
-  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig 
htConfig, FragmentContext context,
- OperatorStats stats, OperatorContext 
oContext, RecordBatch incoming, HashAggBatch outgoing,
- LogicalExpression[] valueExprs, 
List valueFieldIds, TypedFieldId[] keyFieldIds,
- VectorContainer outContainer) throws 
SchemaChangeException, IOException, ClassTransformationException;
+  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig 
htConfig, FragmentContext context, OperatorStats stats, OperatorContext 
oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] 
valueExprs, List valueFieldIds, TypedFieldId[] keyFieldIds, 
VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, 
IOException, ClassTransformationException;
--- End diff --

Not sure that putting all items on one big line is an improvement over the 
arg-per-line format previously.

Also, see note above: a large number of arguments suggest a muddy design 
with one class trying to do far too much.


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939223
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -297,10 +302,7 @@ public void outputRecordValues(@Named("htRowIdx") int 
htRowIdx, @Named("outRowId
   }
 
   @Override
-  public void setup(HashAggregate hashAggrConfig, HashTableConfig 
htConfig, FragmentContext context,
-OperatorStats stats, OperatorContext oContext, 
RecordBatch incoming, HashAggBatch outgoing,
-LogicalExpression[] valueExprs, List 
valueFieldIds, TypedFieldId[] groupByOutFieldIds,
-VectorContainer outContainer) throws 
SchemaChangeException, IOException {
+  public void setup(HashAggregate hashAggrConfig, HashTableConfig 
htConfig, FragmentContext context, OperatorStats stats, OperatorContext 
oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] 
valueExprs, List valueFieldIds, TypedFieldId[] 
groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws 
SchemaChangeException, IOException {
--- End diff --

Methods and constructors with many arguments are generally frowned upon, as 
they suggest that a single class is trying to do too much: it has too much 
internal coupling, performing tasks that should be broken apart. Can this 
single, huge, class be split into smaller, more focused, abstractions?


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939116
  
--- Diff: 
common/src/main/java/org/apache/drill/common/exceptions/RetryAfterSpillException.java
 ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.exceptions;
+
+import org.apache.drill.exec.proto.UserBitShared;
+
+/**
+ *  A special exception to be caught by caller, who is supposed to free 
memory by spilling and try again
+ *
+ */
+public class RetryAfterSpillException extends UserException {
--- End diff --

If this exception is thrown and caught internally, it should not extend 
`UserException`. Instead it should extend the Java `RuntimeException`.

Better, since you know you must catch this, this should be a checked 
exception, extended from `Exception` and declared by the method that throws it.

`UserException` is purely for exceptions reported to the user.
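
A minimal sketch of the checked variant. The names mirror the PR, but this is a stand-alone illustration and `SketchHashTable` and `CheckedDemo` are hypothetical:

```java
// Stand-alone illustration; not the PR's code.
class RetryAfterSpillException extends Exception {   // checked: extends Exception
  RetryAfterSpillException(String msg) { super(msg); }
}

interface SketchHashTable {
  // Declaring the exception forces every caller to catch or re-declare it.
  boolean put(int incomingRowIdx) throws RetryAfterSpillException;
}

final class CheckedDemo {
  static String insert(SketchHashTable table, int row) {
    try {
      return table.put(row) ? "inserted" : "present";
    } catch (RetryAfterSpillException e) {
      // The caller is where the spill and the retry would happen.
      return "spill-and-retry";
    }
  }
}
```

If the `catch` is removed, `insert` no longer compiles, which is the point of making the exception checked.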


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939427
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -1178,20 +1273,38 @@ private void checkGroupAndAggrValues(int 
incomingRowIdx) {
 hashCode >>>= bitsInMask;
 HashTable.PutStatus putStatus = null;
 long allocatedBeforeHTput = allocator.getAllocatedMemory();
-
 // ==
 // Insert the key columns into the hash table
 // ==
+boolean noReserveMem = reserveValueBatchMemory == 0;
 try {
+  if ( noReserveMem && canSpill ) { throw new 
RetryAfterSpillException();} // proactive spill, skip put()
+
   putStatus = htables[currentPartition].put(incomingRowIdx, 
htIdxHolder, hashCode);
+
+} catch (RetryAfterSpillException re) {
--- End diff --

See above. Should be a checked exception declared by `put()`.

Also, why do we need to throw an exception before calling `put` only to 
catch it a couple of lines later?

If the spill code was in a method, rather than just inline, seems we could 
do:

```
if ( noReserveMem && canSpill ) { doSpill(); }
try {
  ...put(...)
} catch (...) {
   doSpill();
}
```
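
A fuller, runnable version of the same shape, using a toy in-memory list in place of the hash table. Everything here (`SpillRetrySketch`, the capacity model, the `reserveAvailable` flag) is a hypothetical stand-in, not the operator's real state:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: "memory" is a list with a fixed capacity; spilling empties it.
final class SpillRetrySketch {
  static class RetryAfterSpill extends Exception {}

  private final List<Integer> inMemory = new ArrayList<>();
  private final int capacity;
  private boolean reserveAvailable;
  int spills;

  SpillRetrySketch(int capacity, boolean reserveAvailable) {
    this.capacity = capacity;
    this.reserveAvailable = reserveAvailable;
  }

  private void put(int row) throws RetryAfterSpill {
    if (inMemory.size() >= capacity) { throw new RetryAfterSpill(); }
    inMemory.add(row);
  }

  private void doSpill() {
    inMemory.clear();          // stands in for writing a partition to disk
    reserveAvailable = true;   // spilling frees memory, so the reserve is restored
    spills++;
  }

  void insert(int row) {
    if (!reserveAvailable) { doSpill(); }   // proactive spill, no exception needed
    try {
      put(row);
    } catch (RetryAfterSpill e) {
      doSpill();
      try {
        put(row);                           // retry the same row once
      } catch (RetryAfterSpill again) {
        throw new IllegalStateException("still out of memory after spill", again);
      }
    }
  }

  int size() { return inMemory.size(); }
}
```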


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939532
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
 ---
@@ -158,19 +158,17 @@ public BatchHolder(int idx) {
   } finally {
 if (!success) {
   htContainer.clear();
-  if (links != null) {
-links.clear();
-  }
+  if (links != null) { links.clear();}
 }
   }
 }
 
 private void init(IntVector links, IntVector hashValues, int size) {
   for (int i = 0; i < size; i++) {
-links.getMutator().setSafe(i, EMPTY_SLOT);
+links.getMutator().set(i, EMPTY_SLOT);
--- End diff --

Is size ever less than the vector capacity()? Else, you can just ask the 
vector for its capacity.

The `links.getMutator()` call in an inner loop is inefficient.

Instead of a single function initializing two `IntVector`s with redundant 
code, can this be refactored to have a function that initializes one vector, 
that is called twice?
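
Something along these lines, shown with plain `int[]` arrays standing in for `IntVector` so the sketch is self-contained. The value of `EMPTY_SLOT` and the initial value for `hashValues` are assumptions, since the quoted diff shows neither. With real vectors, the single `fill` helper would also be the natural place to fetch the mutator once, outside the loop:

```java
// Hypothetical sketch using int[] in place of IntVector.
final class VectorInitSketch {
  static final int EMPTY_SLOT = -1;   // assumed value, not taken from the diff

  // Initialize the first `size` entries of one vector.
  static void fill(int[] vector, int size, int value) {
    for (int i = 0; i < size; i++) {
      vector[i] = value;
    }
  }

  // The two-vector init becomes two calls to the same helper.
  static void init(int[] links, int[] hashValues, int size) {
    fill(links, size, EMPTY_SLOT);
    fill(hashValues, size, 0);        // assumed initial hash value
  }
}
```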


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/938#discussion_r137939253
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
@@ -382,19 +390,25 @@ private void delayedSetup() {
 final boolean fallbackEnabled = 
context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
 
 // Set the number of partitions from the configuration (raise to a 
power of two, if needed)
-numPartitions = 
context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
-if ( numPartitions == 1 ) {
+numPartitions = 
(int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
+if ( numPartitions == 1 && is2ndPhase  ) { // 1st phase can still do 
early return with 1 partition
   canSpill = false;
   logger.warn("Spilling is disabled due to configuration setting of 
num_partitions to 1");
 }
 numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in 
case not a power of 2
 
-if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an 
empty batch
+if ( schema == null ) { estValuesBatchSize = estOutgoingAllocSize = 
estMaxBatchSize = 0; } // incoming was an empty batch
 else {
   // Estimate the max batch size; should use actual data (e.g. lengths 
of varchars)
   updateEstMaxBatchSize(incoming);
 }
-long memAvail = memoryLimit - allocator.getAllocatedMemory();
+// create "reserved memory" and adjust the memory limit down
+reserveValueBatchMemory = reserveOutgoingMemory = estValuesBatchSize ;
+long newMemoryLimit = allocator.getLimit() - reserveValueBatchMemory - 
reserveOutgoingMemory ;
+long memAvail = newMemoryLimit - allocator.getAllocatedMemory();
+if ( memAvail <= 0 ) { throw new OutOfMemoryException("Too little 
memory available"); }
+allocator.setLimit(newMemoryLimit);
+
--- End diff --

This code has grown to be incredibly complex with many, many paths through 
the various functions.

Tests are handy things. Do we have system-level unit tests that exercise 
each path through the code? Otherwise, as a reviewer, how can I be sure that 
each execution path does, in fact, work?


---


[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

2017-09-08 Thread Ben-Zvi
GitHub user Ben-Zvi opened a pull request:

https://github.com/apache/drill/pull/938

DRILL-5694: Handle HashAgg OOM by spill and retry, plus perf improvement

  The main change in this PR is adding a "_second way_" to handle memory 
pressure for the Hash Aggregate: Basically catch OOM failures when processing a 
new input row (during put() into the Hash Table), cleanup internally to allow a 
retry (of the put()) and return a new exception "**RetryAfterSpillException**". 
In such a case the caller spills some partition to free more memory, and 
retries inserting that new row.
   In addition, to reduce the risk of OOM when either creating the "Values 
Batch" (to match the "Keys Batch" in the Hash Table), or when allocating the 
Outgoing vectors (for the Values) -- there are new "_reserves_" -- one reserve 
for each of the two. A "_reserve_" is a memory amount subtracted from the 
memory-limit, which is added back to the limit just before it is needed, so 
hopefully preventing an OOM. After the allocation the code tries to restore 
that reserve (by subtracting from the limit, if possible). We always restore 
the "Outgoing Reserve" first; in case the "Values Batch" reserve runs empty 
just before calling put(), we skip the put() (just like an OOM there) and spill 
to free some memory (and restore that reserve).
   The old "_first way_" is still used. That is the code that predicts the 
memory needs, and triggers a spill if not enough memory is available. The spill 
code was separated into a new method called spillIfNeeded() which is used in 
two modes - either the old way (prediction), or (when called from the new OOM 
catch code) with a flag to force a spill, regardless of available memory. That 
flag is also used to reduce the priority of the "current partition" when 
choosing a partition to spill.
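
   As a simplified illustration of the "_reserve_" idea described above (a toy model with a single reserve; `ReserveSketch` and its methods are hypothetical and do not use the real allocator API):

```java
// Toy model of one reserve carved out of a memory limit.
final class ReserveSketch {
  private long limit;
  private long reserve;
  private final long reserveSize;

  ReserveSketch(long limit, long reserveSize) {
    this.reserveSize = reserveSize;
    this.reserve = reserveSize;
    this.limit = limit - reserveSize;   // the reserve is subtracted from the limit
  }

  // Just before the risky allocation: add the reserve back to the limit.
  void useReserve() {
    limit += reserve;
    reserve = 0;
  }

  // After the allocation: restore the reserve only if the lowered limit
  // would still cover what is currently allocated.
  boolean restoreReserve(long allocated) {
    if (reserve > 0) { return true; }
    if (limit - reserveSize < allocated) { return false; }
    limit -= reserveSize;
    reserve = reserveSize;
    return true;
  }

  long limit() { return limit; }
  long reserve() { return reserve; }
}
```

   When `restoreReserve` returns false, the reserve stays empty, which in the scheme above is the signal to spill before the next put().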

  A new testing option was added (**hashagg_use_memory_prediction**, 
default true) - by setting this to false the old "first way" is disabled. This 
allows stress testing of the OOM handling code (which may not be used under 
normal memory allocation).

  The HashTable put() code was rewritten to clean up partial changes in 
case of an OOM, and so was the code around the call to put(), which now catches 
the new exception, spills and retries. Note that this works for 1st phase 
aggregation as well (return rows early).

For the estimates (in addition to the old "max batch size" estimate) -
there is an estimate for the Values Batch, and one for the Outgoing. These
are used for restoring the "reserves". These estimates may be revised upward in
case actual allocations are bigger.

Other changes:
* Improved the "max batch size estimation" -- using the outgoing batch for 
getting the correct schema (instead of the input batch).
  The only information needed from the input batch is the "max average
column size" (see change in RecordBatchSizer.java) to have a better estimate for
VarChars.
  Also computed the size of those "no null" bigint columns added into the
Values Batch when the aggregation is SUM, MIN or MAX (see changes in
HashAggBatch.java and HashAggregator.java).
* Using a "plain Java" subclass for the HashTable because "byte
manipulation" breaks on the new template code (see ChainedHashTable.java).
* The three Configuration options were changed into System/Session
options: min_batches_per_partition, hashagg_max_memory,
hashagg_num_partitions.
* There was a potential memory leak in the HashTable BatchHolder ctor:
vectors were added to the container only after the successful allocation, and
the container was cleared in case of OOM, so in case of a partial allocation
the allocated part was not accessible. Also (Paul's suggestion) modified some
vector templates to clean up after any runtime error (including an OOM).
* Performance improvements: Eliminated the call to updateBatches() before 
each hash computation (instead used only when switching to a new 
SpilledRecordBatch); this was a big overhead.
   Also changed all the "setSafe" calls into "set" for the HashTable (those
nanoseconds add up, especially when rehashing) - these bigint vectors need no
resizing.
* Ignore "(spill) file not found" error while cleaning up.
* The unit tests were re-written in a more compact form, and a test with
the new option (no memory prediction, forcing the OOM code) was added.
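
To illustrate the BatchHolder leak mentioned in the list above: if each vector is registered with the container before it is allocated, clearing the container after a failure also releases the partially allocated vectors. The sketch below is a toy model under that assumption; `ToyVector`, `ToyContainer`, `buildHolder()` and the `liveBuffers` counter are hypothetical and not Drill classes.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchHolderLeakSketch {

  /** Counts buffers that are currently allocated (for leak checking). */
  static int liveBuffers = 0;

  /** Hypothetical vector whose allocation may fail. */
  static class ToyVector {
    private boolean allocated;

    void allocate(boolean fail) {
      if (fail) {
        throw new IllegalStateException("simulated OOM");
      }
      allocated = true;
      liveBuffers++;
    }

    void clear() {
      if (allocated) {
        allocated = false;
        liveBuffers--;
      }
    }
  }

  /** Hypothetical container that releases every vector it knows about. */
  static class ToyContainer {
    private final List<ToyVector> vectors = new ArrayList<>();

    void add(ToyVector v) { vectors.add(v); }

    void clear() {
      for (ToyVector v : vectors) {
        v.clear();
      }
      vectors.clear();
    }
  }

  /** Builds a holder; failAt is the index whose allocation fails (-1: none). */
  static boolean buildHolder(int numVectors, int failAt) {
    ToyContainer container = new ToyContainer();
    try {
      for (int i = 0; i < numVectors; i++) {
        ToyVector v = new ToyVector();
        container.add(v);          // register first ...
        v.allocate(i == failAt);   // ... then allocate
      }
      return true;
    } catch (IllegalStateException oom) {
      container.clear();           // also frees the vectors allocated so far
      return false;
    }
  }
}
```
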


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ben-Zvi/drill DRILL-5694

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/938.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #938


commit 1a96bb39faf01b7665bd669d88494789693ed9b8
Author: Ben-Zvi 
Date:   2017-09-08T22:52:57Z

DRILL-5694: Handle OOM in HashAggr by spill