[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376126#comment-15376126
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1517


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a
> single value. A Reduce function is applied incrementally to compute a final
> aggregated value. This makes it possible to hold the pre-aggregated value in a
> hash table and update it with each function call.
> The hash-based strategy requires a special implementation of an in-memory hash
> table. The hash table should support in-place updates of elements (if the new
> value has the same binary length as the old value) as well as appending updates
> with invalidation of the old value (if the binary length of the new value
> differs). The hash table needs to be able to evict and emit all elements if
> it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the
> execution strategy.
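As an illustration of the hints described above, here is a minimal sketch of how a job could request the hash-based combine from the Java DataSet API. It assumes the hint is exposed through a `setCombineHint(CombineHint)` setter on the operator returned by `reduce()`; the exact user-facing API is what the pull request discussed below settles on.

{code}
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CombineHintSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> counts = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

        counts.groupBy(0)
                // incrementally applied, associative reduce: same input and output type
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
                // assumed setter: ask for the hash-based combine instead of the sort-based default
                .setCombineHint(CombineHint.HASH)
                .print();
    }
}
{code}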



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15375658#comment-15375658
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/1517
  
CI tests are passing. I've been testing Gelly algorithms with this without 
error. I will merge this ...




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369673#comment-15369673
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/1517
  
> Reading back through the discussion I see that there are many ideas for 
future performance enhancements. If not already suggested I'd like to consider 
skipping staging for fixed length records.

Thanks, I've added this to my notes.

> I'm missing why we can't update in place with smaller records. The 
deserializer is responsible for detecting the end of the record and we wouldn't 
need to change the pointer value when replacing with a smaller record.

A problem would arise in `EntryIterator`: after reading a record, we 
wouldn't know where the next record starts. (As it is now, it always starts 
right after the previous.)

Thanks @greghogan for pushing this forward. I think I have addressed all 
your comments.
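To illustrate the `EntryIterator` point above: with records laid out back to back and record boundaries known only to the deserializer, shrinking a record in place leaves trailing bytes that a sequential reader would misread as the start of the next record. A small stand-alone sketch of that failure mode (illustrative only, not Flink's actual layout or iterator):

{code}
import java.nio.charset.StandardCharsets;

public class DenseLayoutSketch {

    // Records are variable-length byte strings stored back to back, each terminated
    // by a 0 byte; the "deserializer" finds the end of a record by scanning for it.
    static byte[] area = new byte[64];

    static int append(int offset, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        System.arraycopy(bytes, 0, area, offset, bytes.length);
        area[offset + bytes.length] = 0;
        return offset + bytes.length + 1;   // the next record starts right after this one
    }

    static void readAll(int end) {
        int offset = 0;
        while (offset < end) {
            int recordEnd = offset;
            while (area[recordEnd] != 0) {
                recordEnd++;
            }
            System.out.println(new String(area, offset, recordEnd - offset, StandardCharsets.UTF_8));
            offset = recordEnd + 1;         // assumes the next record starts immediately
        }
    }

    public static void main(String[] args) {
        int end = append(append(0, "longvalue"), "x");
        readAll(end);   // prints "longvalue", "x"

        // Overwriting the first record in place with a shorter value leaves the old
        // tail ("alue") between the new terminator and the second record; the reader
        // above now emits "long", "alue", "x" instead of "long", "x".
        append(0, "long");
        readAll(end);
    }
}
{code}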




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369670#comment-15369670
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r70182809
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java 
---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import 
org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+public class ReducePerformance {
+   
+   public static void main(String[] args) throws Exception {
+
+   final int numElements = 40_000_000;
+   final int keyRange=  4_000_000;
+
+   // warm up JIT
+   testReducePerformance(new TupleIntIntIterator(1000),
+   TupleTypeInfo.<Tuple2<Integer, Integer>>getBasicTupleTypeInfo(Integer.class, Integer.class),
+   CombineHint.SORT, 1, false);
+
+   testReducePerformance(new TupleIntIntIterator(1000),
+   TupleTypeInfo.<Tuple2<Integer, Integer>>getBasicTupleTypeInfo(Integer.class, Integer.class),
+   CombineHint.HASH, 1, false);
+
+   // TupleIntIntIterator
+   testReducePerformance(new TupleIntIntIterator(keyRange),
+   TupleTypeInfo.<Tuple2<Integer, Integer>>getBasicTupleTypeInfo(Integer.class, Integer.class),
+   CombineHint.SORT, numElements, true);
+
+   testReducePerformance(new TupleIntIntIterator(keyRange),
+   TupleTypeInfo.<Tuple2<Integer, Integer>>getBasicTupleTypeInfo(Integer.class, Integer.class),
+   CombineHint.HASH, numElements, true);
+
+   // TupleStringIntIterator
+   testReducePerformance(new TupleStringIntIterator(keyRange),
+   TupleTypeInfo.<Tuple2<String, Integer>>getBasicTupleTypeInfo(String.class, Integer.class),
+   CombineHint.SORT, numElements, true);
+
+   testReducePerformance(new TupleStringIntIterator(keyRange),
+   TupleTypeInfo.<Tuple2<String, Integer>>getBasicTupleTypeInfo(String.class, Integer.class),
+   CombineHint.HASH, numElements, true);
+   }
+
+   private static  void 
testReducePerformance
+   (B iterator, TypeInformation typeInfo, CombineHint hint, int 
numRecords, boolean print) throws Exception {
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   //env.getConfig().enableObjectReuse();
--- End diff --

615f6a642e30edec8fb98c3319d37983c97d971a



[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369669#comment-15369669
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r70182801
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
 ---
@@ -1480,28 +1480,17 @@ public static int getInitialTableSize(int 
numBuffers, int bufferSize, int numPar
public static byte assignPartition(int bucket, byte numPartitions) {
return (byte) (bucket % numPartitions);
}
-   
+
/**
-* This function hashes an integer value. It is adapted from Bob Jenkins' website
-* <a href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>.
-* The hash function has the full avalanche property, meaning 
that every bit of the value to be hashed
-* affects every bit of the hash value. 
-* 
-* @param code The integer to be hashed.
-* @return The hash code for the integer.
-*/
+* The level parameter is needed so that we can have different hash 
functions when we recursively apply
+* the partitioning, so that the working set eventually fits into 
memory.
+ */
public static int hash(int code, int level) {
final int rotation = level * 11;

code = (code << rotation) | (code >>> -rotation);
--- End diff --

abfd1ff825bf63c5cda11c2b5a556990ca5df3e1




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369668#comment-15369668
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r70182796
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/SameTypePairComparator.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils;
+
+public class SameTypePairComparator<T> extends TypePairComparator<T, T> {
--- End diff --

231004e011cda6e9645d6a2d1983e14f55b73c47




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369666#comment-15369666
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r70182792
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java ---
@@ -45,10 +46,28 @@ public ReduceNode(ReduceOperatorBase operator) {
// case of a key-less reducer. force a parallelism of 1
setParallelism(1);
}
-   
-   OperatorDescriptorSingle props = this.keys == null ?
-   new AllReduceProperties() :
-   new ReduceProperties(this.keys, 
operator.getCustomPartitioner());
+
+   OperatorDescriptorSingle props;
+
+   if (this.keys == null) {
+   props = new AllReduceProperties();
+   } else {
+   DriverStrategy combinerStrategy;
+   switch(operator.getCombineHint()) {
+   case OPTIMIZER_CHOOSES:
+   combinerStrategy = 
DriverStrategy.SORTED_PARTIAL_REDUCE;
--- End diff --

I included the `OPTIMIZER_CHOOSES` case because `JoinHint` and `CrossHint`
also have this case. I guess it is there because we might change the
default later.
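For readers following along, the mapping being discussed looks roughly like the following. This is a sketch based on the diff above; it assumes the hash counterpart of `SORTED_PARTIAL_REDUCE` is named `HASHED_PARTIAL_REDUCE` as introduced by this PR, and it keeps `OPTIMIZER_CHOOSES` as its own case so the default can change later.

{code}
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.runtime.operators.DriverStrategy;

class CombinerStrategySketch {
    // Maps the user-facing hint to the local combine strategy, as in the ReduceNode diff above.
    static DriverStrategy combinerStrategyFor(CombineHint hint) {
        switch (hint) {
            case OPTIMIZER_CHOOSES:
                // currently the same as SORT; kept as a separate case so the default can change later
                return DriverStrategy.SORTED_PARTIAL_REDUCE;
            case SORT:
                return DriverStrategy.SORTED_PARTIAL_REDUCE;
            case HASH:
                return DriverStrategy.HASHED_PARTIAL_REDUCE;
            default:
                throw new RuntimeException("Unknown combine hint: " + hint);
        }
    }
}
{code}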




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-07-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369664#comment-15369664
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r70182744
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

OK, 53be6e531302f6f3bd130127958599ae76815eb0 reverts the commit that broke
binary compatibility. We are now back to the overloads.
a9f3b9c54918b2c7aa4e0944b0c0bcd13bafcc2e adds the @PublicEvolving annotation.




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368557#comment-15368557
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r70146784
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

I think it is OK to add overloaded `reduce()` methods to `DataSet` and
`GroupedDataSet`. These methods should be `PublicEvolving`. I would not add
overloaded methods to the more specialized operations, as was done in the first
approach to adding the `CombineHint` to the Java API.




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368461#comment-15368461
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r70139954
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

@fhueske or @StephanEwen since we cannot break the Scala DataSet API by
creating and returning a `ReduceOperator`, do you agree with Gábor's
recommendation to overload `DataSet.reduce` with a `CombineHint` parameter?




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15353454#comment-15353454
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/1517
  
+1 with just a few superficial comments.

Reading back through the discussion I see that there are many ideas for 
future performance enhancements. If not already suggested I'd like to consider 
skipping staging for fixed length records.

I'm missing why we can't update in place with smaller records. The 
deserializer is responsible for detecting the end of the record and we wouldn't 
need to change the pointer value when replacing with a smaller record.

CombineHint.NONE can be implemented in a new PR since this looks to be 
ready as-is.




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351760#comment-15351760
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r68649923
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java 
---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import 
org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+public class ReducePerformance {
+   
+   public static void main(String[] args) throws Exception {
+
+   final int numElements = 40_000_000;
+   final int keyRange=  4_000_000;
+
+   // warm up JIT
+   testReducePerformance(new TupleIntIntIterator(1000),
+   TupleTypeInfo.<Tuple2<Integer, Integer>>getBasicTupleTypeInfo(Integer.class, Integer.class),
+   CombineHint.SORT, 1, false);
+
+   testReducePerformance(new TupleIntIntIterator(1000),
+   TupleTypeInfo.<Tuple2<Integer, Integer>>getBasicTupleTypeInfo(Integer.class, Integer.class),
+   CombineHint.HASH, 1, false);
+
+   // TupleIntIntIterator
+   testReducePerformance(new TupleIntIntIterator(keyRange),
+   TupleTypeInfo.<Tuple2<Integer, Integer>>getBasicTupleTypeInfo(Integer.class, Integer.class),
+   CombineHint.SORT, numElements, true);
+
+   testReducePerformance(new TupleIntIntIterator(keyRange),
+   TupleTypeInfo.<Tuple2<Integer, Integer>>getBasicTupleTypeInfo(Integer.class, Integer.class),
+   CombineHint.HASH, numElements, true);
+
+   // TupleStringIntIterator
+   testReducePerformance(new TupleStringIntIterator(keyRange),
+   TupleTypeInfo.<Tuple2<String, Integer>>getBasicTupleTypeInfo(String.class, Integer.class),
+   CombineHint.SORT, numElements, true);
+
+   testReducePerformance(new TupleStringIntIterator(keyRange),
+   TupleTypeInfo.<Tuple2<String, Integer>>getBasicTupleTypeInfo(String.class, Integer.class),
+   CombineHint.HASH, numElements, true);
+   }
+
+   private static  void 
testReducePerformance
+   (B iterator, TypeInformation typeInfo, CombineHint hint, int 
numRecords, boolean print) throws Exception {
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   //env.getConfig().enableObjectReuse();
--- End diff --

Should this commented-out code be removed or enabled?



[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351713#comment-15351713
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r68646048
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
 ---
@@ -1480,28 +1480,17 @@ public static int getInitialTableSize(int 
numBuffers, int bufferSize, int numPar
public static byte assignPartition(int bucket, byte numPartitions) {
return (byte) (bucket % numPartitions);
}
-   
+
/**
-* This function hashes an integer value. It is adapted from Bob Jenkins' website
-* <a href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>.
-* The hash function has the full avalanche property, meaning 
that every bit of the value to be hashed
-* affects every bit of the hash value. 
-* 
-* @param code The integer to be hashed.
-* @return The hash code for the integer.
-*/
+* The level parameter is needed so that we can have different hash 
functions when we recursively apply
+* the partitioning, so that the working set eventually fits into 
memory.
+ */
public static int hash(int code, int level) {
final int rotation = level * 11;

code = (code << rotation) | (code >>> -rotation);
--- End diff --

Since this was included in the diff ... `Integer.rotateLeft`.
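The manual shift-and-or in that line is indeed equivalent to `Integer.rotateLeft`: Java shift amounts are taken mod 32, so `code >>> -rotation` is `code >>> (32 - rotation)`. A quick stand-alone check of the equivalence (illustrative only, not part of the PR):

{code}
public class RotateCheck {

    // The expression used in MutableHashTable.hash()
    static int rotateManually(int code, int rotation) {
        return (code << rotation) | (code >>> -rotation);
    }

    public static void main(String[] args) {
        int code = 0xCAFEBABE;
        for (int level = 0; level < 8; level++) {
            int rotation = level * 11;
            if (rotateManually(code, rotation) != Integer.rotateLeft(code, rotation)) {
                throw new AssertionError("mismatch at level " + level);
            }
        }
        System.out.println("manual rotation matches Integer.rotateLeft");
    }
}
{code}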




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351664#comment-15351664
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r68641370
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java ---
@@ -45,10 +46,28 @@ public ReduceNode(ReduceOperatorBase operator) {
// case of a key-less reducer. force a parallelism of 1
setParallelism(1);
}
-   
-   OperatorDescriptorSingle props = this.keys == null ?
-   new AllReduceProperties() :
-   new ReduceProperties(this.keys, 
operator.getCustomPartitioner());
+
+   OperatorDescriptorSingle props;
+
+   if (this.keys == null) {
+   props = new AllReduceProperties();
+   } else {
+   DriverStrategy combinerStrategy;
+   switch(operator.getCombineHint()) {
+   case OPTIMIZER_CHOOSES:
+   combinerStrategy = 
DriverStrategy.SORTED_PARTIAL_REDUCE;
--- End diff --

Merge the `OPTIMIZER_CHOOSES` and `SORT` cases?




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351651#comment-15351651
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r68640552
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/SameTypePairComparator.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils;
+
+public class SameTypePairComparator<T> extends TypePairComparator<T, T> {
--- End diff --

Mark as `@Internal` (as is `TypePairComparator`)?
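For context, the role of such a class is to adapt a single `TypeComparator` to the pair-comparator interface so that a probe record of the same type can be compared against stored records. A rough sketch of that adapter (illustrative, not necessarily the exact implementation in the PR):

{code}
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;

// Both sides of the pair have the same type, so every call is delegated
// to one underlying TypeComparator<T>.
public class SameTypePairComparatorSketch<T> extends TypePairComparator<T, T> {

    private final TypeComparator<T> comparator;

    public SameTypePairComparatorSketch(TypeComparator<T> comparator) {
        this.comparator = comparator;
    }

    @Override
    public void setReference(T reference) {
        comparator.setReference(reference);
    }

    @Override
    public boolean equalToReference(T candidate) {
        return comparator.equalToReference(candidate);
    }

    @Override
    public int compareToReference(T candidate) {
        return comparator.compareToReference(candidate);
    }
}
{code}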




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15338522#comment-15338522
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user aalexandrov commented on the issue:

https://github.com/apache/flink/pull/1517
  
@ggevay :+1: 




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15338503#comment-15338503
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r67615885
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

Unfortunately this doesn't seem to work either: japicmp is complaining [1]
that the return type of `DataSet.reduce` has changed (to the newly created
`ReduceOperator` Scala class), which breaks binary compatibility [2]. I don't
have a better idea than to go back to adding overloads that take the
`CombineHint` as an additional parameter.

[1] http://compalg.inf.elte.hu/~ggevay/japicmp.diff
[2] 
https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html#jls-13.4.15
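A short aside on why changing only the return type is enough to break things: the JVM links a call site by the method's name and full descriptor, and the descriptor includes the return type, so callers compiled against the old `DataSet`-returning signature fail with `NoSuchMethodError` even though the change is source compatible. A small stand-alone illustration of descriptor-exact resolution via `MethodHandles`; the nested `DataSet`/`ReduceOperator`/`GroupedDataSet` classes here are simplified stand-ins, not the real API types.

{code}
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

public class DescriptorLookupDemo {

    // Simplified stand-ins for the API types involved.
    static class DataSet {}
    static class ReduceOperator extends DataSet {}

    // "New" API: the return type was narrowed from DataSet to ReduceOperator.
    static class GroupedDataSet {
        public ReduceOperator reduce(Runnable fun) { return new ReduceOperator(); }
    }

    public static void main(String[] args) throws Throwable {
        MethodHandles.Lookup lookup = MethodHandles.lookup();

        // A caller compiled against the old API references the old descriptor,
        // i.e. reduce(Runnable) returning DataSet. Resolution is by the exact
        // descriptor, so this lookup fails although the source still compiles.
        try {
            lookup.findVirtual(GroupedDataSet.class, "reduce",
                    MethodType.methodType(DataSet.class, Runnable.class));
        } catch (NoSuchMethodException e) {
            System.out.println("old call site no longer links: " + e);
        }

        // Looking it up with the new return type resolves fine.
        lookup.findVirtual(GroupedDataSet.class, "reduce",
                MethodType.methodType(ReduceOperator.class, Runnable.class));
        System.out.println("new descriptor resolves");
    }
}
{code}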




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15337835#comment-15337835
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/1517
  
I have rebased to the current master.




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313791#comment-15313791
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65667687
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
 * @param <T> The data type consumed and produced by the combiner.
  */
public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext<ReduceFunction<T>, T> taskContext;
 
private TypeSerializer<T> serializer;
 
private TypeComparator<T> comparator;
-   
+
private ReduceFunction<T> reducer;
-   
+
private Collector<T> output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter<T> sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable<T> table;
+
private List<MemorySegment> memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

Sorry, I forgot the rename in the chained driver:
6abd3f3cf49568cc0fecd85d7e7d8a0d7f9ec39f
And I forgot to invert the meaning with the rename in ReduceCombineDriver:
984ba12f44a7ee9b16790c3e172b53969448e1c2
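As a side note on the `volatile` in the diff above: the cancel flag is written by a different thread than the one running the driver loop, so it needs `volatile` for the write to be guaranteed visible. A minimal stand-alone sketch of that pattern (not the actual driver code):

{code}
public class CancelFlagSketch implements Runnable {

    // volatile: the write from cancel() (another thread) must become visible
    // to the loop below; without it the loop may spin on a stale 'false'.
    private volatile boolean canceled;

    public void cancel() {
        canceled = true;
    }

    @Override
    public void run() {
        long processed = 0;
        while (!canceled) {
            processed++;    // stand-in for processing one input record
        }
        System.out.println("stopped after " + processed + " iterations");
    }

    public static void main(String[] args) throws InterruptedException {
        CancelFlagSketch work = new CancelFlagSketch();
        Thread worker = new Thread(work);
        worker.start();

        Thread.sleep(100);  // let the worker run briefly
        work.cancel();      // request cancellation from the main thread
        worker.join();
    }
}
{code}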




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310523#comment-15310523
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/1517
  
Hi @fhueske,
Thanks for the comments, I think I have addressed all of them.




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310475#comment-15310475
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65383363
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
 ---
@@ -45,7 +45,12 @@ public RandomAccessInputView(ArrayList 
segments, int segmentSize)
{
this(segments, segmentSize, segmentSize);
}
-   
+
+   public RandomAccessInputView(ArrayList segments, int 
segmentSize, boolean dummy)
--- End diff --

I went with the first alternative:
9cb6befe39427bc055c08b244177401db3f5a813




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310421#comment-15310421
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65376284
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has processRecordWithReduce,
+ * which performs one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: contains the bucket heads:
+ *    an 8 byte pointer to the first link of a linked list in the record area
+ *  - Record area: contains the actual data in linked list elements. A linked list element
+ *    starts with an 8 byte pointer to the next element, and then the record follows.
+ *  - Staging area: a small, temporary storage area for writing updated records. This is needed
+ *    because, before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if it turns out to be larger than the old record, it would overwrite some other record
+ *    that happens to be after the old one in memory. The solution is to serialize to the staging area
+ *    first, and then copy it to the place of the original if it has the same size; otherwise allocate
+ *    a new linked list element at the end of the record area, and mark the old one as abandoned.
+ *    This creates "holes" in the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then reinserting all elements.
+ *  The reinsertion works by forgetting the structure (the linked lists) of the record area, reading it
+ *  sequentially, and inserting all non-abandoned records, starting from the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not yet been read by the reinsertion sweep,
+ *  because both the insertions and the reads happen sequentially in the record area, and the insertions
+ *  obviously never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next record starts during a
+ *  reinsertion sweep.
+ *
+ *  The number of buckets depends on how large the records are. The serializer might be able to tell us
+ *  this, so in that case we will calculate the number of buckets upfront and won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small number of 
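
To make the update and compaction logic described above easier to follow, here is a small, self-contained sketch of the same idea on plain Java collections. All names in it (RecordAreaSketch, Slot, insertOrUpdate, compact) are illustrative, not the actual ReduceHashTable code; it only models the two invariants described above: serialize to the staging area first and append-plus-abandon on a size change, and a sequential compaction sweep whose write position never overtakes its read position.

{code}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecordAreaSketch {

    /** One linked-list element of the record area: the serialized record, or an abandoned hole. */
    private static final class Slot {
        final String key;      // kept here so the compaction sweep can rebuild the buckets
        byte[] record;         // serialized record bytes
        boolean abandoned;     // true once an update had to move the record elsewhere

        Slot(String key, byte[] record) {
            this.key = key;
            this.record = record;
        }
    }

    /** Bucket area stand-in: key -> index of the record's slot in the record area. */
    private final Map<String, Integer> buckets = new HashMap<>();

    /** Record area stand-in: append-only list of slots; abandoned slots are the "holes". */
    private final List<Slot> recordArea = new ArrayList<>();

    /** Update path: serialize to a staging buffer first, overwrite in place only if the size matches. */
    public void insertOrUpdate(String key, byte[] newRecord) {
        byte[] staging = newRecord.clone();                    // "staging area": serialize before touching the record area
        Integer index = buckets.get(key);
        if (index != null && recordArea.get(index).record.length == staging.length) {
            recordArea.get(index).record = staging;            // same binary length: in-place update
        } else {
            if (index != null) {
                recordArea.get(index).abandoned = true;        // old element becomes a "hole" until compaction
            }
            recordArea.add(new Slot(key, staging));            // append at the end of the record area
            buckets.put(key, recordArea.size() - 1);
        }
    }

    /** Compaction: clear the bucket area, sweep the record area sequentially, reinsert live records. */
    public void compact() {
        buckets.clear();
        int write = 0;                                         // write position never overtakes the read position
        for (int read = 0; read < recordArea.size(); read++) {
            Slot slot = recordArea.get(read);
            if (!slot.abandoned) {
                recordArea.set(write, slot);
                buckets.put(slot.key, write);
                write++;
            }
        }
        recordArea.subList(write, recordArea.size()).clear();  // drop the trailing holes
    }
}
{code}

A caller would first reduce the incoming record with the stored one and then pass the serialized result to insertOrUpdate(); in the real table the record area is made of MemorySegments rather than Java objects.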

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310392#comment-15310392
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65371629
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
 ---
@@ -252,4 +262,81 @@ private static String getLongString(int length) {
}
return bld.toString();
}
+
+
--- End diff --

5c27d4717bae1e6fe27e489806d0ebf22316fe85





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310389#comment-15310389
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65371572
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver<T> extends ChainedDriver<T, T> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fix length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer<T> serializer;
+
+   private TypeComparator<T> comparator;
+
+   private ReduceFunction<T> reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter<T> sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable<T> table;
+
+   private List<MemorySegment> memory;
+
+   private volatile boolean canceled;
+
+   // ------------------------------------------------------------------------

+
+   @Override
+   public Function getStub() {
+   return reducer;
+   }
+
+   @Override
+   public String getTaskName() {
+   return taskName;
+   }
+
+   @Override
+   public void setup(AbstractInvokable parent) {
+   this.parent = parent;
+   canceled = false;
+
+   strategy = config.getDriverStrategy();
+
+   reducer = BatchTask.instantiateUserCode(config, 
userCodeClassLoader, ReduceFunction.class);
+   FunctionUtils.setFunctionRuntimeContext(reducer, 
getUdfRuntimeContext());
+   }
+
+   @Override
+   public void openTask() throws Exception {
+   // open the stub first
+   final Configuration stubConfig = config.getStubParameters();
+   BatchTask.openUserCode(reducer, stubConfig);
+
+   // instantiate the serializer / comparator
+   serializer = config.getInputSerializer(0, 
userCodeClassLoader).getSerializer();
+   comparator = config.getDriverComparator(0, 
userCodeClassLoader).createComparator();
+
+   MemoryManager 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310391#comment-15310391
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65371616
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTestCommon.java
 ---
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
+import org.apache.flink.runtime.operators.testutils.types.IntList;
+import 
org.apache.flink.runtime.operators.testutils.types.IntListComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntListPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntListSerializer;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairListPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
+import org.apache.flink.runtime.operators.testutils.types.StringPair;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.Test;
+
+import org.powermock.reflect.Whitebox;
+
+import static org.junit.Assert.*;
+
+
+public class MemoryHashTableTestCommon {
--- End diff --

Good idea!
5c27d4717bae1e6fe27e489806d0ebf22316fe85





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310276#comment-15310276
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65356958
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310275#comment-15310275
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65356908
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310273#comment-15310273
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65356863
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-06-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310040#comment-15310040
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65331149
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308246#comment-15308246
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65233230
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308233#comment-15308233
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231707
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308232#comment-15308232
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231642
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308227#comment-15308227
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231386
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param <T> The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
-   
+
 private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class);
 
 /** Fix length records with a length below this threshold will be in-place sorted, if possible. */
 private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
 private TaskContext<ReduceFunction<T>, T> taskContext;
 
 private TypeSerializer<T> serializer;
 
 private TypeComparator<T> comparator;
-   
+
 private ReduceFunction<T> reducer;
-   
+
 private Collector<T> output;
-   
+
+   private DriverStrategy strategy;
+
 private InMemorySorter<T> sorter;
-   
+
 private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable<T> table;
+
 private List<MemorySegment> memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

> Can we keep the name of the flag as it is for now? All other drivers use
> a running flag as well. I would rather open a separate JIRA to fix the name
> in all drivers.

12e36ab93b7e7d94d497a6d718eba21ead813d7e

And here is the Jira: https://issues.apache.org/jira/browse/FLINK-3999





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308229#comment-15308229
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231501
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308230#comment-15308230
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231516
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -114,85 +118,133 @@ public void prepare() throws Exception {
 
MemoryManager memManager = this.taskContext.getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(
-   
this.taskContext.getTaskConfig().getRelativeMemoryDriver());
+   
this.taskContext.getTaskConfig().getRelativeMemoryDriver());
this.memory = 
memManager.allocatePages(this.taskContext.getOwningNepheleTask(), 
numMemoryPages);
 
-   // instantiate a fix-length in-place sorter, if possible, 
otherwise the out-of-place sorter
-   if (this.comparator.supportsSerializationWithKeyNormalization() 
&&
-   this.serializer.getLength() > 0 && 
this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
-   {
-   this.sorter = new 
FixedLengthRecordSorter(this.serializer, this.comparator, memory);
-   } else {
-   this.sorter = new 
NormalizedKeySorter(this.serializer, this.comparator.duplicate(), memory);
-   }
-
ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
 
if (LOG.isDebugEnabled()) {
LOG.debug("ReduceCombineDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
}
+
+   switch (strategy) {
+   case SORTED_PARTIAL_REDUCE:
+   // instantiate a fix-length in-place sorter, if 
possible, otherwise the out-of-place sorter
+   if 
(this.comparator.supportsSerializationWithKeyNormalization() &&
+   this.serializer.getLength() > 0 && 
this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING) {
+   this.sorter = new 
FixedLengthRecordSorter(this.serializer, this.comparator, memory);
--- End diff --

336490ccbcfc3b1a2be93eddd9cb531532a19a9e




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308205#comment-15308205
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65229499
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

Oops, it's `volatile` in the other drivers as well, sorry! So then it can 
be just volatile here as well, right?




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308191#comment-15308191
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65228213
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 ---
@@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) 
{
 * @see DataSet
 */
public ReduceOperator reduce(ReduceFunction reducer) {
+   return reduce(Utils.getCallLocationName(), reducer, 
CombineHint.OPTIMIZER_CHOOSES);
+   }
+
+   /**
+* Applies a Reduce transformation on a grouped {@link DataSet}.
+* For each group, the transformation consecutively calls a {@link 
org.apache.flink.api.common.functions.RichReduceFunction}
+*   until only a single element for each group remains.
+* A ReduceFunction combines two elements into one new element of the 
same type.
+*
+* @param reducer The ReduceFunction that is applied on each group of 
the DataSet.
+* @param strategy The strategy that should be used to execute the 
combine phase of the reduce.
+* If {@code null} is given, then the optimizer will 
pick the strategy.
+* @return A ReduceOperator that represents the reduced DataSet.
+*
+* @see org.apache.flink.api.common.functions.RichReduceFunction
+* @see ReduceOperator
+* @see DataSet
+*/
+   public ReduceOperator reduce(ReduceFunction reducer, CombineHint 
strategy) {
--- End diff --

e7851693dd261efb6eb1fcde12b04fcf384bd0d4
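
As a quick orientation for readers following this thread, here is a minimal sketch of how the hint shown in the diff above could be used from the Java API. The two-argument `reduce(reducer, strategy)` overload and the `CombineHint` constants (`HASH`, `SORT`, `OPTIMIZER_CHOOSES`) are taken from the diff under review; the import location of `CombineHint`, the sample data, and the word-count logic are assumptions for illustration only, and the final merged API may differ.

```
import org.apache.flink.api.common.functions.ReduceFunction;
// Assumed location of the hint enum; the PR may place it elsewhere.
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CombineHintExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> words = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("b", 1), Tuple2.of("a", 1));

        // Explicitly request the hash-based combiner; CombineHint.SORT would force
        // the existing sort-based combiner, and OPTIMIZER_CHOOSES (the default)
        // leaves the decision to the optimizer.
        DataSet<Tuple2<String, Integer>> counts = words
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                                          Tuple2<String, Integer> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                }, CombineHint.HASH);

        counts.print();
    }
}
```

Running the same job once with `CombineHint.HASH` and once with `CombineHint.SORT` is a convenient way to compare the two combine strategies on a given workload.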




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308193#comment-15308193
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65228287
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

b01fd497fe5e2cf85b71fa8a7b4ec60b5e47d1c5




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308080#comment-15308080
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65219886
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

> Would it be OK if I added optional arguments instead of the overloads 
like this?

Oops, this doesn't work. I'm getting
```
Error:(45, 7) in class GroupedDataSet, multiple overloaded alternatives of 
reduce define default arguments
```

An alternative solution would be, instead of just wrapping in a Scala `DataSet`, to create a Scala class that wraps `ReduceOperator`, has the same `setCombineHint` method, and inherits from the Scala `DataSet`.




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308038#comment-15308038
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65216454
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

I have now looked more into this `volatile` stuff, and the problem is that 
further investigation would be needed to ensure that the variable being 
`volatile` doesn't have a bad performance effect (see [1]). So maybe the best 
option for now is to make it non-volatile (as it is in all the other drivers) 
and just hope that it doesn't blow up. Should I open a Jira for doing further 
investigation and then maybe changing all the drivers?

[1] http://brooker.co.za/blog/2012/09/10/volatile.html




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306801#comment-15306801
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65090655
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *    if it turns out to be larger than the old record, then it would overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306787#comment-15306787
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-222520511
  
> This PR does also fix issue FLINK-2246 (add ChainedReduceCombineDriver). 
Would be nice to have that change in a separate commit before we merge.

It is in c722f58baef0803a60b3232ada2e50f091d8b917. (second commit of the PR)




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306279#comment-15306279
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65027607
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver<T> extends ChainedDriver<T, T> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer<T> serializer;
+
+   private TypeComparator<T> comparator;
+
+   private ReduceFunction<T> reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter<T> sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable<T> table;
+
+   private List<MemorySegment> memory;
+
+   private volatile boolean canceled;
--- End diff --

ok



[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306278#comment-15306278
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65027557
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

I think it's better to have `volatile` here. This variable will be set from 
a different thread, and `volatile` is not only for atomicity, but also for 
memory consistency (seeing the effect of a write in an other thread). If a 
variable is not volatile then the compiler may assume in certain cases that it 
is only modified and read by one thread (17.4.2-5. in [1]). (Also see [2].)

Note: omitting the volatile probably wouldn't cause any actual bug here, 
because the loop bodies are large so the compiler probably won't inline and 
analyze the entire call tree to look for writes to this flag, but I wouldn't 
risk it. Also, I don't know how common it is in Java that this stuff causes 
actual problems, but it actually happened to me in C++ once that a loop like 
this was effectively turned into `while(true)` by the compiler, because my flag 
was not volatile. It was a nasty debugging session. (Another problematic thing 
that the compiler is allowed to do with non-volatile variables is to cache the 
value of the variable in a register, and not read it from memory if it can turn 
all possible writes to it by the current thread into a write to the register.)

[1] http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.3
[2] 
http://stackoverflow.com/questions/106591/do-you-ever-use-the-volatile-keyword-in-java
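
A tiny, self-contained illustration of the visibility argument above (this is not the driver code; the class and all names in it are made up): without `volatile`, the JIT is allowed to hoist the read of the flag out of the loop, so the worker thread could keep spinning even after the cancelling thread has set the flag.

```
public class CancelFlagExample {

    // Written by the cancelling thread, read by the worker thread. Declaring it
    // volatile guarantees that the worker eventually sees the write; without it,
    // the JIT may legally compile the loop below as if it were while (true).
    private volatile boolean canceled = false;

    public void run() throws InterruptedException {
        Thread worker = new Thread(() -> {
            long processed = 0;
            while (!canceled) {   // re-reads the flag on every iteration
                processed++;      // stand-in for "process one record"
            }
            System.out.println("stopped after " + processed + " iterations");
        });
        worker.start();

        Thread.sleep(100);
        canceled = true;          // this write becomes visible to the worker
        worker.join();
    }

    public static void main(String[] args) throws InterruptedException {
        new CancelFlagExample().run();
    }
}
```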




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306270#comment-15306270
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65026385
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

OK, I'll rename it back to `running`. (Btw. now I remember what my problem with this name was: if the operator stops normally (not by cancel), then it is still `true`, despite not running anymore. So this flag is really only for cancelling.)




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306251#comment-15306251
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65024796
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

There is a problem with this in the Scala API: the `wrap()` calls hide the 
`ReduceOperator` from the user.

Would it be OK if I added optional arguments instead of the overloads like 
this?
```
def reduce(fun: (T, T) => T, strategy: CombineHint = 
CombineHint.OPTIMIZER_CHOOSES): DataSet[T] = {
```




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301143#comment-15301143
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221739936
  
Hi @ggevay as promised yesterday, I added my comments and suggestions. Let 
me know what you think. 

@greghogan, would you like to take a look as well? Another pair of eyes is always a good idea.




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301148#comment-15301148
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221740318
  
This PR does also fix issue FLINK-2246 (add ChainedReduceCombineDriver). 
Would be nice to have that change in a separate commit before we merge.




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301128#comment-15301128
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64671543
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *    if it turns out to be larger than the old record, then it would overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301126#comment-15301126
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64671483
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *    if it turns out to be larger than the old record, then it would overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301122#comment-15301122
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64671325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 
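
To make the update path described in the Javadoc above concrete, here is a minimal, hedged sketch in plain Java (not the actual ReduceHashTable code): the caller serializes the updated value outside the record area first (the role of the staging area), and the table then either overwrites in place or appends a new element and abandons the old one. All names below are invented for illustration.

{code}
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the record area; the real code works on MemorySegments and pointers. */
class StagingUpdateSketch {

    private final List<byte[]> recordArea = new ArrayList<>();  // one slot per linked-list element
    private final List<Boolean> abandoned = new ArrayList<>();  // marks the "holes" left by updates

    /** Appends a freshly serialized record and returns its slot index (stands in for a pointer). */
    int insert(byte[] serialized) {
        recordArea.add(serialized);
        abandoned.add(false);
        return recordArea.size() - 1;
    }

    /**
     * Updates the record at {@code pointer}. {@code staged} is the new value, already
     * serialized into a temporary buffer (the "staging area"), so its size is known here.
     */
    int update(int pointer, byte[] staged) {
        if (recordArea.get(pointer).length == staged.length) {
            recordArea.set(pointer, staged);  // same size: overwrite in place
            return pointer;
        }
        abandoned.set(pointer, true);         // different size: abandon the old element ...
        return insert(staged);                // ... and append the new one at the end
    }
}
{code}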

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301124#comment-15301124
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64671403
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: contains the bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 
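
The compaction described in the Javadoc above (drop the bucket structure, then re-insert the surviving records in one sequential sweep) can be pictured with the hedged sketch below; the Element class and compact() method are illustrative stand-ins, not code from the pull request.

{code}
import java.util.List;

/** Illustration of the reinsertion sweep; the real table also rebuilds the bucket links while copying. */
class ReinsertionSweepSketch {

    static final class Element {
        final byte[] record;
        boolean abandoned;
        Element(byte[] record) { this.record = record; }
    }

    /**
     * Live records are copied towards the front in a single left-to-right pass.
     * The write position can never overtake the read position, so no record is
     * overwritten before the sweep has read it (the property noted in the Javadoc).
     */
    static void compact(List<Element> recordArea) {
        int write = 0;
        for (int read = 0; read < recordArea.size(); read++) {
            Element e = recordArea.get(read);
            if (!e.abandoned) {
                recordArea.set(write++, e);  // "reinsert" the record at the next free position
            }
        }
        recordArea.subList(write, recordArea.size()).clear();  // drop the now-stale tail
    }
}
{code}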

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301108#comment-15301108
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64670534
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: contains the bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301104#comment-15301104
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64670218
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: contains the bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301102#comment-15301102
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64670146
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: contains the bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301097#comment-15301097
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64669861
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: contains the bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301083#comment-15301083
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64669221
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
 ---
@@ -252,4 +262,81 @@ private static String getLongString(int length) {
}
return bld.toString();
}
+
+
--- End diff --

many blank lines here


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301082#comment-15301082
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64669166
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTestCommon.java
 ---
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
+import org.apache.flink.runtime.operators.testutils.types.IntList;
+import 
org.apache.flink.runtime.operators.testutils.types.IntListComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntListPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntListSerializer;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairListPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
+import org.apache.flink.runtime.operators.testutils.types.StringPair;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.Test;
+
+import org.powermock.reflect.Whitebox;
+
+import static org.junit.Assert.*;
+
+
+public class MemoryHashTableTestCommon {
--- End diff --

Can we make this class `abstract` and rename it to 
`MutableHashTableTestBase`?
Then `CompactingHashTableTest` and `ReduceHashTableTest` can extend this 
class and do not need to implement the shared tests. `MutableHashTableTestBase` 
could have an abstract method `getHashTable`. `CompactingHashTableTest`'s 
implementation would return a `CompactingHashTable` and `ReduceHashTableTest` a 
`ReduceHashTable`.
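
A rough sketch of the suggested refactoring, assuming the shared tests only need a factory method for the table under test; the Table interface and its method names below are simplified stand-ins rather than Flink's actual hash table API.

{code}
import static org.junit.Assert.assertEquals;

import org.junit.Test;

/** Hypothetical shape of the proposed shared test base. */
abstract class MutableHashTableTestBase {

    /** Minimal stand-in for what CompactingHashTable and ReduceHashTable have in common. */
    interface Table {
        void insertOrReplace(int key, long value) throws Exception;
        long lookup(int key) throws Exception;
    }

    /** Each concrete test class (e.g. ReduceHashTableTest) returns its own table here. */
    protected abstract Table getHashTable() throws Exception;

    @Test
    public void testInsertAndLookup() throws Exception {
        Table table = getHashTable();  // the shared test logic is written once against the factory
        table.insertOrReplace(42, 7L);
        assertEquals(7L, table.lookup(42));
    }
}
{code}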


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301073#comment-15301073
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668757
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver extends ChainedDriver {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fixed-length records with a length below this threshold will be 
sorted in place, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer serializer;
+
+   private TypeComparator comparator;
+
+   private ReduceFunction reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable table;
+
+   private List memory;
+
+   private volatile boolean canceled;
+
+   // 

+
+   @Override
+   public Function getStub() {
+   return reducer;
+   }
+
+   @Override
+   public String getTaskName() {
+   return taskName;
+   }
+
+   @Override
+   public void setup(AbstractInvokable parent) {
+   this.parent = parent;
+   canceled = false;
+
+   strategy = config.getDriverStrategy();
+
+   reducer = BatchTask.instantiateUserCode(config, 
userCodeClassLoader, ReduceFunction.class);
+   FunctionUtils.setFunctionRuntimeContext(reducer, 
getUdfRuntimeContext());
+   }
+
+   @Override
+   public void openTask() throws Exception {
+   // open the stub first
+   final Configuration stubConfig = config.getStubParameters();
+   BatchTask.openUserCode(reducer, stubConfig);
+
+   // instantiate the serializer / comparator
+   serializer = config.getInputSerializer(0, 
userCodeClassLoader).getSerializer();
+   comparator = config.getDriverComparator(0, 
userCodeClassLoader).createComparator();
+
+   MemoryManager 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301061#comment-15301061
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64667955
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
 ---
@@ -45,7 +45,12 @@ public RandomAccessInputView(ArrayList 
segments, int segmentSize)
{
this(segments, segmentSize, segmentSize);
}
-   
+
+   public RandomAccessInputView(ArrayList segments, int 
segmentSize, boolean dummy)
--- End diff --

This constructor is called twice from the constructor of the 
`ReduceHashTable`. Once to initialize the input view of the `RecordArea` and 
once for the `StagingArea`. Both areas will need at least one buffer. Maybe I 
am wrong, but if we give both one initial buffer, we do not need this 
additional constructor with the `dummy` flag. 

As a second alternative to the constructor with the `dummy` flag, we could 
also implement a constructor without the `ArrayList`, create it 
in the constructor, and add a `getSegmentList()` method to access the created 
list.

What do you think, @ggevay?
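
A hedged sketch of the second alternative (the view creates and owns its segment list, exposed through a getter); the class name is illustrative and Object stands in for MemorySegment to keep the sketch self-contained.

{code}
import java.util.ArrayList;

/** Illustration only: a view that owns its segment list instead of receiving one. */
class OwnedSegmentListViewSketch {

    private final ArrayList<Object> segments = new ArrayList<>();  // created here, not passed in
    private final int segmentSize;

    OwnedSegmentListViewSketch(int segmentSize) {
        this.segmentSize = segmentSize;  // no extra "dummy" flag is needed to select a constructor
    }

    /** Callers such as the hash table's record and staging areas can still reach the backing list. */
    ArrayList<Object> getSegmentList() {
        return segments;
    }
}
{code}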


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301070#comment-15301070
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668630
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver extends ChainedDriver {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fixed-length records with a length below this threshold will be 
sorted in place, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer serializer;
+
+   private TypeComparator comparator;
+
+   private ReduceFunction reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable table;
+
+   private List memory;
+
+   private volatile boolean canceled;
+
+   // 

+
+   @Override
+   public Function getStub() {
+   return reducer;
+   }
+
+   @Override
+   public String getTaskName() {
+   return taskName;
+   }
+
+   @Override
+   public void setup(AbstractInvokable parent) {
+   this.parent = parent;
+   canceled = false;
+
+   strategy = config.getDriverStrategy();
+
+   reducer = BatchTask.instantiateUserCode(config, 
userCodeClassLoader, ReduceFunction.class);
+   FunctionUtils.setFunctionRuntimeContext(reducer, 
getUdfRuntimeContext());
+   }
+
+   @Override
+   public void openTask() throws Exception {
+   // open the stub first
+   final Configuration stubConfig = config.getStubParameters();
+   BatchTask.openUserCode(reducer, stubConfig);
+
+   // instantiate the serializer / comparator
+   serializer = config.getInputSerializer(0, 
userCodeClassLoader).getSerializer();
+   comparator = config.getDriverComparator(0, 
userCodeClassLoader).createComparator();
+
+   MemoryManager 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301069#comment-15301069
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668546
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver extends ChainedDriver {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fixed-length records with a length below this threshold will be 
sorted in place, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer serializer;
+
+   private TypeComparator comparator;
+
+   private ReduceFunction reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable table;
+
+   private List memory;
+
+   private volatile boolean canceled;
--- End diff --

Can we use `running` instead of `canceled` for the sake of consistency?


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301065#comment-15301065
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668428
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fixed-length records with a length below this threshold will be 
sorted in place, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

Can we keep the name of the flag as it is for now? All other drivers use a 
`running` flag as well. 
I would rather open a separate JIRA to fix the name in all drivers.

I don't think that `volatile` is required here. Accesses to primitive 
`boolean` are always atomic.
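
For readers unfamiliar with the pattern under discussion, a minimal sketch of the driver cancellation flag follows; whether the field should be volatile (for cross-thread visibility) or plain, as the existing drivers have it, is exactly the open question here, so treat the modifier below as one possible choice rather than a conclusion.

{code}
/** Minimal illustration of the cancellation-flag pattern used by the batch drivers. */
class DriverCancellationSketch {

    // The existing drivers use a plain boolean named "running"; marking it volatile
    // would additionally guarantee that the write in cancel() is visible to run().
    private volatile boolean running = true;

    void run() {
        while (running) {
            // process one record or group, then re-check the flag; exit when input is exhausted
        }
    }

    void cancel() {
        running = false;  // typically invoked from a different thread than run()
    }
}
{code}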


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301046#comment-15301046
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64667048
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 ---
@@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) 
{
 * @see DataSet
 */
public ReduceOperator reduce(ReduceFunction reducer) {
+   return reduce(Utils.getCallLocationName(), reducer, 
CombineHint.OPTIMIZER_CHOOSES);
+   }
+
+   /**
+* Applies a Reduce transformation on a grouped {@link DataSet}.
+* For each group, the transformation consecutively calls a {@link 
org.apache.flink.api.common.functions.RichReduceFunction}
+*   until only a single element for each group remains.
+* A ReduceFunction combines two elements into one new element of the 
same type.
+*
+* @param reducer The ReduceFunction that is applied on each group of 
the DataSet.
+* @param strategy The strategy that should be used to execute the 
combine phase of the reduce.
+* If {@code null} is given, then the optimizer will 
pick the strategy.
+* @return A ReduceOperator that represents the reduced DataSet.
+*
+* @see org.apache.flink.api.common.functions.RichReduceFunction
+* @see ReduceOperator
+* @see DataSet
+*/
+   public ReduceOperator reduce(ReduceFunction reducer, CombineHint 
strategy) {
--- End diff --

The `ReduceOperator.setCombineHint(CombineHint)` method should be annotated 
with `@PublicEvolving`.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301050#comment-15301050
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64667132
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

The overloaded methods can be removed if `ReduceOperator` has a 
`setCombineHint()` method


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301042#comment-15301042
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64666960
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---
@@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet<T> set, Keys<T> keys) {
 * @see DataSet
 */
public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
+   return reduce(Utils.getCallLocationName(), reducer, CombineHint.OPTIMIZER_CHOOSES);
+   }
+
+   /**
+* Applies a Reduce transformation on a grouped {@link DataSet}.
+* For each group, the transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction}
+*   until only a single element for each group remains.
+* A ReduceFunction combines two elements into one new element of the same type.
+*
+* @param reducer The ReduceFunction that is applied on each group of the DataSet.
+* @param strategy The strategy that should be used to execute the combine phase of the reduce.
+* If {@code null} is given, then the optimizer will pick the strategy.
+* @return A ReduceOperator that represents the reduced DataSet.
+*
+* @see org.apache.flink.api.common.functions.RichReduceFunction
+* @see ReduceOperator
+* @see DataSet
+*/
+   public ReduceOperator<T> reduce(ReduceFunction<T> reducer, CombineHint strategy) {
--- End diff --

I don't think we should overload all reduce (or reduce-based) methods in 
`UnsortedGrouping` but rather add a method `setCombineHint(CombineHint)` to 
`ReduceOperator`. That way the API doesn't grow too much. This is also more 
modular once we add a hash-based strategy for the final Reduce.
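
As a rough usage sketch of how that would read from the user's side, assuming a `setCombineHint(CombineHint)` method on the returned `ReduceOperator` as suggested above (the job and data below are made up for illustration):

{code}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CombineHintUsageSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<String, Integer>> counts = env.fromElements(
				Tuple2.of("a", 1), Tuple2.of("b", 1), Tuple2.of("a", 1));

		// The hint is set on the returned operator instead of being an extra
		// parameter of reduce(), so the existing reduce() signatures stay unchanged.
		counts.groupBy(0)
				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
					@Override
					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1,
							Tuple2<String, Integer> v2) {
						return Tuple2.of(v1.f0, v1.f1 + v2.f1);
					}
				})
				.setCombineHint(CombineHint.HASH)
				.print();
	}
}
{code}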


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300638#comment-15300638
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221670393
  
And @th0br0 is working on CombineHint.NONE.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300456#comment-15300456
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221644628
  
We talked about this yesterday with @fhueske at the Flink meetup, and the 
conclusion was that we will get it into 1.1. @fhueske has several comments that 
he will write here in a few days, and then I will make the (hopefully) final 
adjustments.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300453#comment-15300453
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221643813
  
What is the status of this PR? Can we get this in and tested for the 1.1 
release?


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247762#comment-15247762
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r60232838
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java ---
@@ -54,6 +54,32 @@
 @Internal
 public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleInputOperator<T, T, FT> {
 
+   /**
+* An enumeration of hints, optionally usable to tell the system exactly how to execute the combiner phase
+* of a reduce.
+* (Note: The final reduce phase (after combining) is currently always executed by a sort-based strategy.)
+*/
+   public enum CombineHint {
--- End diff --

+1
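
For context, a hedged reconstruction of the enum shown in the quoted diff hunk above; the constant set and the comments are assumptions based on this thread, not a verbatim copy of the PR:

{code}
public enum CombineHint {

	/** Let the optimizer pick the combine strategy. */
	OPTIMIZER_CHOOSES,

	/** Use the sort-based combiner (the previous behavior). */
	SORT,

	/** Use the new hash-based combiner introduced by this PR. */
	HASH,

	/** Proposed later in this thread: skip combining when few duplicate keys are expected. */
	NONE
}
{code}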


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15246186#comment-15246186
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r60105060
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java ---
@@ -54,6 +54,32 @@
 @Internal
 public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleInputOperator<T, T, FT> {
 
+   /**
+* An enumeration of hints, optionally usable to tell the system exactly how to execute the combiner phase
+* of a reduce.
+* (Note: The final reduce phase (after combining) is currently always executed by a sort-based strategy.)
+*/
+   public enum CombineHint {
--- End diff --

Good idea! I will do it.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15246167#comment-15246167
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r60104097
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java ---
@@ -54,6 +54,32 @@
 @Internal
 public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleInputOperator<T, T, FT> {
 
+   /**
+* An enumeration of hints, optionally usable to tell the system exactly how to execute the combiner phase
+* of a reduce.
+* (Note: The final reduce phase (after combining) is currently always executed by a sort-based strategy.)
+*/
+   public enum CombineHint {
--- End diff --

Should we consider an additional flag {{CombineHint.NONE}} for cases where 
the expected number of duplicate keys is relatively small (i.e. FLINK-3279)?


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15234936#comment-15234936
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-208307114
  
Thanks @ggevay and @aalexandrov! I left a few comments in the GDoc. I am not 
100% sure about the results for changing record sizes and the overhead of 
compaction. Otherwise the results look good. A 25% overall job improvement sounds 
very promising. :-)
I'll do another pass over the code in the next few days. Thanks, Fabian


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15234309#comment-15234309
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user aalexandrov commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-208083609
  
We've summarized the use-case around the hash aggregation experiments in [a 
blog post on the Peel 
webpage](http://peel-framework.org/2016/04/07/hash-aggregations-in-flink.html).

If you follow the instructions in the **Repeatability** section, you 
should be able to reproduce the results on other environments without too much 
hassle.

I hope that this will be the first of many Flink-related public Peel 
bundles.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15216998#comment-15216998
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-203143579
  
Thanks for doing these experiments! The results are quite convincing. I'm 
currently on vacation and will be back in about a week. 


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15215928#comment-15215928
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user aalexandrov commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-202868272
  
Overall it seems that the hash-based combiner works better than the 
sort-based one for (a) uniform or normal key distributions, and (b) 
fixed-length records.
 
For skewed key distributions (like Zipf) the two strategies are practically 
equal, and for variable-length records the extra effort of compacting the records 
offsets the advantages of the hash-based aggregation approach. 


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15215916#comment-15215916
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user aalexandrov commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-202864630
  
@fhueske We have used the Easter break to conduct the experiments. A 
preliminary writeup is in the Google Doc. @ggevay will provide the results 
analysis later today. Cheers!  


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15201623#comment-15201623
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-198407391
  
Hi @fhueske,
I'm planning to do the benchmarks next week Monday-Wednesday.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15201626#comment-15201626
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-198408023
  
Very good. Let's try to merge this soon. Looking forward to the numbers :-)



> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-03-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15201616#comment-15201616
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-198406921
  
Hi @ggevay, how are things going? 
I would like to add this feature soon and will do another review next week.
Have you started with the benchmarks, or will you have time to do them in the 
next 1 to 2 weeks?



> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-02-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15171778#comment-15171778
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-190171821
  
I'll do the Peel thing after March 15, because I have to work on my PhD 
thesis proposal until then.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158854#comment-15158854
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-187697588
  
Done


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)