[ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140831#comment-15140831 ]
ASF GitHub Bot commented on FLINK-2237: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r52461236 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReduceHashTableTest.java --- @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.hash; + +import com.google.common.collect.Ordering; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.util.CopyingListCollector; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongComparator; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.TupleComparator; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator; +import org.apache.flink.runtime.operators.testutils.types.*; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.junit.Test; + +import java.util.*; + +import static org.junit.Assert.*; +import static org.junit.Assert.fail; + +public class ReduceHashTableTest { + + private static final long RANDOM_SEED = 58723953465322L; + + private static final int PAGE_SIZE = 16 * 1024; + + private final TypeSerializer<Tuple2<Long, String>> serializer; + private final TypeComparator<Tuple2<Long, String>> comparator; + + private final TypeComparator<Long> probeComparator; + + private final TypePairComparator<Long, Tuple2<Long, String>> pairComparator; + + // + // ------------------ Note: This part was mostly copied from CompactingHashTableTest ------------------ + + public ReduceHashTableTest() { + TypeSerializer<?>[] fieldSerializers = { LongSerializer.INSTANCE, StringSerializer.INSTANCE }; + @SuppressWarnings("unchecked") + Class<Tuple2<Long, String>> clazz = (Class<Tuple2<Long, String>>) (Class<?>) Tuple2.class; + this.serializer = new TupleSerializer<Tuple2<Long, String>>(clazz, fieldSerializers); + + TypeComparator<?>[] comparators = { new LongComparator(true) }; + TypeSerializer<?>[] comparatorSerializers = { LongSerializer.INSTANCE }; + + this.comparator = new TupleComparator<Tuple2<Long, String>>(new int[] {0}, comparators, comparatorSerializers); + + this.probeComparator = new LongComparator(true); + + this.pairComparator = new TypePairComparator<Long, Tuple2<Long, String>>() { + + private long ref; + + @Override + public void setReference(Long reference) { + ref = reference; + } + + @Override + public boolean equalToReference(Tuple2<Long, String> candidate) { + //noinspection UnnecessaryUnboxing + return candidate.f0.longValue() == ref; + } + + @Override + public int compareToReference(Tuple2<Long, String> candidate) { + long x = ref; + long y = candidate.f0; + return (x < y) ? -1 : ((x == y) ? 0 : 1); + } + }; + } + + @Test + public void testHashTableGrowthWithInsert() { + try { + final int numElements = 1000000; + + List<MemorySegment> memory = getMemory(10000, 32 * 1024); + + // we create a hash table that thinks the records are super large. that makes it choose initially + // a lot of memory for the partition buffers, and start with a smaller hash table. that way + // we trigger a hash table growth early. + ReduceHashTable<Tuple2<Long, String>> table = new ReduceHashTable<Tuple2<Long, String>>( + serializer, comparator, memory, null, null, false); + table.open(); + + for (long i = 0; i < numElements; i++) { + table.insert(new Tuple2<Long, String>(i, String.valueOf(i))); + } + + // make sure that all elements are contained via the entry iterator + { + BitSet bitSet = new BitSet(numElements); + MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator(); + Tuple2<Long, String> next; + while ((next = iter.next()) != null) { + assertNotNull(next.f0); + assertNotNull(next.f1); + assertEquals(next.f0.longValue(), Long.parseLong(next.f1)); + + bitSet.set(next.f0.intValue()); + } + + assertEquals(numElements, bitSet.cardinality()); + } + + // make sure all entries are contained via the prober + { + ReduceHashTable<Tuple2<Long, String>>.HashTableProber<Long> proper = + table.getProber(probeComparator, pairComparator); + + Tuple2<Long, String> reuse = new Tuple2<>(); + + for (long i = 0; i < numElements; i++) { + assertNotNull(proper.getMatchFor(i, reuse)); + assertNull(proper.getMatchFor(i + numElements, reuse)); + } + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + /** + * This test validates that records are not lost via "insertOrReplace()" as in bug [FLINK-2361] + */ + @Test + public void testHashTableGrowthWithInsertOrReplace() { + try { + final int numElements = 1000000; + + List<MemorySegment> memory = getMemory(1000, 32 * 1024); + + // we create a hash table that thinks the records are super large. that makes it choose initially + // a lot of memory for the partition buffers, and start with a smaller hash table. that way + // we trigger a hash table growth early. + ReduceHashTable<Tuple2<Long, String>> table = new ReduceHashTable<Tuple2<Long, String>>( + serializer, comparator, memory, null, null, false); + table.open(); + + for (long i = 0; i < numElements; i++) { + table.insertOrReplaceRecord(new Tuple2<Long, String>(i, String.valueOf(i))); + } + + // make sure that all elements are contained via the entry iterator + { + BitSet bitSet = new BitSet(numElements); + MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator(); + Tuple2<Long, String> next; + while ((next = iter.next()) != null) { + assertNotNull(next.f0); + assertNotNull(next.f1); + assertEquals(next.f0.longValue(), Long.parseLong(next.f1)); + + bitSet.set(next.f0.intValue()); + } + + assertEquals(numElements, bitSet.cardinality()); + } + + // make sure all entries are contained via the prober + { + ReduceHashTable<Tuple2<Long, String>>.HashTableProber<Long> proper = + table.getProber(probeComparator, pairComparator); + + Tuple2<Long, String> reuse = new Tuple2<>(); + + for (long i = 0; i < numElements; i++) { + assertNotNull(proper.getMatchFor(i, reuse)); + assertNull(proper.getMatchFor(i + numElements, reuse)); + } + } + + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + /** + * This test validates that new inserts (rather than updates) in "insertOrReplace()" properly + * react to out of memory conditions. + * + * Btw. this also tests the situation, when records are larger than one memory segment. + */ + @Test + public void testInsertsWithInsertOrReplace() { --- End diff -- This test does not check for correct results in the table. > Add hash-based Aggregation > -------------------------- > > Key: FLINK-2237 > URL: https://issues.apache.org/jira/browse/FLINK-2237 > Project: Flink > Issue Type: New Feature > Reporter: Rafiullah Momand > Assignee: Gabor Gevay > Priority: Minor > > Aggregation functions at the moment are implemented in a sort-based way. > How can we implement hash based Aggregation for Flink? -- This message was sent by Atlassian JIRA (v6.3.4#6332)