[ 
https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15112091#comment-15112091
 ] 

ASF GitHub Bot commented on FLINK-2871:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50511127
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
 ---
    @@ -1558,7 +1641,209 @@ public void reset() {
                }
     
        } // end HashBucketIterator
    +   
    +   /**
    +    * Iterate all the elements in memory which has not been probed during 
probe phase.
    +    */
    +   public static class UnmatchedBuildIterator<BT, PT> implements 
MutableObjectIterator<BT> {
    +   
    +           private final TypeSerializer<BT> accessor;
    +   
    +           private final long totalBucketNumber;
    +           
    +           private final int bucketsPerSegmentBits;
    +           
    +           private final int bucketsPerSegmentMask;
    +           
    +           private final MemorySegment[] buckets;
    +           
    +           private final ArrayList<HashPartition<BT, PT>> 
partitionsBeingBuilt;
    +           
    +           private final BitSet probedSet;
    +           
    +           private MemorySegment bucket;
    +           
    +           private MemorySegment[] overflowSegments;
    +           
    +           private HashPartition<BT, PT> partition;
    +           
    +           private int scanCount;
    +           
    +           private int bucketInSegmentOffset;
    +           
    +           private int countInSegment;
    +           
    +           private int numInSegment;
    +           
    +           UnmatchedBuildIterator(
    +                   TypeSerializer<BT> accessor,
    +                   long totalBucketNumber,
    +                   int bucketsPerSegmentBits,
    +                   int bucketsPerSegmentMask,
    +                   MemorySegment[] buckets,
    +                   ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +                   BitSet probedSet) {
    +                   
    +                   this.accessor = accessor;
    +                   this.totalBucketNumber = totalBucketNumber;
    +                   this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +                   this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +                   this.buckets = buckets;
    +                   this.partitionsBeingBuilt = partitionsBeingBuilt;
    +                   this.probedSet = probedSet;
    +                   init();
    +           }
    +           
    +           private void init() {
    +                   scanCount = -1;
    +                   while (!moveToNextBucket()) {
    +                           if (scanCount >= totalBucketNumber) {
    +                                   break;
    +                           }
    +                   }
    +           }
    +           
    +           public BT next(BT reuse) {
    +                   while (true) {
    +                           BT result = nextInBucket(reuse);
    +                           if (result == null) {
    +                                   while (!moveToNextBucket()) {
    +                                           if (scanCount >= 
totalBucketNumber) {
    +                                                   return null;
    +                                           }
    +                                   }
    +                           } else {
    +                                   return result;
    +                           }
    +                   }
    +           }
    +           
    +           public BT next() {
    +                   while (true) {
    +                           BT result = nextInBucket();
    +                           if (result == null) {
    +                                   while (!moveToNextBucket()) {
    +                                           if (scanCount >= 
totalBucketNumber) {
    +                                                   return null;
    +                                           }
    +                                   }
    +                           } else {
    +                                   return result;
    +                           }
    +                   }
    +           }
    +   
    +           private boolean moveToNextBucket() {
    --- End diff --
    
    Sounds good! :-)


> Add OuterJoin strategy with HashTable on outer side
> ---------------------------------------------------
>
>                 Key: FLINK-2871
>                 URL: https://issues.apache.org/jira/browse/FLINK-2871
>             Project: Flink
>          Issue Type: New Feature
>          Components: Local Runtime, Optimizer
>    Affects Versions: 0.10.0
>            Reporter: Fabian Hueske
>            Assignee: Chengxiang Li
>            Priority: Minor
>
> Outer joins are currently supported with two local execution strategies:
> - sort-merge join
> - hash join where the hash table is built on the inner side. Hence, this 
> strategy is only supported for left and right outer joins.
> In order to support hash tables on the outer side, we need a special hash 
> table implementation that gives access to all records which have not been 
> accessed during the probe phase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to