[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15301097#comment-15301097 ]
ASF GitHub Bot commented on FLINK-3477:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1517#discussion_r64669861

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
    @@ -0,0 +1,1048 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.operators.hash;
    +
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.typeutils.SameTypePairComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypePairComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.runtime.io.disk.RandomAccessInputView;
    +import org.apache.flink.runtime.memory.AbstractPagedOutputView;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.MutableObjectIterator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +/**
    + * This hash table supports updating elements, and it also has processRecordWithReduce,
    + * which makes one reduce step with the given record.
    + *
    + * The memory is divided into three areas:
    + *  - Bucket area: they contain bucket heads:
    + *    an 8 byte pointer to the first link of a linked list in the record area
    + *  - Record area: this contains the actual data in linked list elements. A linked list element starts
    + *    with an 8 byte pointer to the next element, and then the record follows.
    + *  - Staging area: This is a small, temporary storage area for writing updated records. This is needed,
    + *    because before serializing a record, there is no way to know in advance how large will it be.
    + *    Therefore, we can't serialize directly into the record area when we are doing an update, because
    + *    if it turns out to be larger then the old record, then it would override some other record
    + *    that happens to be after the old one in memory.
    + *    The solution is to serialize to the staging area first,
    + *    and then copy it to the place of the original if it has the same size, otherwise allocate a new linked
    + *    list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in
    + *    the record area, so compactions are eventually needed.
    + *
    + * Compaction happens by deleting everything in the bucket area, and then reinserting all elements.
    + * The reinsertion happens by forgetting the structure (the linked lists) of the record area, and reading it
    + * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area.
    + * Note, that insertions never override a record that have not been read by the reinsertion sweep, because
    + * both the insertions and readings happen sequentially in the record area, and the insertions obviously
    + * never overtake the reading sweep.
    + *
    + * Note: we have to abandon the old linked list element even when the updated record has a smaller size
    + * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion
    + * sweep.
    + *
    + * The number of buckets depends on how large are the records. The serializer might be able to tell us this,
    + * so in this case, we will calculate the number of buckets upfront, and won't do resizes.
    + * If the serializer doesn't know the size, then we start with a small number of buckets, and do resizes as more
    + * elements are inserted than the number of buckets.
    + *
    + * The number of memory segments given to the staging area is usually one, because it just needs to hold
    + * one record.
    + *
    + * Note: For hashing, we need to use MathUtils.hash because of its avalanche property, so that
    + * changing only some high bits of the original value shouldn't leave the lower bits of the hash unaffected.
    + * This is because when choosing the bucket for a record, we mask only the
    + * lower bits (see numBucketsMask).
    + * Lots of collisions would occur when, for example,
    + * the original value that is hashed is some bitset, where lots of different values
    + * that are different only in the higher bits will actually occur.
    + */
    +
    +public class ReduceHashTable<T> extends AbstractMutableHashTable<T> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(ReduceHashTable.class);
    +
    +    /** The minimum number of memory segments ReduceHashTable needs to be supplied with in order to work. */
    +    private static final int MIN_NUM_MEMORY_SEGMENTS = 3;
    +
    +    // Note: the following two constants can't be negative, because negative values are reserved for storing the
    +    // negated size of the record, when it is abandoned (not part of any linked list).
    +
    +    /** The last link in the linked lists will have this as next pointer. */
    +    private static final long END_OF_LIST = Long.MAX_VALUE;
    +
    +    /** This value means that prevElemPtr is "pointing to the bucket head", and not into the record segments. */
    +    private static final long INVALID_PREV_POINTER = Long.MAX_VALUE - 1;
    +
    +
    +    private static final long RECORD_OFFSET_IN_LINK = 8;
    +
    +
    +    /** this is used by processRecordWithReduce */
    +    private final ReduceFunction<T> reducer;
    +
    +    /** emit() sends data to outputCollector */
    +    private final Collector<T> outputCollector;
    +
    +    private final boolean objectReuseEnabled;
    +
    +    /**
    +     * This initially contains all the memory we have, and then segments
    +     * are taken from it by bucketSegments, recordArea, and stagingSegments.
    +     */
    +    private final ArrayList<MemorySegment> freeMemorySegments;
    +
    +    private final int numAllMemorySegments;
    +
    +    private final int segmentSize;
    +
    +    /**
    +     * These will contain the bucket heads.
    +     * The bucket heads are pointers to the linked lists containing the actual records.
    +     */
    +    private MemorySegment[] bucketSegments;
    +
    +    private static final int bucketSize = 8, bucketSizeBits = 3;
    +
    +    private int numBuckets;
    +    private int numBucketsMask;
    +    private final int numBucketsPerSegment, numBucketsPerSegmentBits, numBucketsPerSegmentMask;
    +
    +    /**
    +     * The segments where the actual data is stored.
    +     */
    +    private final RecordArea recordArea;
    +
    +    /**
    +     * Segments for the staging area.
    +     * (It should contain at most one record at all times.)
    +     */
    +    private final ArrayList<MemorySegment> stagingSegments;
    +    private final RandomAccessInputView stagingSegmentsInView;
    +    private final StagingOutputView stagingSegmentsOutView;
    +
    +    private T reuse;
    +
    +    /** This is the internal prober that insertOrReplaceRecord and processRecordWithReduce use. */
    +    private final HashTableProber<T> prober;
    +
    +    /** The number of elements currently held by the table. */
    +    private long numElements = 0;
    +
    +    /** The number of bytes wasted by updates that couldn't overwrite the old record due to size change. */
    +    private long holes = 0;
    +
    +    /**
    +     * If the serializer knows the size of the records, then we can calculate the optimal number of buckets
    +     * upfront, so we don't need resizes.
    +     */
    +    private boolean enableResize;
    +
    +
    +    /**
    +     * This constructor is for the case when will only call those operations that are also
    +     * present on CompactingHashTable.
    +     */
    +    public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) {
    +        this(serializer, comparator, memory, null, null, false);
    +    }
    +
    +    public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory,
    --- End diff --

    Can we remove the `ReduceFunction` and `Collector` from this class and move the corresponding logic into the driver?
    It would be good if this table could also be used by a `CombineFunction` (not only a `ReduceFunction`).
    So, I would remove the following methods:
    - `processRecordWithReduce()`: can be implemented by the driver using the Prober methods `getMatchFor()` and `updateMatch()`
    - `emit()`: can be implemented by the driver using `getEntryIterator()`
    - `emitAndReset()`: same as `emit()` but we need an additional `reset()` method

    I would also rename the table if it becomes less specialized, maybe to `InPlaceMutableHashTable` or do you have a better idea, @ggevay?


> Add hash-based combine strategy for ReduceFunction
> --------------------------------------------------
>
>                 Key: FLINK-3477
>                 URL: https://issues.apache.org/jira/browse/FLINK-3477
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Fabian Hueske
>            Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a
> single value. A Reduce function is incrementally applied to compute a final
> aggregated value. This allows holding the preaggregated value in a hash table
> and updating it with each function call.
> The hash-based strategy requires a special implementation of an in-memory hash
> table. The hash table should support in-place updates of elements (if the
> updated value has the same size as the old value) but also appending updates
> with invalidation of the old value (if the binary length of the new value
> differs). The hash table needs to be able to evict and emit all elements if
> it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
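
To make the split proposed in the review comment concrete, here is a minimal sketch of a driver that owns the reduce and emit logic while the table only offers lookup, update, iteration and reset. Everything in it is an assumption for illustration: `ToyTable` is a `HashMap`-backed stand-in and not Flink's hash table, the method names `getMatchFor()`, `updateMatch()`, `getEntryIterator()` and `reset()` are borrowed from the comment but their signatures here are invented, and `capacity` merely simulates the table running out of memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Hypothetical sketch only; none of these types are Flink classes. */
final class DriverSideCombineSketch {

    /** HashMap-backed stand-in for the table. Method names follow the review comment, signatures are assumed. */
    static final class ToyTable<K, T> {
        private final Map<K, T> entries = new HashMap<>();
        private final Function<T, K> keyOf;
        private final int capacity; // simulated memory budget, counted in records (must be >= 1)

        ToyTable(Function<T, K> keyOf, int capacity) {
            this.keyOf = keyOf;
            this.capacity = capacity;
        }

        /** Returns the stored record with the same key, or null if there is none. */
        T getMatchFor(T record) {
            return entries.get(keyOf.apply(record));
        }

        /** Replaces the stored record that has the same key as newValue. */
        void updateMatch(T newValue) {
            entries.put(keyOf.apply(newValue), newValue);
        }

        /** Inserts a record with a new key; returns false when the simulated memory is exhausted. */
        boolean insert(T record) {
            if (entries.size() >= capacity) {
                return false;
            }
            entries.put(keyOf.apply(record), record);
            return true;
        }

        Iterator<T> getEntryIterator() {
            return entries.values().iterator();
        }

        void reset() {
            entries.clear();
        }
    }

    /** The driver: one reduce step per record, evicting everything when the table is full. */
    static <K, T> List<T> combine(Iterable<T> input, Function<T, K> keyOf,
                                  BinaryOperator<T> reducer, int capacity) {
        ToyTable<K, T> table = new ToyTable<>(keyOf, capacity);
        List<T> out = new ArrayList<>();
        for (T record : input) {
            T match = table.getMatchFor(record);
            if (match != null) {
                // Key already present: fold the new record into the pre-aggregated value.
                table.updateMatch(reducer.apply(match, record));
            } else if (!table.insert(record)) {
                // Table full: emit all partial aggregates, start over, then retry the insert.
                emitAndReset(table, out);
                table.insert(record);
            }
        }
        emitAndReset(table, out);
        return out;
    }

    private static <K, T> void emitAndReset(ToyTable<K, T> table, List<T> out) {
        Iterator<T> it = table.getEntryIterator();
        while (it.hasNext()) {
            out.add(it.next());
        }
        table.reset();
    }
}
```

In this sketch a small `capacity` makes the driver emit several partial aggregates for the same key. That is acceptable for a combiner, because a later reduce step is expected to merge them; only the number of emitted records changes, not the per-key result after the final merge.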