[ 
https://issues.apache.org/jira/browse/DRILL-6804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661370#comment-16661370
 ] 

ASF GitHub Bot commented on DRILL-6804:
---------------------------------------

ilooner closed pull request #1508: DRILL-6804: Simplify usage of OperatorPhase 
in HashAgg.
URL: https://github.com/apache/drill/pull/1508
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is included
below (GitHub would not otherwise show it after the merge):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 80d25edb13a..485d3637277 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -49,7 +50,6 @@
 import org.apache.drill.exec.physical.impl.common.Comparator;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
-import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -192,9 +192,10 @@ public HashAggBatch(HashAggregate popConfig, RecordBatch 
incoming, FragmentConte
     long memAvail = oContext.getAllocator().getLimit();
     long minBatchesPerPartition = 
context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
     long minBatchesNeeded = 2 * minBatchesPerPartition; // 2 - to cover 
overheads, etc.
-    boolean is2ndPhase = popConfig.getAggPhase() == 
AggPrelBase.OperatorPhase.PHASE_2of2;
     boolean fallbackEnabled = 
context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
-    if ( is2ndPhase && !fallbackEnabled ) {
+    final AggPrelBase.OperatorPhase phase = popConfig.getAggPhase();
+
+    if ( phase.is2nd() && !fallbackEnabled ) {
       minBatchesNeeded *= 2;  // 2nd phase (w/o fallback) needs at least 2 
partitions
     }
     if ( configuredBatchSize > memAvail / minBatchesNeeded ) { // no cast - 
memAvail may be bigger than max-int
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 6709cf6ddef..32db9eaf487 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -106,9 +106,7 @@
   private int rowsSpilledReturned = 0;
   private int rowsReturnedEarly = 0;
 
-  private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
-  private boolean is2ndPhase = false;
-  private boolean is1stPhase = false;
+  private AggPrelBase.OperatorPhase phase;
   private boolean canSpill = true; // make it false in case can not 
spill/return-early
   private ChainedHashTable baseHashTable;
   private boolean earlyOutput = false; // when 1st phase returns a partition 
due to no memory
@@ -379,11 +377,8 @@ public void setup(HashAggregate hashAggrConfig, 
HashTableConfig htConfig, Fragme
     this.outgoing = outgoing;
     this.outContainer = outContainer;
     this.useMemoryPrediction = 
context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
-
-    is2ndPhase = hashAggrConfig.getAggPhase() == 
AggPrelBase.OperatorPhase.PHASE_2of2;
-    isTwoPhase = hashAggrConfig.getAggPhase() != 
AggPrelBase.OperatorPhase.PHASE_1of1;
-    is1stPhase = isTwoPhase && !is2ndPhase;
-    canSpill = isTwoPhase; // single phase can not spill
+    this.phase = hashAggrConfig.getAggPhase();
+    canSpill = phase.hasTwo(); // single phase can not spill
 
     // Typically for testing - force a spill after a partition has more than 
so many batches
     minBatchesPerPartition = 
context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
@@ -447,7 +442,7 @@ private void delayedSetup() {
 
     // Set the number of partitions from the configuration (raise to a power 
of two, if needed)
     int numPartitions = 
(int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
-    if ( numPartitions == 1 && is2ndPhase  ) { // 1st phase can still do early 
return with 1 partition
+    if ( numPartitions == 1 && phase.is2nd() ) { // 1st phase can still do 
early return with 1 partition
       canSpill = false;
       logger.warn("Spilling is disabled due to configuration setting of 
num_partitions to 1");
     }
@@ -473,7 +468,7 @@ private void delayedSetup() {
       while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 2 * 
1024 * 1024) > memAvail ) {
         numPartitions /= 2;
         if ( numPartitions < 2) {
-          if (is2ndPhase) {
+          if (phase.is2nd()) {
             canSpill = false;  // 2nd phase needs at least 2 to make progress
 
             if (fallbackEnabled) {
@@ -492,7 +487,7 @@ private void delayedSetup() {
         }
       }
     }
-    logger.debug("{} phase. Number of partitions chosen: {}. {} spill", 
isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
+    logger.debug("{} phase. Number of partitions chosen: {}. {} spill", 
phase.getName(),
         numPartitions, canSpill ? "Can" : "Cannot");
 
     // The following initial safety check should be revisited once we can 
lower the number of rows in a batch
@@ -616,7 +611,7 @@ private void updateEstMaxBatchSize(RecordBatch incoming) {
     estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
 
     logger.trace("{} phase. Estimated internal row width: {} Values row width: 
{} batch size: {}  memory limit: {}  max column width: {}",
-        
isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth);
+      
phase.getName(),estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth);
 
     if ( estMaxBatchSize > allocator.getLimit() ) {
       logger.warn("HashAggregate: Estimated max batch size {} is larger than 
the memory limit {}",estMaxBatchSize,allocator.getLimit());
@@ -886,7 +881,7 @@ public void adjustOutputCount(int outputBatchSize, int 
oldRowWidth, int newRowWi
   @Override
   public void cleanup() {
     if ( schema == null ) { return; } // not set up; nothing to clean
-    if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
+    if ( phase.is2nd() && spillSet.getWriteBytes() > 0 ) {
       stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
           (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
     }
@@ -982,7 +977,7 @@ private boolean isSpilled(int part) {
    * @return The partition (number) chosen to be spilled
    */
   private int chooseAPartitionToFlush(int currPart, boolean tryAvoidCurr) {
-    if ( is1stPhase && ! tryAvoidCurr) { return currPart; } // 1st phase: just 
use the current partition
+    if ( phase.is1st() && ! tryAvoidCurr) { return currPart; } // 1st phase: 
just use the current partition
     int currPartSize = batchHolders[currPart].size();
     if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if 
size is 1
     // first find the largest spilled partition
@@ -1188,7 +1183,7 @@ public AggIterOutcome outputCurrentBatch() {
         if (spilledState.isEmpty()) { // and no spilled partitions
           allFlushed = true;
           this.outcome = IterOutcome.NONE;
-          if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
+          if ( phase.is2nd() && spillSet.getWriteBytes() > 0 ) {
             stats.setLongStat(Metric.SPILL_MB, // update stats - total MB 
spilled
                 (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
           }
@@ -1243,7 +1238,7 @@ public AggIterOutcome outputCurrentBatch() {
 
     this.outcome = IterOutcome.OK;
 
-    if ( EXTRA_DEBUG_SPILL && is2ndPhase ) {
+    if ( EXTRA_DEBUG_SPILL && phase.is2nd() ) {
       logger.debug("So far returned {} + SpilledReturned {}  total {} (spilled 
{})",rowsNotSpilled,rowsSpilledReturned,
         rowsNotSpilled+rowsSpilledReturned,
         rowsSpilled);
@@ -1322,12 +1317,12 @@ public int numGroupedRecords() {
    */
   private String getOOMErrorMsg(String prefix) {
     String errmsg;
-    if (!isTwoPhase) {
+    if (!phase.hasTwo()) {
       errmsg = "Single Phase Hash Aggregate operator can not spill.";
     } else if (!canSpill) {  // 2nd phase, with only 1 partition
       errmsg = "Too little memory available to operator to facilitate 
spilling.";
     } else { // a bug ?
-      errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First 
Phase") + ". Partitions: " + spilledState.getNumPartitions() +
+      errmsg = prefix + " OOM at " + phase.getName() + " Phase. Partitions: " 
+ spilledState.getNumPartitions() +
       ". Estimated batch size: " + estMaxBatchSize + ". values size: " + 
estValuesBatchSize + ". Output alloc size: " + estOutgoingAllocSize;
       if ( plannedBatches > 0 ) { errmsg += ". Planned batches: " + 
plannedBatches; }
       if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + 
rowsSpilled; }
@@ -1367,11 +1362,11 @@ private void checkGroupAndAggrValues(int 
incomingRowIdx) {
     hashCode >>>= spilledState.getBitsInMask();
     HashTable.PutStatus putStatus = null;
     long allocatedBeforeHTput = allocator.getAllocatedMemory();
+    String tryingTo = phase.is1st() ? "early return" : "spill";
 
     // Proactive spill - in case there is no reserve memory - spill and retry 
putting later
     if ( reserveValueBatchMemory == 0 && canSpill ) {
-      logger.trace("Reserved memory runs short, trying to {} a partition and 
retry Hash Table put() again.",
-        is1stPhase ? "early return" : "spill");
+      logger.trace("Reserved memory runs short, trying to {} a partition and 
retry Hash Table put() again.", tryingTo);
 
       doSpill(currentPartition); // spill to free some memory
 
@@ -1389,8 +1384,7 @@ private void checkGroupAndAggrValues(int incomingRowIdx) {
     } catch (RetryAfterSpillException re) {
       if ( ! canSpill ) { throw new OutOfMemoryException(getOOMErrorMsg("Can 
not spill")); }
 
-      logger.trace("HT put failed with an OOM, trying to {} a partition and 
retry Hash Table put() again.",
-            is1stPhase ? "early return" : "spill");
+      logger.trace("HT put failed with an OOM, trying to {} a partition and 
retry Hash Table put() again.", tryingTo);
 
       // for debugging - in case there's a leak
       long memDiff = allocator.getAllocatedMemory() - allocatedBeforeHTput;
@@ -1493,7 +1487,7 @@ private void spillIfNeeded(int currentPartition, boolean 
forceSpill) {
 
       // log a detailed debug message explaining why a spill may be needed
       logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to 
add to partition {} with {} batches. " + "Max memory needed {}, Est batch size 
{}, mem limit {}",
-          allocator.getAllocatedMemory(), isTwoPhase ? (is2ndPhase ? "2ND" : 
"1ST") : "Single", currentPartition, batchHolders[currentPartition].size(), 
maxMemoryNeeded,
+          allocator.getAllocatedMemory(), phase.getName(), currentPartition, 
batchHolders[currentPartition].size(), maxMemoryNeeded,
           estMaxBatchSize, allocator.getLimit());
     }
     //
@@ -1516,7 +1510,7 @@ private void spillIfNeeded(int currentPartition, boolean 
forceSpill) {
         return;
       }
 
-      if ( is2ndPhase ) {
+      if ( phase.is2nd() ) {
         long before = allocator.getAllocatedMemory();
 
         spillAPartition(victimPartition);
@@ -1583,7 +1577,7 @@ private void updateStats(HashTable[] htables) {
     this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
     this.stats.setLongStat(Metric.NUM_PARTITIONS, 
spilledState.getNumPartitions());
     this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // 
Put 0 in case no spill
-    if ( is2ndPhase ) {
+    if ( phase.is2nd() ) {
       this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
     }
     if ( rowsReturnedEarly > 0 ) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index 84f85ba2f01..f3d527e821b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -47,7 +47,42 @@
 
 public abstract class AggPrelBase extends DrillAggregateRelBase implements 
Prel {
 
-  public enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2}
+  public enum OperatorPhase {
+    PHASE_1of1(false, false, false, "Single"),
+    PHASE_1of2(true, true, false, "1st"),
+    PHASE_2of2(true, false, true, "2nd");
+
+    private boolean hasTwo;
+    private boolean is1st;
+    private boolean is2nd;
+    private String name;
+
+    OperatorPhase(boolean hasTwo,
+                  boolean is1st,
+                  boolean is2nd,
+                  String name) {
+      this.hasTwo = hasTwo;
+      this.is1st = is1st;
+      this.is2nd = is2nd;
+      this.name = name;
+    }
+
+    public boolean hasTwo() {
+      return hasTwo;
+    }
+
+    public boolean is1st() {
+      return is1st;
+    }
+
+    public boolean is2nd() {
+      return is2nd;
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
 
   protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1; // default 
phase
   protected List<NamedExpression> keys = Lists.newArrayList();
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q6.json 
b/exec/java-exec/src/test/resources/agg/hashagg/q6.json
index c15539188dc..35b200a5fa9 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q6.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q6.json
@@ -35,6 +35,7 @@
     pop : "hash-aggregate",
     @id : 3,
     child : 2,
+    phase : "PHASE_1of1",
     keys : [ {
       ref : "$f0",
       expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q7_1.json 
b/exec/java-exec/src/test/resources/agg/hashagg/q7_1.json
index ef05613677f..3494c793362 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q7_1.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q7_1.json
@@ -38,6 +38,7 @@
     pop : "hash-aggregate",
     @id : 3,
     child : 2,
+    phase : "PHASE_1of1",
     keys : [ {
       ref : "$f0",
       expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q7_2.json 
b/exec/java-exec/src/test/resources/agg/hashagg/q7_2.json
index 62cf5c3587c..f77c08b3d1f 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q7_2.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q7_2.json
@@ -38,6 +38,7 @@
     pop : "hash-aggregate",
     @id : 3,
     child : 2,
+    phase : "PHASE_1of1",
     keys : [ {
       ref : "$f0",
       expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q7_3.json 
b/exec/java-exec/src/test/resources/agg/hashagg/q7_3.json
index 8edc1106929..37ba0ad57f6 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q7_3.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q7_3.json
@@ -38,6 +38,7 @@
     pop : "hash-aggregate",
     @id : 3,
     child : 2,
+    phase : "PHASE_1of1",
     keys : [ {
       ref : "$f0",
       expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q8.json 
b/exec/java-exec/src/test/resources/agg/hashagg/q8.json
index a457aa9aec4..f67cd293121 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q8.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q8.json
@@ -37,6 +37,7 @@
     pop : "hash-aggregate",
     @id : 3,
     child : 2,
+    phase : "PHASE_1of1",
     keys : [ {
       ref : "$f0",
       expr : "$f0"
diff --git a/exec/java-exec/src/test/resources/agg/hashagg/q8_1.json 
b/exec/java-exec/src/test/resources/agg/hashagg/q8_1.json
index 3461c8ca320..73ce4be7837 100644
--- a/exec/java-exec/src/test/resources/agg/hashagg/q8_1.json
+++ b/exec/java-exec/src/test/resources/agg/hashagg/q8_1.json
@@ -35,6 +35,7 @@
     pop : "hash-aggregate",
     @id : 3,
     child : 2,
+    phase : "PHASE_1of1",
     keys : [ {
       ref : "$f0",
       expr : "$f0"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Simplify Usage of OperatorPhase in HashAgg Template
> ---------------------------------------------------
>
>                 Key: DRILL-6804
>                 URL: https://issues.apache.org/jira/browse/DRILL-6804
>             Project: Apache Drill
>          Issue Type: Sub-task
>            Reporter: Timothy Farkas
>            Assignee: Timothy Farkas
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to