[ https://issues.apache.org/jira/browse/FLINK-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15112091#comment-15112091 ]
ASF GitHub Bot commented on FLINK-2871:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1469#discussion_r50511127

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
@@ -1558,7 +1641,209 @@ public void reset() {
         }
     } // end HashBucketIterator
+
+    /**
+     * Iterate all the elements in memory which has not been probed during probe phase.
+     */
+    public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
+
+        private final TypeSerializer<BT> accessor;
+
+        private final long totalBucketNumber;
+
+        private final int bucketsPerSegmentBits;
+
+        private final int bucketsPerSegmentMask;
+
+        private final MemorySegment[] buckets;
+
+        private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
+
+        private final BitSet probedSet;
+
+        private MemorySegment bucket;
+
+        private MemorySegment[] overflowSegments;
+
+        private HashPartition<BT, PT> partition;
+
+        private int scanCount;
+
+        private int bucketInSegmentOffset;
+
+        private int countInSegment;
+
+        private int numInSegment;
+
+        UnmatchedBuildIterator(
+            TypeSerializer<BT> accessor,
+            long totalBucketNumber,
+            int bucketsPerSegmentBits,
+            int bucketsPerSegmentMask,
+            MemorySegment[] buckets,
+            ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
+            BitSet probedSet) {
+
+            this.accessor = accessor;
+            this.totalBucketNumber = totalBucketNumber;
+            this.bucketsPerSegmentBits = bucketsPerSegmentBits;
+            this.bucketsPerSegmentMask = bucketsPerSegmentMask;
+            this.buckets = buckets;
+            this.partitionsBeingBuilt = partitionsBeingBuilt;
+            this.probedSet = probedSet;
+            init();
+        }
+
+        private void init() {
+            scanCount = -1;
+            while (!moveToNextBucket()) {
+                if (scanCount >= totalBucketNumber) {
+                    break;
+                }
+            }
+        }
+
+        public BT next(BT reuse) {
+            while (true) {
+                BT result = nextInBucket(reuse);
+                if (result == null) {
+                    while (!moveToNextBucket()) {
+                        if (scanCount >= totalBucketNumber) {
+                            return null;
+                        }
+                    }
+                } else {
+                    return result;
+                }
+            }
+        }
+
+        public BT next() {
+            while (true) {
+                BT result = nextInBucket();
+                if (result == null) {
+                    while (!moveToNextBucket()) {
+                        if (scanCount >= totalBucketNumber) {
+                            return null;
+                        }
+                    }
+                } else {
+                    return result;
+                }
+            }
+        }
+
+        private boolean moveToNextBucket() {
--- End diff --

Sounds good! :-)

> Add OuterJoin strategy with HashTable on outer side
> ---------------------------------------------------
>
> Key: FLINK-2871
> URL: https://issues.apache.org/jira/browse/FLINK-2871
> Project: Flink
> Issue Type: New Feature
> Components: Local Runtime, Optimizer
> Affects Versions: 0.10.0
> Reporter: Fabian Hueske
> Assignee: Chengxiang Li
> Priority: Minor
>
> Outer joins are currently supported with two local execution strategies:
> - sort-merge join
> - hash join where the hash table is built on the inner side. Hence, this
> strategy is only supported for left and right outer joins.
> In order to support hash-tables on the outer side, we need a special hash
> table implementation that gives access to all records which have not been
> accessed during the probe phase.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)