ilooner commented on a change in pull request #1606: Drill 6845: Semi-Hash-Join to skip incoming build duplicates, automatically stop skipping if too few
URL: https://github.com/apache/drill/pull/1606#discussion_r246674633
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinSpillControlImpl.java
 ##########
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.VectorContainer;
+
+/**
+ * This class is currently used only for the Semi-Hash-Join that avoids duplicates by use of a hash table.
+ * The method {@link HashJoinMemoryCalculator.HashJoinSpillControl#shouldSpill(VectorContainer)}
+ * returns true if the memory currently available to the allocator is not enough
+ * to hold (a multiple of, for safety) a newly allocated batch.
+ */
+public class HashJoinSpillControlImpl implements HashJoinMemoryCalculator.HashJoinSpillControl {
+  private BufferAllocator allocator;
+  private int recordsPerBatch;
+  private int minBatchesInAvailableMemory;
+
+  HashJoinSpillControlImpl(BufferAllocator allocator, int recordsPerBatch, int minBatchesInAvailableMemory) {
+    this.allocator = allocator;
+    this.recordsPerBatch = recordsPerBatch;
+    this.minBatchesInAvailableMemory = minBatchesInAvailableMemory;
+  }
+
+  @Override
+  public boolean shouldSpill(VectorContainer currentVectorContainer) {
+    assert currentVectorContainer.hasRecordCount();
 
 Review comment:
  This is a heuristic that doesn't capture several of the memory requirements we need to track, such as:
   
    - Reserving enough space for the output batch
 - Reserving space for the incoming probe batch. Note that if we don't spill, the incoming probe batch is owned by the upstream operator; but if we are processing a spilled partition, it is owned by the HashJoin operator's memory allocator, so we need to account for it.
    - Reserving room for the partial batches of each partition.
   
   So I'm concerned we can run into unexpected OOMs. 
   
  However, I think we can account for all of this and get a more accurate picture of memory usage. The shouldSpill method needs to compute the memory to reserve for the three items above, and the total memory usage of each partition (HashTables + partition batches). Here is a snippet of my proposed implementation of shouldSpill.
   
   ```
       public boolean shouldSpill() {
         // Handle early completion conditions
         if (buildPartitionStatSet.allSpilled()) {
           // All build side partitions are spilled so our memory calculation is complete
           return false;
         }

         long reservedMemory = calculateReservedMemory(
           buildPartitionStatSet.getNumSpilledPartitions(),
           getIncomingProbeBatchReservedSpace(),
           maxOutputBatchSize,
           partitionProbeBatchSize);

         // We are consuming our reserved memory plus the amount of memory for each build side
         // batch and the size of the hashtables
         consumedMemory = reservedMemory + RecordBatchSizer.multiplyByFactor(
             buildPartitionStatSet.getConsumedMemoryWithHashTable(), fragmentationFactor);
         return consumedMemory > memoryAvailable;
       }

       public long getIncomingProbeBatchReservedSpace() {
         Preconditions.checkState(initialized);

         if (firstCycle) {
           return 0;
         } else {
           return probeSizePredictor.getBatchSize();
         }
       }
   ```
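   The `calculateReservedMemory` call above isn't shown in this comment, so as a self-contained sanity check, here is one hedged sketch of how that term could be composed. The class name and the exact formula are my assumptions for illustration, not necessarily what Drill's `HashJoinMemoryCalculatorImpl` does.

```java
// Hypothetical, self-contained sketch of the reserved-memory term used by
// shouldSpill() above. Assumes we must reserve room for: the incoming probe
// batch, one output batch, and one partial probe batch per spilled partition.
public class ReservedMemorySketch {
  static long calculateReservedMemory(int numSpilledPartitions,
                                      long incomingProbeBatchReservedSpace,
                                      long maxOutputBatchSize,
                                      long partitionProbeBatchSize) {
    return incomingProbeBatchReservedSpace
        + maxOutputBatchSize
        + (long) numSpilledPartitions * partitionProbeBatchSize;
  }

  public static void main(String[] args) {
    // 2 spilled partitions, 1 MiB probe reservation, 4 MiB output batch,
    // 512 KiB per-partition probe batch.
    System.out.println(calculateReservedMemory(2, 1L << 20, 4L << 20, 512L << 10)); // prints 6291456
  }
}
```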
   ### Dependencies
   
   There are two dependencies for the above code to work.  
   
   #### Probe Size Predictor
   
   This batch size predictor will give us an estimate of the size of an 
incoming probe batch so that we can reserve space for it. It can be 
instantiated by using the following snippet.
   
   ```
   BatchSizePredictorImpl.Factory.INSTANCE.create(probeBatch, fragmentationFactor, safetyFactor);
   ```
   
   #### PartitionStatSet
   
   The PartitionStatSet is used to get the memory usage of each partition and is used by the other memory calculators. Currently the PartitionStatSet can only give you the size of all of the temporary batches in all the partitions, so you will have to enhance it to also include the hash tables.
   
   I suggest adding a `getConsumedMemoryWithHashTable()` method to 
PartitionStatSet. This method can then call a `getInMemorySizeWithHashTable()` 
method on each partition. Note you will have to add this method as well.
   
   The `getInMemorySizeWithHashTable()` can be implemented for a partition like 
the following:
   
   ```
   public long getInMemorySizeWithHashTable() {
     return getInMemorySize() + hashTable.getActualSize();
   }
   ```
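   And the corresponding aggregation on the PartitionStatSet side could look like the following self-contained sketch. The `PartitionStat` stand-in and its fields are assumptions for illustration; the real PartitionStatSet walks Drill's partition objects.

```java
import java.util.Arrays;
import java.util.List;

// Self-contained sketch of the proposed getConsumedMemoryWithHashTable()
// aggregation. PartitionStat is a hypothetical stand-in for a partition's
// memory accounting, not Drill's actual class.
public class PartitionStatSetSketch {
  static class PartitionStat {
    final long inMemorySize;   // size of the partition's in-memory batches
    final long hashTableSize;  // hashTable.getActualSize() in the real code

    PartitionStat(long inMemorySize, long hashTableSize) {
      this.inMemorySize = inMemorySize;
      this.hashTableSize = hashTableSize;
    }

    // Proposed per-partition method: batches plus hash table.
    long getInMemorySizeWithHashTable() {
      return inMemorySize + hashTableSize;
    }
  }

  // Proposed PartitionStatSet method: sum over all partitions.
  static long getConsumedMemoryWithHashTable(List<PartitionStat> partitions) {
    long total = 0;
    for (PartitionStat p : partitions) {
      total += p.getInMemorySizeWithHashTable();
    }
    return total;
  }

  public static void main(String[] args) {
    List<PartitionStat> stats = Arrays.asList(
        new PartitionStat(100, 50), new PartitionStat(200, 25));
    System.out.println(getConsumedMemoryWithHashTable(stats)); // prints 375
  }
}
```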
   
   And that's it. I think if we do this we'll have a really accurate estimate 
of memory usage and can avoid OOMs.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
