Github user ChengXiangLi commented on a diff in the pull request: https://github.com/apache/flink/pull/1469#discussion_r50232198 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java --- @@ -1558,7 +1641,209 @@ public void reset() { } } // end HashBucketIterator + + /** + * Iterate all the elements in memory which has not been probed during probe phase. + */ + public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> { + + private final TypeSerializer<BT> accessor; + + private final long totalBucketNumber; + + private final int bucketsPerSegmentBits; + + private final int bucketsPerSegmentMask; + + private final MemorySegment[] buckets; + + private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt; + + private final BitSet probedSet; + + private MemorySegment bucket; + + private MemorySegment[] overflowSegments; + + private HashPartition<BT, PT> partition; + + private int scanCount; + + private int bucketInSegmentOffset; + + private int countInSegment; + + private int numInSegment; + + UnmatchedBuildIterator( + TypeSerializer<BT> accessor, + long totalBucketNumber, + int bucketsPerSegmentBits, + int bucketsPerSegmentMask, + MemorySegment[] buckets, + ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt, + BitSet probedSet) { + + this.accessor = accessor; + this.totalBucketNumber = totalBucketNumber; + this.bucketsPerSegmentBits = bucketsPerSegmentBits; + this.bucketsPerSegmentMask = bucketsPerSegmentMask; + this.buckets = buckets; + this.partitionsBeingBuilt = partitionsBeingBuilt; + this.probedSet = probedSet; + init(); + } + + private void init() { + scanCount = -1; + while (!moveToNextBucket()) { + if (scanCount >= totalBucketNumber) { + break; + } + } + } + + public BT next(BT reuse) { + while (true) { + BT result = nextInBucket(reuse); + if (result == null) { + while (!moveToNextBucket()) { + if (scanCount >= totalBucketNumber) { + return null; + } + } + } else { + return 
result; + } + } + } + + public BT next() { + while (true) { + BT result = nextInBucket(); + if (result == null) { + while (!moveToNextBucket()) { + if (scanCount >= totalBucketNumber) { + return null; + } + } + } else { + return result; + } + } + } + + private boolean moveToNextBucket() { --- End diff -- The next bucket may have been spilled to disk, so we need a loop here to make sure we move to the next on-heap bucket.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---