[ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140831#comment-15140831 ]

ASF GitHub Bot commented on FLINK-2237:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1517#discussion_r52461236
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReduceHashTableTest.java ---
    @@ -0,0 +1,501 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.operators.hash;
    +
    +import com.google.common.collect.Ordering;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.util.CopyingListCollector;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypePairComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongComparator;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
    +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.core.memory.MemorySegmentFactory;
    +import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
    +import org.apache.flink.runtime.operators.testutils.types.*;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.MutableObjectIterator;
    +import org.junit.Test;
    +
    +import java.util.*;
    +
    +import static org.junit.Assert.*;
    +import static org.junit.Assert.fail;
    +
    +public class ReduceHashTableTest {
    +
    +   private static final long RANDOM_SEED = 58723953465322L;
    +
    +   private static final int PAGE_SIZE = 16 * 1024;
    +
    +   private final TypeSerializer<Tuple2<Long, String>> serializer;
    +   private final TypeComparator<Tuple2<Long, String>> comparator;
    +
    +   private final TypeComparator<Long> probeComparator;
    +
    +   private final TypePairComparator<Long, Tuple2<Long, String>> pairComparator;
    +
    +   //
    +   // ------------------ Note: This part was mostly copied from CompactingHashTableTest ------------------
    +
    +   public ReduceHashTableTest() {
    +           TypeSerializer<?>[] fieldSerializers = { LongSerializer.INSTANCE, StringSerializer.INSTANCE };
    +           @SuppressWarnings("unchecked")
    +           Class<Tuple2<Long, String>> clazz = (Class<Tuple2<Long, String>>) (Class<?>) Tuple2.class;
    +           this.serializer = new TupleSerializer<Tuple2<Long, String>>(clazz, fieldSerializers);
    +
    +           TypeComparator<?>[] comparators = { new LongComparator(true) };
    +           TypeSerializer<?>[] comparatorSerializers = { LongSerializer.INSTANCE };
    +
    +           this.comparator = new TupleComparator<Tuple2<Long, String>>(new int[] {0}, comparators, comparatorSerializers);
    +
    +           this.probeComparator = new LongComparator(true);
    +
    +           this.pairComparator = new TypePairComparator<Long, Tuple2<Long, String>>() {
    +
    +                   private long ref;
    +
    +                   @Override
    +                   public void setReference(Long reference) {
    +                           ref = reference;
    +                   }
    +
    +                   @Override
    +                   public boolean equalToReference(Tuple2<Long, String> candidate) {
    +                           //noinspection UnnecessaryUnboxing
    +                           return candidate.f0.longValue() == ref;
    +                   }
    +
    +                   @Override
    +                   public int compareToReference(Tuple2<Long, String> candidate) {
    +                           long x = ref;
    +                           long y = candidate.f0;
    +                           return (x < y) ? -1 : ((x == y) ? 0 : 1);
    +                   }
    +           };
    +   }
    +
    +   @Test
    +   public void testHashTableGrowthWithInsert() {
    +           try {
    +                   final int numElements = 1000000;
    +
    +                   List<MemorySegment> memory = getMemory(10000, 32 * 1024);
    +
    +                   // we create a hash table that thinks the records are super large. that makes it choose initially
    +                   // a lot of memory for the partition buffers, and start with a smaller hash table. that way
    +                   // we trigger a hash table growth early.
    +                   ReduceHashTable<Tuple2<Long, String>> table = new ReduceHashTable<Tuple2<Long, String>>(
    +                           serializer, comparator, memory, null, null, false);
    +                   table.open();
    +
    +                   for (long i = 0; i < numElements; i++) {
    +                           table.insert(new Tuple2<Long, String>(i, String.valueOf(i)));
    +                   }
    +
    +                   // make sure that all elements are contained via the entry iterator
    +                   {
    +                           BitSet bitSet = new BitSet(numElements);
    +                           MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator();
    +                           Tuple2<Long, String> next;
    +                           while ((next = iter.next()) != null) {
    +                                   assertNotNull(next.f0);
    +                                   assertNotNull(next.f1);
    +                                   assertEquals(next.f0.longValue(), Long.parseLong(next.f1));
    +
    +                                   bitSet.set(next.f0.intValue());
    +                           }
    +
    +                           assertEquals(numElements, bitSet.cardinality());
    +                   }
    +
    +                   // make sure all entries are contained via the prober
    +                   {
    +                           ReduceHashTable<Tuple2<Long, String>>.HashTableProber<Long> proper =
    +                                   table.getProber(probeComparator, pairComparator);
    +
    +                           Tuple2<Long, String> reuse = new Tuple2<>();
    +
    +                           for (long i = 0; i < numElements; i++) {
    +                                   assertNotNull(proper.getMatchFor(i, reuse));
    +                                   assertNull(proper.getMatchFor(i + numElements, reuse));
    +                           }
    +                   }
    +           }
    +           catch (Exception e) {
    +                   e.printStackTrace();
    +                   fail(e.getMessage());
    +           }
    +   }
    +
    +   /**
    +    * This test validates that records are not lost via "insertOrReplace()" as in bug [FLINK-2361]
    +    */
    +   @Test
    +   public void testHashTableGrowthWithInsertOrReplace() {
    +           try {
    +                   final int numElements = 1000000;
    +
    +                   List<MemorySegment> memory = getMemory(1000, 32 * 1024);
    +
    +                   // we create a hash table that thinks the records are super large. that makes it choose initially
    +                   // a lot of memory for the partition buffers, and start with a smaller hash table. that way
    +                   // we trigger a hash table growth early.
    +                   ReduceHashTable<Tuple2<Long, String>> table = new ReduceHashTable<Tuple2<Long, String>>(
    +                           serializer, comparator, memory, null, null, false);
    +                   table.open();
    +
    +                   for (long i = 0; i < numElements; i++) {
    +                           table.insertOrReplaceRecord(new Tuple2<Long, String>(i, String.valueOf(i)));
    +                   }
    +
    +                   // make sure that all elements are contained via the entry iterator
    +                   {
    +                           BitSet bitSet = new BitSet(numElements);
    +                           MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator();
    +                           Tuple2<Long, String> next;
    +                           while ((next = iter.next()) != null) {
    +                                   assertNotNull(next.f0);
    +                                   assertNotNull(next.f1);
    +                                   assertEquals(next.f0.longValue(), Long.parseLong(next.f1));
    +
    +                                   bitSet.set(next.f0.intValue());
    +                           }
    +
    +                           assertEquals(numElements, bitSet.cardinality());
    +                   }
    +
    +                   // make sure all entries are contained via the prober
    +                   {
    +                           ReduceHashTable<Tuple2<Long, String>>.HashTableProber<Long> proper =
    +                                   table.getProber(probeComparator, pairComparator);
    +
    +                           Tuple2<Long, String> reuse = new Tuple2<>();
    +
    +                           for (long i = 0; i < numElements; i++) {
    +                                   assertNotNull(proper.getMatchFor(i, reuse));
    +                                   assertNull(proper.getMatchFor(i + numElements, reuse));
    +                           }
    +                   }
    +
    +           }
    +           catch (Exception e) {
    +                   e.printStackTrace();
    +                   fail(e.getMessage());
    +           }
    +   }
    +
    +   /**
    +    * This test validates that new inserts (rather than updates) in "insertOrReplace()" properly
    +    * react to out of memory conditions.
    +    *
    +    * Btw. this also tests the situation when records are larger than one memory segment.
    +    */
    +   @Test
    +   public void testInsertsWithInsertOrReplace() {
    --- End diff --
    
    This test does not check for correct results in the table.
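
    For illustration, a minimal sketch of such a result check, reusing the entry-iterator verification pattern from the tests above (table and numElements as defined there; the asserted invariant would need to match whatever the test actually expects to remain in the table after the out-of-memory handling):

        BitSet bitSet = new BitSet(numElements);
        MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator();
        Tuple2<Long, String> next;
        while ((next = iter.next()) != null) {
            // every value remaining in the table must still match its key
            assertEquals(next.f0.longValue(), Long.parseLong(next.f1));
            bitSet.set(next.f0.intValue());
        }
        // and the expected number of distinct keys must be present
        assertEquals(numElements, bitSet.cardinality());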


> Add hash-based Aggregation
> --------------------------
>
>                 Key: FLINK-2237
>                 URL: https://issues.apache.org/jira/browse/FLINK-2237
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Rafiullah Momand
>            Assignee: Gabor Gevay
>            Priority: Minor
>
> Aggregation functions at the moment are implemented in a sort-based way.
> How can we implement hash-based aggregation for Flink?
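
For illustration only (not part of the issue text), a minimal Java sketch of the idea: a sort-based aggregation sorts the records by key and then reduces consecutive runs, whereas a hash-based aggregation reduces each record into a hash table keyed on the grouping field as it arrives. This is the idea the ReduceHashTable in the pull request above builds on; there the table additionally lives in managed MemorySegments and has to react to running out of memory, which is what the quoted tests exercise.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BinaryOperator;

    public class HashAggregationSketch {

        /**
         * Reduces values per key into a hash table as records arrive,
         * without sorting the input first.
         */
        public static <K, V> Map<K, V> aggregate(Iterable<Map.Entry<K, V>> records,
                                                 BinaryOperator<V> reduceFunction) {
            Map<K, V> table = new HashMap<>();
            for (Map.Entry<K, V> record : records) {
                // merge() inserts the value for a new key, or applies the
                // reduce function when the key is already in the table
                table.merge(record.getKey(), record.getValue(), reduceFunction);
            }
            return table;
        }
    }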



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
