[jira] [Created] (FLINK-3976) Alias fields of physical plan node to avoid name collision with Calcite protected fields

2016-05-25 Thread Yijie Shen (JIRA)
Yijie Shen created FLINK-3976:
-

 Summary: Alias fields of physical plan node to avoid name 
collision with Calcite protected fields
 Key: FLINK-3976
 URL: https://issues.apache.org/jira/browse/FLINK-3976
 Project: Flink
  Issue Type: Improvement
  Components: Table API
Reporter: Yijie Shen
Priority: Minor


The field name collisions in the physical plan nodes make it hard for the IDE to 
resolve a field's type and to give accurate hints while developing. I suggest we 
rename them :)

The collisions are: {{rowType}}, {{input}}, {{table}}, {{left}}, {{right}}, 
{{tuples}}.
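
For illustration, here is a minimal, self-contained Java sketch of the shadowing 
problem and the proposed renaming; {{Base}} and {{Derived}} are generic stand-ins, 
not the actual Calcite or Flink classes:

{code}
// A subclass field with the same name as a protected superclass field hides it,
// so IDE navigation and type hints can resolve to the wrong member.
class Base {
    protected Object rowType = "superclass (Calcite-side) row type";
}

class Derived extends Base {
    // Hiding: this declaration shadows Base.rowType inside Derived.
    private final String rowType = "subclass (Flink-side) row type";

    // Proposed fix: alias the subclass field so both members stay unambiguous.
    private final String derivedRowType = "subclass (Flink-side) row type";

    void show() {
        System.out.println(rowType);        // resolves to Derived.rowType
        System.out.println(super.rowType);  // superclass field only reachable via super
        System.out.println(derivedRowType); // after renaming, no ambiguity
    }
}
{code}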



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3976) Alias fields of physical plan node to avoid name collision with Calcite protected fields

2016-05-25 Thread Yijie Shen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yijie Shen updated FLINK-3976:
--
Description: 
The field name collisions in the physical plan nodes make it hard for the IDE to 
resolve a field's type and to give accurate hints while developing. I suggest we 
rename them :)

The collisions are: {{rowType}}, {{input}}, {{table}}, {{left}}, {{right}}, 
{{tuples}}.

  was:
The field name collisions in the physical plan nodes make it hard for the IDE to 
resolve a field's type and to give accurate hints while developing. I suggest we 
rename them :)

The collisions are: {{rowType}}, {{input}}, {{table}}, {{left}}, {{right}}, 
{{tuples}},  


> Alias fields of physical plan node to avoid name collision with Calcite 
> protected fields
> 
>
> Key: FLINK-3976
> URL: https://issues.apache.org/jira/browse/FLINK-3976
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Yijie Shen
>Priority: Minor
>
> The field name collisions in the physical plan nodes make it hard for the IDE 
> to resolve a field's type and to give accurate hints while developing. I 
> suggest we rename them :)
> The collisions are: {{rowType}}, {{input}}, {{table}}, {{left}}, {{right}}, 
> {{tuples}}. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301332#comment-15301332
 ] 

ASF GitHub Bot commented on FLINK-3941:
---

Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/2025#issuecomment-221762766
  
Hi @fhueske, I've updated the `expectedType` fix and reverted the changes in 
`DataSetUnion` and `DataSetUnionRule` to keep the implementation clear and 
simple. What do you think?


> Add support for UNION (with duplicate elimination)
> --
>
> Key: FLINK-3941
> URL: https://issues.apache.org/jira/browse/FLINK-3941
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Yijie Shen
>Priority: Minor
>
> Currently, only UNION ALL is supported by the Table API and SQL.
> UNION (with duplicate elimination) can be supported by applying a 
> {{DataSet.distinct()}} after the union on all fields (see the sketch below). 
> This issue includes:
> - Extending {{DataSetUnion}}
> - Relaxing {{DataSetUnionRule}} to translate non-all unions.
> - Extending the Table API with a {{union()}} method.
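
A rough sketch of the approach against the Java DataSet API; this only illustrates 
the semantics, not the actual {{DataSetUnion}}/{{DataSetUnionRule}} translation:

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnionDistinctSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> left =
                env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"), Tuple2.of(2, "b"));
        DataSet<Tuple2<Integer, String>> right =
                env.fromElements(Tuple2.of(2, "b"), Tuple2.of(3, "c"));

        // union() keeps duplicates (UNION ALL semantics); distinct() on all
        // fields removes them, which gives SQL UNION semantics.
        left.union(right)
            .distinct()
            .print();
    }
}
{code}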



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

2016-05-25 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/2025#issuecomment-221762766
  
Hi @fhueske, I've updated the `expectedType` fix and reverted the changes in 
`DataSetUnion` and `DataSetUnionRule` to keep the implementation clear and 
simple. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-2765) Upgrade hbase version for hadoop-2 to 1.2 release

2016-05-25 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-2765:
--
Description: 
Currently 0.98.11 is used:
{code}
0.98.11-hadoop2
{code}
The stable release line for hadoop-2 is 1.1.x.
We should upgrade to the 1.2 release.

  was:
Currently 0.98.11 is used:
{code}
0.98.11-hadoop2
{code}

The stable release line for hadoop-2 is 1.1.x.
We should upgrade to the 1.2 release.


> Upgrade hbase version for hadoop-2 to 1.2 release
> -
>
> Key: FLINK-2765
> URL: https://issues.apache.org/jira/browse/FLINK-2765
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>
> Currently 0.98.11 is used:
> {code}
> 0.98.11-hadoop2
> {code}
> The stable release line for hadoop-2 is 1.1.x.
> We should upgrade to the 1.2 release.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301143#comment-15301143
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221739936
  
Hi @ggevay, as promised yesterday, I added my comments and suggestions. Let 
me know what you think. 

@greghogan, would you like to take a look as well? Another pair of eyes is 
always a good idea.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value, which allows holding the pre-aggregated value in a hash 
> table and updating it with each function call. 
> The hash-based strategy requires a special implementation of an in-memory hash 
> table. The hash table should support in-place updates of elements (if the new 
> value has the same binary length as the old one) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.
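
For intuition, a heap-based sketch of the combine strategy described above, using 
plain Java collections as a stand-in for Flink's managed-memory hash table; the 
key extractor and the entry budget are assumptions for illustration only:

{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/**
 * Heap-based sketch of a hash combiner: one pre-aggregated value is kept per key,
 * updated incrementally via the reduce function, and all values are evicted and
 * emitted when a size budget (a stand-in for running out of managed memory) is hit.
 */
public class HashCombineSketch<K, T> {

    private final Map<K, T> table = new HashMap<>();
    private final Function<T, K> keyExtractor;      // stand-in for Flink's key logic
    private final BinaryOperator<T> reduceFunction; // corresponds to T reduce(T v1, T v2)
    private final int maxEntries;                   // stand-in for the memory budget

    public HashCombineSketch(Function<T, K> keyExtractor,
                             BinaryOperator<T> reduceFunction,
                             int maxEntries) {
        this.keyExtractor = keyExtractor;
        this.reduceFunction = reduceFunction;
        this.maxEntries = maxEntries;
    }

    /** Combines one record into the table; returns evicted results if the budget is hit. */
    public List<T> processRecord(T record) {
        table.merge(keyExtractor.apply(record), record, reduceFunction);
        if (table.size() >= maxEntries) {
            return flush();
        }
        return Collections.emptyList();
    }

    /** Emits whatever is still held at the end of the input. */
    public List<T> flush() {
        List<T> out = new ArrayList<>(table.values());
        table.clear();
        return out;
    }
}
{code}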



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64667132
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

The overloaded methods can be removed if `ReduceOperator` has a 
`setCombineHint()` method
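
A hypothetical usage sketch of that suggestion on the Java DataSet API; the setter 
name and the `CombineHint` location follow this review discussion and may not match 
the PR as merged:

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CombineHintUsageSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> counts =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

        counts.groupBy(0)
              .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                  @Override
                  public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1,
                                                        Tuple2<String, Integer> v2) {
                      return Tuple2.of(v1.f0, v1.f1 + v2.f1);
                  }
              })
              // hypothetical setter on ReduceOperator instead of reduce() overloads
              .setCombineHint(CombineHint.HASH)
              .print();
    }
}
```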


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221740318
  
This PR also fixes FLINK-2246 (adding ChainedReduceCombineDriver). 
It would be nice to have that change in a separate commit before we merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301148#comment-15301148
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221740318
  
This PR also fixes FLINK-2246 (adding ChainedReduceCombineDriver). 
It would be nice to have that change in a separate commit before we merge.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value, which allows holding the pre-aggregated value in a hash 
> table and updating it with each function call. 
> The hash-based strategy requires a special implementation of an in-memory hash 
> table. The hash table should support in-place updates of elements (if the new 
> value has the same binary length as the old one) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64669221
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
 ---
@@ -252,4 +262,81 @@ private static String getLongString(int length) {
}
return bld.toString();
}
+
+
--- End diff --

many blank lines here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64670146
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
+ *
   

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301128#comment-15301128
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64671543
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64671483
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
+ *
   

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64669166
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTestCommon.java
 ---
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
+import org.apache.flink.runtime.operators.testutils.types.IntList;
+import 
org.apache.flink.runtime.operators.testutils.types.IntListComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntListPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntListSerializer;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairListPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
+import org.apache.flink.runtime.operators.testutils.types.StringPair;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.Test;
+
+import org.powermock.reflect.Whitebox;
+
+import static org.junit.Assert.*;
+
+
+public class MemoryHashTableTestCommon {
--- End diff --

Can we make this class `abstract` and rename it to 
`MutableHashTableTestBase`?
Then `CompactingHashTableTest` and `ReduceHashTableTest` can extend this 
class without having to implement the shared tests. `MutableHashTableTestBase` 
could have an abstract method `getHashTable`. `CompactingHashTableTest`'s 
implementation would return a `CompactingHashTable` and `ReduceHashTableTest` a 
`ReduceHashTable`.
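
Roughly, the suggested shape (class and method names follow the comment above; 
the `Map`-based table is just a placeholder for the actual hash table classes):

```java
import java.util.HashMap;
import java.util.Map;

import org.junit.Assert;
import org.junit.Test;

/** Shape of the suggested base class: shared tests call an abstract factory method. */
abstract class MutableHashTableTestBase {

    /** Concrete subclasses return the hash table implementation under test. */
    protected abstract Map<Integer, String> getHashTable();

    @Test
    public void testInsertAndLookup() {
        Map<Integer, String> table = getHashTable();
        table.put(42, "forty-two");
        Assert.assertEquals("forty-two", table.get(42));
    }
}

/** e.g. ReduceHashTableTest would return a ReduceHashTable here instead of a HashMap. */
class ReduceHashTableTestSketch extends MutableHashTableTestBase {
    @Override
    protected Map<Integer, String> getHashTable() {
        return new HashMap<>();
    }
}
```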


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221739936
  
Hi @ggevay, as promised yesterday, I added my comments and suggestions. Let 
me know what you think. 

@greghogan, would you like to take a look as well? Another pair of eyes is 
always a good idea.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301126#comment-15301126
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64671483
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64671403
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
+ *
   

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64671543
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
+ *
   

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64671325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
+ *
   

[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

2016-05-25 Thread mans2singh
Github user mans2singh commented on the pull request:

https://github.com/apache/flink/pull/2031#issuecomment-221737718
  
Hey @zentol - Thanks for your feedback. 
-  I've renamed the sink as you had recommended.  
- Currently the driver from RethinkDB supports Java 8 only. There is one 
supporting Java 6, but it is no longer being maintained.
- The current Travis failures for JDK 7 are because of the version issue 
you mentioned, and those for Java 8 are due to failures in the yarn-tests 
module. 
- I've not done any performance tests. If you have any suggestions for 
throughput numbers that might be valuable, or if there is any other sink that I 
can use for setting up a baseline test, please let me know.

If you have any other advice/recommendation, please let me know.   

Thanks again


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64670534
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large will it be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger then the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note, that insertions never override a record that have not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large are the records. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
+ *
   

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64669861
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we
are doing an update, because
+ *if it turns out to be larger than the old record, it would
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
+ *
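
The compaction procedure described in the quoted Javadoc (clear the buckets, then scan the record area sequentially and reinsert every non-abandoned record) can be illustrated with a short sketch. This is only an illustration under assumed types, not the code under review; the read position always stays ahead of, or equal to, the write position, which is why reinsertion never overwrites unread records.

// Sketch of the compaction sweep: read sequentially, keep non-abandoned
// records, and let the write position trail the read position.
import java.util.List;

public class CompactionSketch {

    static final class Element {
        final byte[] record;
        final boolean abandoned;

        Element(byte[] record, boolean abandoned) {
            this.record = record;
            this.abandoned = abandoned;
        }
    }

    /** Compacts the record area in place; the caller is assumed to have cleared the bucket area. */
    static void compact(List<Element> recordArea) {
        int write = 0;
        for (int read = 0; read < recordArea.size(); read++) {
            Element e = recordArea.get(read);
            if (!e.abandoned) {
                // In the real table, this is also where the surviving record
                // would be re-hashed into the freshly cleared bucket area.
                recordArea.set(write++, new Element(e.record, false));
            }
        }
        // Drop the abandoned tail so the "holes" disappear.
        while (recordArea.size() > write) {
            recordArea.remove(recordArea.size() - 1);
        }
    }
}
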
   

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64670218
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we
are doing an update, because
+ *if it turns out to be larger than the old record, it would
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
+ *
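
The bucket-sizing rule in the quoted Javadoc (size the bucket area upfront when the serializer knows the record length, otherwise start small and resize) can be sketched as below. The constants and the exact formula are assumptions made up for the illustration, not values from the pull request.

// Rough illustration of the bucket-count decision; constants and formula are assumptions.
public final class BucketSizingSketch {

    private static final int POINTER_SIZE = 8;                 // one 8 byte pointer per bucket head
    private static final int DEFAULT_INITIAL_BUCKETS = 1 << 10;

    /**
     * @param totalMemoryBytes memory available to the hash table
     * @param recordLength     fixed serialized record length, or -1 if the serializer doesn't know it
     * @return the number of buckets to allocate upfront
     */
    public static int initialNumBuckets(long totalMemoryBytes, int recordLength) {
        if (recordLength > 0) {
            // Known record size: estimate how many records fit (each carries an
            // 8 byte next pointer) and give roughly one bucket head per record.
            long perRecord = recordLength + POINTER_SIZE;
            long estimatedRecords = Math.max(1L, totalMemoryBytes / (perRecord + POINTER_SIZE));
            return (int) Math.min(Integer.MAX_VALUE, estimatedRecords);
        }
        // Unknown record size: start small; the table resizes once more elements
        // have been inserted than there are buckets.
        return DEFAULT_INITIAL_BUCKETS;
    }
}

For example, initialNumBuckets(64L << 20, 16) would size the table once for 16 byte records, while initialNumBuckets(64L << 20, -1) falls back to the small initial size and later resizes.
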
   

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301122#comment-15301122
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64671325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we
are doing an update, because
+ *if it turns out to be larger than the old record, it would
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301124#comment-15301124
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64671403
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we
are doing an update, because
+ *if it turns out to be larger than the old record, it would
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668757
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver extends ChainedDriver {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fixed-length records with a length below this threshold will be
in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer serializer;
+
+   private TypeComparator comparator;
+
+   private ReduceFunction reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable table;
+
+   private List memory;
+
+   private volatile boolean canceled;
+
+   // ------------------------------------------------------------------------
+
+   @Override
+   public Function getStub() {
+   return reducer;
+   }
+
+   @Override
+   public String getTaskName() {
+   return taskName;
+   }
+
+   @Override
+   public void setup(AbstractInvokable parent) {
+   this.parent = parent;
+   canceled = false;
+
+   strategy = config.getDriverStrategy();
+
+   reducer = BatchTask.instantiateUserCode(config, 
userCodeClassLoader, ReduceFunction.class);
+   FunctionUtils.setFunctionRuntimeContext(reducer, 
getUdfRuntimeContext());
+   }
+
+   @Override
+   public void openTask() throws Exception {
+   // open the stub first
+   final Configuration stubConfig = config.getStubParameters();
+   BatchTask.openUserCode(reducer, stubConfig);
+
+   // instantiate the serializer / comparator
+   serializer = config.getInputSerializer(0, 
userCodeClassLoader).getSerializer();
+   comparator = config.getDriverComparator(0, 
userCodeClassLoader).createComparator();
+
+   MemoryManager memManager = 
parent.getEnvironment().getMemoryManager();
+   final int numMemoryPages = 
memManager.computeNumberOfPages(config.getRelativeMemoryDriver());
+   memory = memManager.allocatePages(parent, numMemoryPages);

[jira] [Commented] (FLINK-3967) Provide RethinkDB Sink for Flink

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301112#comment-15301112
 ] 

ASF GitHub Bot commented on FLINK-3967:
---

Github user mans2singh commented on the pull request:

https://github.com/apache/flink/pull/2031#issuecomment-221737718
  
Hey @zentol - Thanks for your feedback. 
-  I've renamed the sink as you had recommended.  
- Currently the driver from RethinkDB supports Java 8 only. There is one
supporting Java 6, but it is no longer being maintained.
-  The current Travis failures for JDK 7 are because of the version issue
as you had mentioned, and those for Java 8 are due to failures in the yarn-tests
module.
- I've not done any performance tests. If you have any suggestions for
throughput numbers that might be valuable, or if there is any other sink that I
can use for setting up a baseline test, please let me know.

If you have any other advice/recommendation, please let me know.   

Thanks again


> Provide RethinkDB Sink for Flink
> 
>
> Key: FLINK-3967
> URL: https://issues.apache.org/jira/browse/FLINK-3967
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 1.0.3
> Environment: All
>Reporter: Mans Singh
>Assignee: Mans Singh
>Priority: Minor
>  Labels: features
> Fix For: 1.1.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Provide Sink to stream data from flink to rethink db.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301108#comment-15301108
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64670534
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we
are doing an update, because
+ *if it turns out to be larger than the old record, it would
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668798
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -114,85 +118,133 @@ public void prepare() throws Exception {
 
MemoryManager memManager = this.taskContext.getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(
-   
this.taskContext.getTaskConfig().getRelativeMemoryDriver());
+   
this.taskContext.getTaskConfig().getRelativeMemoryDriver());
this.memory = 
memManager.allocatePages(this.taskContext.getOwningNepheleTask(), 
numMemoryPages);
 
-   // instantiate a fix-length in-place sorter, if possible, 
otherwise the out-of-place sorter
-   if (this.comparator.supportsSerializationWithKeyNormalization() 
&&
-   this.serializer.getLength() > 0 && 
this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
-   {
-   this.sorter = new 
FixedLengthRecordSorter(this.serializer, this.comparator, memory);
-   } else {
-   this.sorter = new 
NormalizedKeySorter(this.serializer, this.comparator.duplicate(), memory);
-   }
-
ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
 
if (LOG.isDebugEnabled()) {
LOG.debug("ReduceCombineDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
}
+
+   switch (strategy) {
+   case SORTED_PARTIAL_REDUCE:
+   // instantiate a fix-length in-place sorter, if 
possible, otherwise the out-of-place sorter
+   if 
(this.comparator.supportsSerializationWithKeyNormalization() &&
+   this.serializer.getLength() > 0 && 
this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING) {
+   this.sorter = new 
FixedLengthRecordSorter(this.serializer, this.comparator, memory);
--- End diff --

Use a duplicated comparator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64667955
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
 ---
@@ -45,7 +45,12 @@ public RandomAccessInputView(ArrayList 
segments, int segmentSize)
{
this(segments, segmentSize, segmentSize);
}
-   
+
+   public RandomAccessInputView(ArrayList segments, int 
segmentSize, boolean dummy)
--- End diff --

This constructor is called twice from the constructor of the 
`ReduceHashTable`. Once to initialize the input view of the `RecordArea` and 
once for the `StagingArea`. Both areas will need at least one buffer. Maybe I 
am wrong, but if we give both one initial buffer, we do not need this 
additional constructor with the `dummy` flag. 

As a second alternative to the constructor with the `dummy` flag, we could 
also implement a constructor without the `ArrayList`, create it 
in the constructor, and add a `getSegmentList()` method to access the created
list.

What do you think, @ggevay?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
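
To make the second alternative concrete, a constructor along the lines suggested above could look roughly like the sketch below. This is only an outline of the proposed shape, not the actual RandomAccessInputView class; only the idea of creating the list internally and exposing it via getSegmentList() is taken from the comment.

// Sketch of the suggested shape: the view creates its own segment list and
// exposes it, so no empty ArrayList plus dummy flag is needed at the call site.
import java.util.ArrayList;

import org.apache.flink.core.memory.MemorySegment;

public class SegmentOwningViewSketch {

    private final ArrayList<MemorySegment> segments;
    private final int segmentSize;

    /** Creates the backing segment list itself instead of taking a pre-built one. */
    public SegmentOwningViewSketch(int segmentSize) {
        this.segments = new ArrayList<>();
        this.segmentSize = segmentSize;
    }

    /** Lets the owner (e.g. the record or staging area) fill and read the created list. */
    public ArrayList<MemorySegment> getSegmentList() {
        return segments;
    }

    public int getSegmentSize() {
        return segmentSize;
    }
}
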


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301104#comment-15301104
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64670218
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we
are doing an update, because
+ *if it turns out to be larger than the old record, it would
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668428
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fixed-length records with a length below this threshold will be
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

Can we keep the name of the flag as it is for now? All other drivers use a 
`running` flag as well. 
I would rather open a separate JIRA to fix the name in all drivers.

I don't think that `volatile` is required here. Accesses to primitive 
`boolean` are always atomic.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
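
For readers who are not familiar with the driver convention referenced above, the `running` flag pattern used by the batch drivers looks roughly like this minimal sketch (illustrative names, not actual Flink driver code):

// Minimal illustration of the "running" flag convention: the driver loop polls
// the flag, and cancellation simply clears it.
public class RunningFlagSketch {

    private boolean running = true;

    /** The driver loop: keeps processing records until input ends or the task is canceled. */
    public void run() {
        while (this.running && readNextRecord()) {
            // combine/reduce the record here
        }
    }

    /** Called when the task is canceled; the loop above observes the flag and stops. */
    public void cancelTask() {
        this.running = false;
    }

    /** Placeholder input source; pretends the input is exhausted immediately. */
    private boolean readNextRecord() {
        return false;
    }
}
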


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668630
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver extends ChainedDriver {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fixed-length records with a length below this threshold will be
in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer serializer;
+
+   private TypeComparator comparator;
+
+   private ReduceFunction reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable table;
+
+   private List memory;
+
+   private volatile boolean canceled;
+
+   // ------------------------------------------------------------------------
+
+   @Override
+   public Function getStub() {
+   return reducer;
+   }
+
+   @Override
+   public String getTaskName() {
+   return taskName;
+   }
+
+   @Override
+   public void setup(AbstractInvokable parent) {
+   this.parent = parent;
+   canceled = false;
+
+   strategy = config.getDriverStrategy();
+
+   reducer = BatchTask.instantiateUserCode(config, 
userCodeClassLoader, ReduceFunction.class);
+   FunctionUtils.setFunctionRuntimeContext(reducer, 
getUdfRuntimeContext());
+   }
+
+   @Override
+   public void openTask() throws Exception {
+   // open the stub first
+   final Configuration stubConfig = config.getStubParameters();
+   BatchTask.openUserCode(reducer, stubConfig);
+
+   // instantiate the serializer / comparator
+   serializer = config.getInputSerializer(0, 
userCodeClassLoader).getSerializer();
+   comparator = config.getDriverComparator(0, 
userCodeClassLoader).createComparator();
+
+   MemoryManager memManager = 
parent.getEnvironment().getMemoryManager();
+   final int numMemoryPages = 
memManager.computeNumberOfPages(config.getRelativeMemoryDriver());
+   memory = memManager.allocatePages(parent, numMemoryPages);

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301102#comment-15301102
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64670146
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we
are doing an update, because
+ *if it turns out to be larger than the old record, it would
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64667048
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 ---
@@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) 
{
 * @see DataSet
 */
public ReduceOperator reduce(ReduceFunction reducer) {
+   return reduce(Utils.getCallLocationName(), reducer, 
CombineHint.OPTIMIZER_CHOOSES);
+   }
+
+   /**
+* Applies a Reduce transformation on a grouped {@link DataSet}.
+* For each group, the transformation consecutively calls a {@link 
org.apache.flink.api.common.functions.RichReduceFunction}
+*   until only a single element for each group remains.
+* A ReduceFunction combines two elements into one new element of the 
same type.
+*
+* @param reducer The ReduceFunction that is applied on each group of 
the DataSet.
+* @param strategy The strategy that should be used to execute the 
combine phase of the reduce.
+* If {@code null} is given, then the optimizer will 
pick the strategy.
+* @return A ReduceOperator that represents the reduced DataSet.
+*
+* @see org.apache.flink.api.common.functions.RichReduceFunction
+* @see ReduceOperator
+* @see DataSet
+*/
+   public ReduceOperator reduce(ReduceFunction reducer, CombineHint 
strategy) {
--- End diff --

The `ReduceOperator.setCombineHint(CombineHint)` method should be annotated 
with `@PublicEvolving`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668546
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver extends ChainedDriver {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fixed-length records with a length below this threshold will be
in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer serializer;
+
+   private TypeComparator comparator;
+
+   private ReduceFunction reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable table;
+
+   private List memory;
+
+   private volatile boolean canceled;
--- End diff --

Can we use `running` instead of `canceled` for the sake of consistency?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64666960
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 ---
@@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) 
{
 * @see DataSet
 */
public ReduceOperator reduce(ReduceFunction reducer) {
+   return reduce(Utils.getCallLocationName(), reducer, 
CombineHint.OPTIMIZER_CHOOSES);
+   }
+
+   /**
+* Applies a Reduce transformation on a grouped {@link DataSet}.
+* For each group, the transformation consecutively calls a {@link 
org.apache.flink.api.common.functions.RichReduceFunction}
+*   until only a single element for each group remains.
+* A ReduceFunction combines two elements into one new element of the 
same type.
+*
+* @param reducer The ReduceFunction that is applied on each group of 
the DataSet.
+* @param strategy The strategy that should be used to execute the 
combine phase of the reduce.
+* If {@code null} is given, then the optimizer will 
pick the strategy.
+* @return A ReduceOperator that represents the reduced DataSet.
+*
+* @see org.apache.flink.api.common.functions.RichReduceFunction
+* @see ReduceOperator
+* @see DataSet
+*/
+   public ReduceOperator reduce(ReduceFunction reducer, CombineHint 
strategy) {
--- End diff --

I don't think we should overload all reduce (or reduce-based) methods in 
`UnsortedGrouping` but rather add a method `setCombineHint(CombineHint)` to 
`ReduceOperator`. That way the API doesn't grow too much. This is also more 
modular once we add a hash-based strategy for the final Reduce.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
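
From the user's point of view, the suggested `setCombineHint(CombineHint)` method would be used roughly as sketched below. The enum constants and the package of CombineHint are assumptions (the quoted diff only imports the simple name), so treat this as an illustration of the proposed API shape rather than the final code.

import org.apache.flink.api.common.functions.ReduceFunction;
// The package of CombineHint is an assumption; the quoted diff only shows the simple name.
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CombineHintUsageSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));

        input.groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                                          Tuple2<String, Integer> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                })
                // The suggested hook: configure the combine strategy on the operator.
                .setCombineHint(CombineHint.HASH)
                .print();
    }
}
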


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301097#comment-15301097
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64669861
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *    if it turns out to be larger than the old record, then it would 
override some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never override a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 
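
A toy, self-contained sketch of the staged-update rule described in the excerpt above (this is not the PR's `ReduceHashTable`: no buckets, no MemorySegments, just byte arrays standing in for serialized records):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Records are serialized to a "staging" byte[] first, because the new size is unknown.
// Same size: overwrite the old slot in place. Different size: abandon the old slot and
// append, which is what eventually creates the holes that compaction has to clean up.
class StagedUpdateSketch {

    private final List<byte[]> recordArea = new ArrayList<>(); // slot -> serialized record
    private final List<Boolean> live = new ArrayList<>();      // slot -> not abandoned

    int insert(byte[] serializedRecord) {
        recordArea.add(serializedRecord);
        live.add(true);
        return recordArea.size() - 1; // "pointer" to the new slot
    }

    /** Returns the slot that now holds the updated record (possibly a new one). */
    int update(int slot, byte[] staged) {
        byte[] old = recordArea.get(slot);
        if (staged.length == old.length) {
            recordArea.set(slot, staged); // fits exactly: in-place overwrite
            return slot;
        }
        live.set(slot, false);            // size changed: abandon the old slot...
        return insert(staged);            // ...and append the updated record at the end
    }
}
{code}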

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301083#comment-15301083
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64669221
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
 ---
@@ -252,4 +262,81 @@ private static String getLongString(int length) {
}
return bld.toString();
}
+
+
--- End diff --

many blank lines here


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301082#comment-15301082
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64669166
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTestCommon.java
 ---
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
+import org.apache.flink.runtime.operators.testutils.types.IntList;
+import 
org.apache.flink.runtime.operators.testutils.types.IntListComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntListPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntListSerializer;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairListPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
+import org.apache.flink.runtime.operators.testutils.types.StringPair;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
+import 
org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.Test;
+
+import org.powermock.reflect.Whitebox;
+
+import static org.junit.Assert.*;
+
+
+public class MemoryHashTableTestCommon {
--- End diff --

Can we make this class `abstract` and rename it to 
`MutableHashTableTestBase`?
Then `CompactingHashTableTest` and `ReduceHashTableTest` can extend this 
class and do not need to implement the shared tests. `MutableHashTableTestBase` 
could have an abstract method `getHashTable`. `CompactingHashTableTest`'s 
implementation would return a `CompactingHashTable` and `ReduceHashTableTest` a 
`ReduceHashTable`.
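
A compile-only sketch of that structure, with a stand-in interface so it is self-contained; in the PR the base class would hold the real shared test methods and the factory would return actual CompactingHashTable / ReduceHashTable instances:

{code:java}
// Shared tests live in the abstract base and only ever talk to getHashTable().
abstract class MutableHashTableTestBase {

    // Stand-in for whatever common interface/base class both hash tables expose.
    interface HashTableUnderTest {
        void insertOrReplaceRecord(Object record) throws Exception;
    }

    abstract HashTableUnderTest getHashTable() throws Exception;

    void sharedInsertTest() throws Exception {
        HashTableUnderTest table = getHashTable();
        table.insertOrReplaceRecord("some record"); // same assertions run against every implementation
    }
}

class ReduceHashTableTestSketch extends MutableHashTableTestBase {
    @Override
    HashTableUnderTest getHashTable() {
        // would construct a ReduceHashTable here and adapt it to the interface
        return record -> { };
    }
}

class CompactingHashTableTestSketch extends MutableHashTableTestBase {
    @Override
    HashTableUnderTest getHashTable() {
        // would construct a CompactingHashTable here and adapt it to the interface
        return record -> { };
    }
}
{code}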


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301073#comment-15301073
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668757
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver extends ChainedDriver {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer serializer;
+
+   private TypeComparator comparator;
+
+   private ReduceFunction reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable table;
+
+   private List memory;
+
+   private volatile boolean canceled;
+
+   // 

+
+   @Override
+   public Function getStub() {
+   return reducer;
+   }
+
+   @Override
+   public String getTaskName() {
+   return taskName;
+   }
+
+   @Override
+   public void setup(AbstractInvokable parent) {
+   this.parent = parent;
+   canceled = false;
+
+   strategy = config.getDriverStrategy();
+
+   reducer = BatchTask.instantiateUserCode(config, 
userCodeClassLoader, ReduceFunction.class);
+   FunctionUtils.setFunctionRuntimeContext(reducer, 
getUdfRuntimeContext());
+   }
+
+   @Override
+   public void openTask() throws Exception {
+   // open the stub first
+   final Configuration stubConfig = config.getStubParameters();
+   BatchTask.openUserCode(reducer, stubConfig);
+
+   // instantiate the serializer / comparator
+   serializer = config.getInputSerializer(0, 
userCodeClassLoader).getSerializer();
+   comparator = config.getDriverComparator(0, 
userCodeClassLoader).createComparator();
+
+   MemoryManager 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301061#comment-15301061
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64667955
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
 ---
@@ -45,7 +45,12 @@ public RandomAccessInputView(ArrayList 
segments, int segmentSize)
{
this(segments, segmentSize, segmentSize);
}
-   
+
+   public RandomAccessInputView(ArrayList segments, int 
segmentSize, boolean dummy)
--- End diff --

This constructor is called twice from the constructor of the 
`ReduceHashTable`. Once to initialize the input view of the `RecordArea` and 
once for the `StagingArea`. Both areas will need at least one buffer. Maybe I 
am wrong, but if we give both one initial buffer, we do not need this 
additional constructor with the `dummy` flag. 

As a second alternative to the constructor with the `dummy` flag, we could 
also implement a constructor without the `ArrayList`, create it 
in the constructor, and add a `getSegmentList()` method to access the created 
list.

What do you think, @ggevay?
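
A rough sketch of that second alternative, just to make the shape concrete (constructor body and the paging logic of the real `RandomAccessInputView` are omitted; only the "create the list internally and expose it" idea is shown):

{code:java}
import java.util.ArrayList;

import org.apache.flink.core.memory.MemorySegment;

class RandomAccessInputViewSketch {

    private final ArrayList<MemorySegment> segments;
    private final int segmentSize;

    RandomAccessInputViewSketch(int segmentSize) {
        this.segments = new ArrayList<>(); // created here instead of being passed in
        this.segmentSize = segmentSize;
        // ... the real class would also set up its paging state here ...
    }

    ArrayList<MemorySegment> getSegmentList() {
        return segments; // the caller (e.g. the hash table's record/staging area) fills this
    }
}
{code}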


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301070#comment-15301070
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668630
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver extends ChainedDriver {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer serializer;
+
+   private TypeComparator comparator;
+
+   private ReduceFunction reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable table;
+
+   private List memory;
+
+   private volatile boolean canceled;
+
+   // 

+
+   @Override
+   public Function getStub() {
+   return reducer;
+   }
+
+   @Override
+   public String getTaskName() {
+   return taskName;
+   }
+
+   @Override
+   public void setup(AbstractInvokable parent) {
+   this.parent = parent;
+   canceled = false;
+
+   strategy = config.getDriverStrategy();
+
+   reducer = BatchTask.instantiateUserCode(config, 
userCodeClassLoader, ReduceFunction.class);
+   FunctionUtils.setFunctionRuntimeContext(reducer, 
getUdfRuntimeContext());
+   }
+
+   @Override
+   public void openTask() throws Exception {
+   // open the stub first
+   final Configuration stubConfig = config.getStubParameters();
+   BatchTask.openUserCode(reducer, stubConfig);
+
+   // instantiate the serializer / comparator
+   serializer = config.getInputSerializer(0, 
userCodeClassLoader).getSerializer();
+   comparator = config.getDriverComparator(0, 
userCodeClassLoader).createComparator();
+
+   MemoryManager 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301069#comment-15301069
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668546
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver extends ChainedDriver {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer serializer;
+
+   private TypeComparator comparator;
+
+   private ReduceFunction reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable table;
+
+   private List memory;
+
+   private volatile boolean canceled;
--- End diff --

Can we use `running` instead of `canceled` for the sake of consistency?


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301065#comment-15301065
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64668428
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

Can we keep the name of the flag as it is for now? All other drivers use a 
`running` flag as well. 
I would rather open a separate JIRA to fix the name in all drivers.

I don't think that `volatile` is required here. Accesses to primitive 
`boolean` are always atomic.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301046#comment-15301046
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64667048
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 ---
@@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) 
{
 * @see DataSet
 */
public ReduceOperator reduce(ReduceFunction reducer) {
+   return reduce(Utils.getCallLocationName(), reducer, 
CombineHint.OPTIMIZER_CHOOSES);
+   }
+
+   /**
+* Applies a Reduce transformation on a grouped {@link DataSet}.
+* For each group, the transformation consecutively calls a {@link 
org.apache.flink.api.common.functions.RichReduceFunction}
+*   until only a single element for each group remains.
+* A ReduceFunction combines two elements into one new element of the 
same type.
+*
+* @param reducer The ReduceFunction that is applied on each group of 
the DataSet.
+* @param strategy The strategy that should be used to execute the 
combine phase of the reduce.
+* If {@code null} is given, then the optimizer will 
pick the strategy.
+* @return A ReduceOperator that represents the reduced DataSet.
+*
+* @see org.apache.flink.api.common.functions.RichReduceFunction
+* @see ReduceOperator
+* @see DataSet
+*/
+   public ReduceOperator reduce(ReduceFunction reducer, CombineHint 
strategy) {
--- End diff --

The `ReduceOperator.setCombineHint(CombineHint)` method should be annotated 
with `@PublicEvolving`.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301050#comment-15301050
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64667132
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

The overloaded methods can be removed if `ReduceOperator` has a 
`setCombineHint()` method


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301042#comment-15301042
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r64666960
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 ---
@@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) 
{
 * @see DataSet
 */
public ReduceOperator reduce(ReduceFunction reducer) {
+   return reduce(Utils.getCallLocationName(), reducer, 
CombineHint.OPTIMIZER_CHOOSES);
+   }
+
+   /**
+* Applies a Reduce transformation on a grouped {@link DataSet}.
+* For each group, the transformation consecutively calls a {@link 
org.apache.flink.api.common.functions.RichReduceFunction}
+*   until only a single element for each group remains.
+* A ReduceFunction combines two elements into one new element of the 
same type.
+*
+* @param reducer The ReduceFunction that is applied on each group of 
the DataSet.
+* @param strategy The strategy that should be used to execute the 
combine phase of the reduce.
+* If {@code null} is given, then the optimizer will 
pick the strategy.
+* @return A ReduceOperator that represents the reduced DataSet.
+*
+* @see org.apache.flink.api.common.functions.RichReduceFunction
+* @see ReduceOperator
+* @see DataSet
+*/
+   public ReduceOperator reduce(ReduceFunction reducer, CombineHint 
strategy) {
--- End diff --

I don't think we should overload all reduce (or reduce-based) methods in 
`UnsortedGrouping` but rather add a method `setCombineHint(CombineHint)` to 
`ReduceOperator`. That way the API doesn't grow too much. This is also more 
modular once we add a hash-based strategy for the final Reduce.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-05-25 Thread B Wyatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

B Wyatt updated FLINK-3974:
---
Attachment: bwyatt-FLINK3974.1.patch

I was able to fix this for my topology and the test case by making a very small 
change to {{ChainingOutput.collect()}} in OperatorChain.java.

When object reuse is disabled {{CopyingChainingOutput.collect()}} is used to 
make a full copy of the {{StreamRecord}}:
{code:java}
StreamRecord copy = record.copy(serializer.copy(record.getValue()));
{code}

If object reuse is enabled, {{ChainingOutput.collect()}} is used instead.  
As the internal code will mutate {{StreamRecord<>}} downstream, I thought the 
best "good-faith" effort to support object reuse was to copy the 
{{StreamRecord<>}} without copying the value, sending this lighter copy 
downstream instead of the original {{StreamRecord<>}}:
{code:java}
StreamRecord shallowCopy = record.copy(record.getValue());
{code}



See attached bwyatt-FLINK3974.1.patch
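
For readers without the patch file at hand, a simplified sketch of where that one-line change sits (the real ChainingOutput in OperatorChain.java has more wiring and error handling, and the attached patch may differ in details):

{code:java}
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

class ChainingOutputSketch<T> {

    private final OneInputStreamOperator<T, ?> operator; // the chained downstream operator

    ChainingOutputSketch(OneInputStreamOperator<T, ?> operator) {
        this.operator = operator;
    }

    public void collect(StreamRecord<T> record) {
        try {
            // Copy the StreamRecord wrapper but not the value: a later replace() downstream
            // then mutates the shallow copy, not the record handed to the other outputs.
            StreamRecord<T> shallowCopy = record.copy(record.getValue());
            operator.processElement(shallowCopy);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
{code}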

> enableObjectReuse fails when an operator chains to multiple downstream 
> operators
> 
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.3
>Reporter: B Wyatt
> Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream input = ...
> input
> .map(MapFunction...)
> .addSink(...);
> input
> .map(MapFunction...)
> ​.addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of 
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output.collect}} 
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which 
> mutates the value stored in the StreamRecord<>.  
> As a result, when the {{Output.collect}} call passes the 
> {{StreamRecord<A>}} to the second map operation, it is actually a 
> {{StreamRecord<B>}} and behaves as if the two map operations were serial 
> instead of parallel.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3970) How to deal with "resouce isolation" problem

2016-05-25 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300746#comment-15300746
 ] 

Stephan Ewen commented on FLINK-3970:
-

I think the best way to completely isolate jobs from each other (CPU, 
network, etc.) is to run them in different processes.
The best way to do that is to start a Flink cluster per job.

That is quite simple with YARN: Submitting the job in a per-job-cluster mode 
brings up the Flink cluster on YARN, runs the job, and shuts the cluster down.
You can do something similar in other containerized environments.

Hope that solution works for you.

> How to deal with "resouce isolation" problem
> 
>
> Key: FLINK-3970
> URL: https://issues.apache.org/jira/browse/FLINK-3970
> Project: Flink
>  Issue Type: Wish
>Reporter: ZhengBowen
>
> For example, when a 'big query' and a 'small query' are executed at the same time, you 
> need to isolate the 'big query' from the 'small query' to prevent the 'big query' from 
> exhausting resources (including I/O, memory, and network) so that the 'small query' can 
> complete quickly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3806] [gelly] Revert use of DataSet.cou...

2016-05-25 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2036

[FLINK-3806] [gelly] Revert use of DataSet.count()

This leaves the Graph API unchanged but GatherSumApplyIteration and 
ScatterGatherIteration now use broadcast variables to share the 
numberOfVertices count. The PageRanks have been updated to use this feature.
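
As background, a small, self-contained example of the broadcast-variable pattern this refers to (illustrative only, not the Gelly code in the PR; the variable name "numberOfVertices" and the counting job below are assumptions):

{code:java}
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class BroadcastCountExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> vertices = env.generateSequence(1, 100);

        // Compute the count as a one-element DataSet instead of calling DataSet.count(),
        // so no separate job is triggered and the input is not materialized twice.
        DataSet<Long> vertexCount = vertices
            .map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) { return 1L; }
            })
            .reduce(new ReduceFunction<Long>() {
                @Override
                public Long reduce(Long a, Long b) { return a + b; }
            });

        DataSet<String> labeled = vertices
            .map(new RichMapFunction<Long, String>() {
                private long numberOfVertices;

                @Override
                public void open(Configuration parameters) {
                    // The count arrives as a broadcast set at runtime.
                    List<Long> bc = getRuntimeContext().getBroadcastVariable("numberOfVertices");
                    numberOfVertices = bc.get(0);
                }

                @Override
                public String map(Long v) {
                    return v + " of " + numberOfVertices;
                }
            })
            .withBroadcastSet(vertexCount, "numberOfVertices");

        labeled.print();
    }
}
{code}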

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
3806_revert_use_of_dataset_count

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2036.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2036


commit c56cbf997ceaec7e54f5d0847e960337ca1d84ba
Author: Greg Hogan 
Date:   2016-05-25T15:06:01Z

[FLINK-3806] [gelly] Revert use of DataSet.count()




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3975) docs build script isn't serving the preview on the correct base url

2016-05-25 Thread Dyana Rose (JIRA)
Dyana Rose created FLINK-3975:
-

 Summary: docs build script isn't serving the preview on the 
correct base url
 Key: FLINK-3975
 URL: https://issues.apache.org/jira/browse/FLINK-3975
 Project: Flink
  Issue Type: Bug
  Components: Documentation
 Environment: centos 7.2, jekyll 3.1.6, ruby 2.3.1
Reporter: Dyana Rose
Priority: Trivial


when running the documentation build script as:
{{./build_docs.sh -p}} the docs are built using the correctly overridden 
baseurl, but they are then served from the wrong URL, making it a wee bit more 
difficult than intended to review changes locally.

The following is the output from running the script:
{quote}
Configuration file: _config.yml
Configuration file: _local_preview_conf.yml
Source: /vagrant/flink/docs
   Destination: /vagrant/flink/docs/target
 Incremental build: disabled. Enable with --incremental
  Generating...
/home/vagrant/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/pygments.rb-0.6.3/lib/pygments/popen.rb:57:
 warning: Insecure world writable dir /opt/scala-2.11.8/bin in PATH, mode 040777
done in 16.736 seconds.
 Auto-regeneration: enabled for '/vagrant/flink/docs'
Configuration file: /vagrant/flink/docs/_config.yml
Server address: 
http://0.0.0.0:4000//ci.apache.org/projects/flink/flink-docs-master/
  Server running... press ctrl-c to stop.
{quote}

As you see it looks to be using both config files to build, but only the 
default config file to serve.

This can be fixed by just removing the {{_local_preview_conf.yml}} file and 
instead specifying the baseurl as an option to the serve command, so it becomes 
{{serve --config _config.yml --baseurl= --watch}}. This gives the following output:
{quote}
Configuration file: _config.yml
Source: /vagrant/flink/docs
   Destination: /vagrant/flink/docs/target
 Incremental build: disabled. Enable with --incremental
  Generating...
/home/vagrant/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/pygments.rb-0.6.3/lib/pygments/popen.rb:57:
 warning: Insecure world writable dir /opt/scala-2.11.8/bin in PATH, mode 040777
done in 15.928 seconds.
 Auto-regeneration: enabled for '/vagrant/flink/docs'
Configuration file: /vagrant/flink/docs/_config.yml
Server address: http://0.0.0.0:4000/
  Server running... press ctrl-c to stop.
{quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3801) Upgrade Joda-Time library to 2.9.3

2016-05-25 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3801:
--
Description: 
Currently Joda-Time 2.5 is used, which is very old.


We should upgrade to 2.9.3

  was:
Currently Joda-Time 2.5 is used, which is very old.

We should upgrade to 2.9.3


> Upgrade Joda-Time library to 2.9.3
> --
>
> Key: FLINK-3801
> URL: https://issues.apache.org/jira/browse/FLINK-3801
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>
> Currently Joda-Time 2.5 is used, which is very old.
> We should upgrade to 2.9.3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3806) Revert use of DataSet.count() in Gelly

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300691#comment-15300691
 ] 

ASF GitHub Bot commented on FLINK-3806:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2036

[FLINK-3806] [gelly] Revert use of DataSet.count()

This leaves the Graph API unchanged but GatherSumApplyIteration and 
ScatterGatherIteration now use broadcast variables to share the 
numberOfVertices count. The PageRanks have been updated to use this feature.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
3806_revert_use_of_dataset_count

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2036.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2036


commit c56cbf997ceaec7e54f5d0847e960337ca1d84ba
Author: Greg Hogan 
Date:   2016-05-25T15:06:01Z

[FLINK-3806] [gelly] Revert use of DataSet.count()




> Revert use of DataSet.count() in Gelly
> --
>
> Key: FLINK-3806
> URL: https://issues.apache.org/jira/browse/FLINK-3806
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Critical
> Fix For: 1.1.0
>
>
> FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The 
> former returns a {{DataSet}} while the latter executes a job to return a Java 
> value.
> {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and 
> {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and 
> {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the 
> user does not pass the number of vertices as a parameter.
> As noted in FLINK-1632, this does make the code simpler but if my 
> understanding is correct will materialize the Graph twice. The Graph will 
> need to be reread from input, regenerated, or recomputed by preceding 
> algorithms.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221670393
  
And @th0br0 is working on CombineHint.NONE.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300638#comment-15300638
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221670393
  
And @th0br0 is working on CombineHint.NONE.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-05-25 Thread B Wyatt (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300642#comment-15300642
 ] 

B Wyatt commented on FLINK-3974:


After attempting to produce a minimal repro, I realized that there needs to be 
an operation between the source and the parallel maps:

{code:java}
DataStream input = ...

input = input
.map(MapFunction...);

input
.map(MapFunction...)
.addSink(...);

input
.map(MapFunction...)
​.addSink(...);
{code}

see attached repro case


> enableObjectReuse fails when an operator chains to multiple downstream 
> operators
> 
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.3
>Reporter: B Wyatt
> Attachments: ReproFLINK3974.java
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream input = ...
> input
> .map(MapFunction...)
> .addSink(...);
> input
> .map(MapFunction...)
> ​.addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of 
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output.collect}} 
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which 
> mutates the value stored in the StreamRecord<>.  
> As a result, when the {{Output.collect}} call passes the 
> {{StreamRecord<A>}} to the second map operation, it is actually a 
> {{StreamRecord<B>}} and behaves as if the two map operations were serial 
> instead of parallel.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-05-25 Thread B Wyatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

B Wyatt updated FLINK-3974:
---
Comment: was deleted

(was: Simple repro program.)

> enableObjectReuse fails when an operator chains to multiple downstream 
> operators
> 
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.3
>Reporter: B Wyatt
> Attachments: ReproFLINK3974.java
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream input = ...
> input
> .map(MapFunction...)
> .addSink(...);
> input
> .map(MapFunction...)
> ​.addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of 
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output.collect}} 
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which 
> mutates the value stored in the StreamRecord<>.  
> As a result, when the {{Output.collect}} call passes the 
> {{StreamRecord<A>}} to the second map operation, it is actually a 
> {{StreamRecord<B>}} and behaves as if the two map operations were serial 
> instead of parallel.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-05-25 Thread B Wyatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

B Wyatt updated FLINK-3974:
---
Attachment: ReproFLINK3974.java

Simple repro program.

> enableObjectReuse fails when an operator chains to multiple downstream 
> operators
> 
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.3
>Reporter: B Wyatt
> Attachments: ReproFLINK3974.java
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
> .map(MapFunction...)
> .addSink(...);
> input
> .map(MapFunction...)
> .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of 
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output.collect}} 
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which 
> mutates the value stored in the StreamRecord<>.
> As a result, when the {{Output.collect}} call passes the 
> {{StreamRecord<A>}} to the second map operation it is actually a 
> {{StreamRecord<B>}} and behaves as if the two map operations were serial 
> instead of parallel.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300456#comment-15300456
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221644628
  
We talked about this yesterday with @fhueske at the Flink meetup, and the 
conclusion was that we will get it into 1.1. @fhueske has several comments that 
he will write here in a few days, and then I will make the (hopefully) final 
adjustments.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This makes it possible to hold the pre-aggregated value in a 
> hash table and update it with each function call. 
> The hash-based strategy requires a special implementation of an in-memory hash 
> table. The hash table should support in-place updates of elements (if the 
> new value has the same size as the old value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.
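
As an illustration of the strategy described above, here is a minimal, hypothetical sketch of a hash-based combiner that keeps one pre-aggregated value per key and applies the {{ReduceFunction}} incrementally; the class name {{HashCombiner}} is invented for this example and the memory-aware eviction is omitted.

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.ReduceFunction;

// Hypothetical sketch only: keeps one pre-aggregated value per key and
// updates it with every incoming record, as described in the issue.
public class HashCombiner<K, T> {

	private final Map<K, T> table = new HashMap<>();
	private final ReduceFunction<T> reducer;

	public HashCombiner(ReduceFunction<T> reducer) {
		this.reducer = reducer;
	}

	public void combine(K key, T value) throws Exception {
		T current = table.get(key);
		// first value for this key is stored as-is, later values are reduced in
		table.put(key, current == null ? value : reducer.reduce(current, value));
	}

	// The real operator would evict and emit these entries when memory runs out.
	public Map<K, T> results() {
		return table;
	}
}
{code}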



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221644628
  
We talked about this yesterday with @fhueske at the Flink meetup, and the 
conclusion was that we will get it into 1.1. @fhueske has several comments that 
he will write here in a few days, and then I will make the (hopefully) final 
adjustments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300453#comment-15300453
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221643813
  
What is the status of this PR? Can we get this in and tested for the 1.1 
release?


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This makes it possible to hold the pre-aggregated value in a 
> hash table and update it with each function call. 
> The hash-based strategy requires a special implementation of an in-memory hash 
> table. The hash table should support in-place updates of elements (if the 
> new value has the same size as the old value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...

2016-05-25 Thread greghogan
Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1517#issuecomment-221643813
  
What is the status of this PR? Can we get this in and tested for the 1.1 
release?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-25 Thread Vasia Kalavri (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vasia Kalavri resolved FLINK-2044.
--
   Resolution: Implemented
Fix Version/s: 1.1.0

> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
> Fix For: 1.1.0
>
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300415#comment-15300415
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1956


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
> Fix For: 1.1.0
>
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1956


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300409#comment-15300409
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-221637781
  
Thanks @gallenvara! All tests pass now. I will merge :)


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-25 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-221637781
  
Thanks @gallenvara! All tests pass now. I will merge :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-05-25 Thread B Wyatt (JIRA)
B Wyatt created FLINK-3974:
--

 Summary: enableObjectReuse fails when an operator chains to 
multiple downstream operators
 Key: FLINK-3974
 URL: https://issues.apache.org/jira/browse/FLINK-3974
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.0.3
Reporter: B Wyatt


Given a topology that looks like this:

{code:java}
DataStream<A> input = ...
input
.map(MapFunction...)
.addSink(...);

input
.map(MapFunction...)
.addSink(...);
{code}

enableObjectReuse() will cause an exception in the form of 
{{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.

It looks like the input operator calls {{Output.collect}} 
which attempts to loop over the downstream operators and process them.

However, the first map operation will call {{StreamRecord<>.replace}} which 
mutates the value stored in the StreamRecord<>.

As a result, when the {{Output.collect}} call passes the 
{{StreamRecord<A>}} to the second map operation it is actually a 
{{StreamRecord<B>}} and behaves as if the two map operations were serial 
instead of parallel.
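
For reference, a self-contained sketch of this kind of topology; the element types {{A}} and {{B}} and the map functions are invented here for illustration, and {{print()}} stands in for the sinks:

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseReproSketch {

	// Hypothetical element types; only their identities matter for the reported
	// "B cannot be cast to A" ClassCastException.
	public static class A {}
	public static class B {}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().enableObjectReuse();

		DataStream<A> input = env.fromElements(new A(), new A());

		// Two sibling consumers of the same input: with object reuse enabled, the
		// StreamRecord mutated by the first map is also handed to the second map.
		input.map(new MapFunction<A, B>() {
			@Override
			public B map(A value) {
				return new B();
			}
		}).print();

		input.map(new MapFunction<A, B>() {
			@Override
			public B map(A value) {
				return new B();
			}
		}).print();

		env.execute("FLINK-3974 repro sketch");
	}
}
{code}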





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3973) Table API documentation is "hidden" in Programming Guide menu list

2016-05-25 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-3973:


 Summary: Table API documentation is "hidden" in Programming Guide 
menu list
 Key: FLINK-3973
 URL: https://issues.apache.org/jira/browse/FLINK-3973
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.1.0
Reporter: Fabian Hueske
 Fix For: 1.1.0


The Table API / SQL documentation is hard to find in the drop-down list of the 
"Programming Guide" menu entry.

We should either move it into the "Libraries" menu or move it up in the 
"Programming Guide" entry and make it bold like the other top entries.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3714) Add Support for "Allowed Lateness"

2016-05-25 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-3714:
-

Assignee: Kostas Kloudas

> Add Support for "Allowed Lateness"
> --
>
> Key: FLINK-3714
> URL: https://issues.apache.org/jira/browse/FLINK-3714
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>
> As mentioned in 
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#
>  we should add support for an allowed lateness setting.
> This includes several things:
>  - API for setting allowed lateness
>  - Dropping of late elements 
>  - Garbage collection of event-time windows 
> Lateness only makes sense for event-time windows. So we also have to figure 
> out what the API for this should look like and especially what should happen 
> with the "stream-time characteristic" switch. For example in this:
> {code}
> env.setStreamTimeCharacteristic(ProcessingTime)
> ...
> DataStream in = ...
> result = in
>   .keyBy()
>   .timeWindow()
>   .allowedLateness()
>   .apply()
> {code}
> I think the setting can be silently ignored when doing processing-time 
> windowing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-1673) Colocate Flink Kafka consumer

2016-05-25 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-1673.
---
Resolution: Won't Fix

> Colocate Flink Kafka consumer
> -
>
> Key: FLINK-1673
> URL: https://issues.apache.org/jira/browse/FLINK-1673
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Márton Balassi
>Assignee: Tamás Krutki
>
> Kafka exposes the location of the replicas. To make the Flink Kafka Consumers 
> more effective we could do a best effort colocation for the sources with the 
> Kafka brokers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300238#comment-15300238
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-221616393
  
@vasia In the Travis failure reports, the failures are related to the 
flink-yarn-tests module. I have merged the latest code from master and rebased 
all my commits in this PR. The `HITSAlgorithmITCase` runs successfully 
locally.


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1673) Colocate Flink Kafka consumer

2016-05-25 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300236#comment-15300236
 ] 

Robert Metzger commented on FLINK-1673:
---

I can not remember any user asking for this feature. I think we can close it as 
won't fix for now.

> Colocate Flink Kafka consumer
> -
>
> Key: FLINK-1673
> URL: https://issues.apache.org/jira/browse/FLINK-1673
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Márton Balassi
>Assignee: Tamás Krutki
>
> Kafka exposes the location of the replicas. To make the Flink Kafka Consumers 
> more effective we could do a best effort colocation for the sources with the 
> Kafka brokers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-25 Thread gallenvara
Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-221616393
  
@vasia In the Travis failure reports, the failures are related to the 
flink-yarn-tests module. I have merged the latest code from master and rebased 
all my commits in this PR. The `HITSAlgorithmITCase` runs successfully 
locally.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3945) Degree annotation for directed graphs

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300223#comment-15300223
 ] 

ASF GitHub Bot commented on FLINK-3945:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2021#discussion_r64594890
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of both the source and target vertices.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeDegreesPair<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> {
+
+   private int parallelism;
+
+   public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
+   this.parallelism = parallelism;
+
+   return this;
+   }
+
+   @Override
+   public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input)
+   throws Exception {
+   // s, t, d(s)
+   DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input
+   .run(new EdgeSourceDegrees<K, VV, EV>()
+   .setParallelism(parallelism));
+
+   // t, d(t)
+   DataSet<Vertex<K, Degrees>> vertexDegrees = input
+   .run(new VertexDegrees<K, VV, EV>()
+   .setParallelism(parallelism));
+
+   // s, t, (d(s), d(t))
+   return edgeSourceDegrees
+   .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
--- End diff --

I'm amenable to adding this configuration if we can show noticeably 
improved performance. Choosing a bad `JoinHint` will result in significantly 
degraded performance for most graphs in addition to adding to the user's 
cognitive load.


> Degree annotation for directed graphs
> -
>
> Key: FLINK-3945
> URL: https://issues.apache.org/jira/browse/FLINK-3945
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.1.0
>
>
> There is a third degree count for vertices in directed graphs which is the 
> distinct count of out- and in-neighbors. This also adds edge annotation of 
> the vertex degrees for directed graphs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3945] [gelly] Degree annotation for dir...

2016-05-25 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2021#discussion_r64594890
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of both the source and target vertices.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeDegreesPair<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> {
+
+   private int parallelism;
+
+   public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
+   this.parallelism = parallelism;
+
+   return this;
+   }
+
+   @Override
+   public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input)
+   throws Exception {
+   // s, t, d(s)
+   DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input
+   .run(new EdgeSourceDegrees<K, VV, EV>()
+   .setParallelism(parallelism));
+
+   // t, d(t)
+   DataSet<Vertex<K, Degrees>> vertexDegrees = input
+   .run(new VertexDegrees<K, VV, EV>()
+   .setParallelism(parallelism));
+
+   // s, t, (d(s), d(t))
+   return edgeSourceDegrees
+   .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
--- End diff --

I'm amenable to adding this configuration if we can show noticeably 
improved performance. Choosing a bad `JoinHint` will result in significantly 
degraded performance for most graphs in addition to adding to the user's 
cognitive load.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3972) Subclasses of ResourceID may not be serializable

2016-05-25 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-3972:
-

 Summary: Subclasses of ResourceID may not be serializable
 Key: FLINK-3972
 URL: https://issues.apache.org/jira/browse/FLINK-3972
 Project: Flink
  Issue Type: Bug
  Components: ResourceManager
Affects Versions: 1.1.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Priority: Minor
 Fix For: 1.1.0


WorkerTypes are currently subclasses of ResourceID. ResourceID has to be 
Serializable but its subclasses don't have to be. This may lead to problems when 
these subclasses are used as ResourceIDs, i.e. serialization may fail with 
NotSerializableExceptions. Currently, subclasses are never sent over the wire, 
but they might be in the future.

Instead of relying on subclasses of ResourceID for the WorkerTypes, we can let 
them implement an interface to retrieve the ResourceID of a WorkerType.
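
A minimal sketch of that direction, with the interface and worker class names invented here for illustration:

{code:java}
import java.io.Serializable;

// Sketch only: the real ResourceID lives in Flink's runtime; this stand-in just
// shows the shape of the proposal.
final class ResourceID implements Serializable {
	private final String id;

	ResourceID(String id) {
		this.id = id;
	}

	public String getId() {
		return id;
	}
}

// Worker types no longer extend ResourceID; they merely expose one.
interface ResourceIDRetrievable {
	ResourceID getResourceID();
}

class SketchWorkerNode implements ResourceIDRetrievable {
	private final ResourceID resourceID;

	SketchWorkerNode(ResourceID resourceID) {
		this.resourceID = resourceID;
	}

	@Override
	public ResourceID getResourceID() {
		return resourceID;
	}
}
{code}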



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3806) Revert use of DataSet.count() in Gelly

2016-05-25 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3806:
--
Fix Version/s: 1.1.0

> Revert use of DataSet.count() in Gelly
> --
>
> Key: FLINK-3806
> URL: https://issues.apache.org/jira/browse/FLINK-3806
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Critical
> Fix For: 1.1.0
>
>
> FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The 
> former returns a {{DataSet}} while the latter executes a job to return a Java 
> value.
> {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and 
> {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and 
> {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the 
> user does not pass the number of vertices as a parameter.
> As noted in FLINK-1632, this does make the code simpler, but if my 
> understanding is correct it will materialize the Graph twice. The Graph will 
> need to be reread from input, regenerated, or recomputed by preceding 
> algorithms.
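
For context, a hedged sketch of the "count as a DataSet" approach referred to above; the method name and placement are illustrative, not the actual GraphUtils API. The count stays deferred and is wired into the same plan instead of triggering a separate job.

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;

// Illustrative sketch: returns the element count as a deferred DataSet<Long>
// instead of eagerly executing a job the way DataSetUtils.count() does.
public class DeferredCount {

	public static <T> DataSet<Long> count(DataSet<T> input) {
		return input
			.map(new MapFunction<T, Long>() {
				@Override
				public Long map(T value) {
					return 1L;
				}
			})
			.reduce(new ReduceFunction<Long>() {
				@Override
				public Long reduce(Long a, Long b) {
					return a + b;
				}
			});
	}
}
{code}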



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-25 Thread gallenvara
Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-221606502
  
@vasia wait a few minutes and I will take a look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3945] [gelly] Degree annotation for dir...

2016-05-25 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2021#discussion_r64589442
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of both the source and target vertices.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeDegreesPair<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> {
+
+   private int parallelism;
+
+   public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
+   this.parallelism = parallelism;
+
+   return this;
+   }
+
+   @Override
+   public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input)
+   throws Exception {
+   // s, t, d(s)
+   DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input
+   .run(new EdgeSourceDegrees<K, VV, EV>()
+   .setParallelism(parallelism));
+
+   // t, d(t)
+   DataSet<Vertex<K, Degrees>> vertexDegrees = input
+   .run(new VertexDegrees<K, VV, EV>()
+   .setParallelism(parallelism));
+
+   // s, t, (d(s), d(t))
+   return edgeSourceDegrees
+   .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
--- End diff --

I agree that we should automatically set the parameters when possible. So, 
I would keep this as the default setting. However, I think we should allow 
users that _do_ have knowledge about the input graph to either let the 
optimizer decide or override the default setting.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3945) Degree annotation for directed graphs

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300183#comment-15300183
 ] 

ASF GitHub Bot commented on FLINK-3945:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/2021#discussion_r64589442
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of both the source and target vertices.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeDegreesPair<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> {
+
+   private int parallelism;
+
+   public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
+   this.parallelism = parallelism;
+
+   return this;
+   }
+
+   @Override
+   public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input)
+   throws Exception {
+   // s, t, d(s)
+   DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input
+   .run(new EdgeSourceDegrees<K, VV, EV>()
+   .setParallelism(parallelism));
+
+   // t, d(t)
+   DataSet<Vertex<K, Degrees>> vertexDegrees = input
+   .run(new VertexDegrees<K, VV, EV>()
+   .setParallelism(parallelism));
+
+   // s, t, (d(s), d(t))
+   return edgeSourceDegrees
+   .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
--- End diff --

I agree that we should automatically set the parameters when possible. So, 
I would keep this as the default setting. However, I think we should allow 
users that _do_ have knowledge about the input graph to either let the 
optimizer decide or override the default setting.
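
One possible shape for such an override, sketched with an invented setter and class name; the default mirrors the hint used in the diff above:

{code:java}
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;

// Sketch only: an optional, user-overridable join hint for the degree-annotation
// algorithms; setter and class names are invented for illustration.
public class DegreeAnnotationConfig {

	private JoinHint joinHint = JoinHint.REPARTITION_HASH_SECOND;  // current default

	// e.g. JoinHint.OPTIMIZER_CHOOSES to let the optimizer decide,
	// or JoinHint.BROADCAST_HASH_SECOND when the degree set is known to be small
	public DegreeAnnotationConfig setJoinHint(JoinHint joinHint) {
		this.joinHint = joinHint;
		return this;
	}

	public JoinHint getJoinHint() {
		return joinHint;
	}
}
{code}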


> Degree annotation for directed graphs
> -
>
> Key: FLINK-3945
> URL: https://issues.apache.org/jira/browse/FLINK-3945
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.1.0
>
>
> There is a third degree count for vertices in directed graphs which is the 
> distinct count of out- and in-neighbors. This also adds edge annotation of 
> the vertex degrees for directed graphs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-25 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-221603967
  
The failures were in the `HITSAlgorithmITCase`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300178#comment-15300178
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-221603967
  
The failures were in the `HITSAlgorithmITCase`.


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300177#comment-15300177
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-221603165
  
@vasia in which module were the failures? master has been quite unstable 
recently now that tests are properly failing.


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3723) Aggregate Functions and scalar expressions shouldn't be mixed in select

2016-05-25 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300157#comment-15300157
 ] 

Fabian Hueske commented on FLINK-3723:
--

We should resolve this issue (either way) before the Flink 1.1.0 release. 
I would like to avoid breaking the API after the release.

> Aggregate Functions and scalar expressions shouldn't be mixed in select
> ---
>
> Key: FLINK-3723
> URL: https://issues.apache.org/jira/browse/FLINK-3723
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.1
>Reporter: Yijie Shen
>Priority: Critical
>
> When we type {code}select deptno, name, max(age) from dept group by 
> deptno;{code} in Calcite or Oracle, it will complain {code}Expression 'NAME' 
> is not being grouped{code} or {code}Column 'dept.name' is invalid in the 
> select list because it is not contained in either an aggregate function or 
> the GROUP BY clause.{code} because of the nondeterministic result.
> Therefore, I suggest separating the current functionality of `select` into 
> two APIs: the new `select` only handles scalar expressions, and an `agg` accepts 
> Aggregates.
> {code}
> def select(exprs: Expression*)
> def agg(aggs: Aggregation*)
> 
> tbl.groupBy('deptno)
>.agg('age.max, 'age.min)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-25 Thread greghogan
Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-221603165
  
@vasia in which module were the failures? master has been quite unstable 
recently now that tests are properly failing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-25 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-221600182
  
Hey @gallenvara,
I was about to merge this, but I see test failures after rebasing on top of 
master.
Can you please (1) rebase on top of the latest master and squash your 
commits and (2) investigate what's wrong with the tests?
Thank you!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3945) Degree annotation for directed graphs

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300173#comment-15300173
 ] 

ASF GitHub Bot commented on FLINK-3945:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2021#discussion_r64587577
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of both the source and target vertices.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeDegreesPair<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> {
+
+   private int parallelism;
+
+   public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
+   this.parallelism = parallelism;
+
+   return this;
+   }
+
+   @Override
+   public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input)
+   throws Exception {
+   // s, t, d(s)
+   DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input
+   .run(new EdgeSourceDegrees<K, VV, EV>()
+   .setParallelism(parallelism));
+
+   // t, d(t)
+   DataSet<Vertex<K, Degrees>> vertexDegrees = input
+   .run(new VertexDegrees<K, VV, EV>()
+   .setParallelism(parallelism));
+
+   // s, t, (d(s), d(t))
+   return edgeSourceDegrees
+   .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
--- End diff --

Algorithms should not expect the user to have knowledge of the input graph 
or divine the best configuration. Worst case for an undirected graph is twice 
as many vertices as edges. 


> Degree annotation for directed graphs
> -
>
> Key: FLINK-3945
> URL: https://issues.apache.org/jira/browse/FLINK-3945
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.1.0
>
>
> There is a third degree count for vertices in directed graphs which is the 
> distinct count of out- and in-neighbors. This also adds edge annotation of 
> the vertex degrees for directed graphs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2971) Add outer joins to the Table API

2016-05-25 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-2971:
-
Priority: Critical  (was: Major)

> Add outer joins to the Table API
> 
>
> Key: FLINK-2971
> URL: https://issues.apache.org/jira/browse/FLINK-2971
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>Priority: Critical
>
> Since Flink now supports outer joins, the Table API can also support left, 
> right and full outer joins.
> Given that null values are properly supported by RowSerializer etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3945] [gelly] Degree annotation for dir...

2016-05-25 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2021#discussion_r64587577
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import 
org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import 
org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of both the source and target vertices.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeDegreesPair<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> {
+
+   private int parallelism;
+
+   public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
+   this.parallelism = parallelism;
+
+   return this;
+   }
+
+   @Override
+   public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input)
+   throws Exception {
+   // s, t, d(s)
+   DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input
+   .run(new EdgeSourceDegrees<K, VV, EV>()
+   .setParallelism(parallelism));
+
+   // t, d(t)
+   DataSet<Vertex<K, Degrees>> vertexDegrees = input
+   .run(new VertexDegrees<K, VV, EV>()
+   .setParallelism(parallelism));
+
+   // s, t, (d(s), d(t))
+   return edgeSourceDegrees
+   .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
--- End diff --

Algorithms should not expect the user to have knowledge of the input graph 
or divine the best configuration. Worst case for an undirected graph is twice 
as many vertices as edges. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3589] Allow setting Operator parallelis...

2016-05-25 Thread greghogan
Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1778#issuecomment-221597445
  
`PARALLELISM_UNKNOWN` is a no-op which leaves the parallelism unchanged. 
This is a useful default for batch algorithms such as `JaccardIndex` for which 
parallelism can be optionally configured. If the user does not specify a 
parallelism we do not want to override the parallelism of the 
`ExecutionEnvironment` as would happen if we used `PARALLELISM_DEFAULT`.
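
A small sketch of the no-op contract described here; the constant value and the helper are stand-ins, not necessarily what the pull request introduces:

{code:java}
import org.apache.flink.api.java.operators.Operator;

// Illustrative sketch: a sentinel that means "leave the operator's parallelism
// unchanged", as opposed to resetting it to the ExecutionEnvironment default.
public final class ParallelismSketch {

	public static final int PARALLELISM_UNKNOWN = -2;  // stand-in value

	private int parallelism = PARALLELISM_UNKNOWN;

	public ParallelismSketch setParallelism(int parallelism) {
		this.parallelism = parallelism;
		return this;
	}

	// Only override the operator's parallelism if the user actually set a value.
	public void configure(Operator<?, ?> operator) {
		if (parallelism != PARALLELISM_UNKNOWN) {
			operator.setParallelism(parallelism);
		}
	}
}
{code}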


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3971) Aggregates handle null values incorrectly.

2016-05-25 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-3971:


 Summary: Aggregates handle null values incorrectly.
 Key: FLINK-3971
 URL: https://issues.apache.org/jira/browse/FLINK-3971
 Project: Flink
  Issue Type: Bug
  Components: Table API
Affects Versions: 1.1.0
Reporter: Fabian Hueske
Priority: Critical
 Fix For: 1.1.0


Table API and SQL aggregates are supposed to ignore null values, e.g., 
{{sum(1,2,null,4)}} is supposed to return {{7}}. 

The current implementation is correct if at least one valid value is present; 
however, it is incorrect if only null values are aggregated. {{sum(null, null, 
null)}} should return {{null}} instead of {{0}}.

Currently only the Count aggregate handles the case of null-values-only 
correctly.
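
A tiny sketch of the intended semantics (names invented; not the Table API's actual aggregate classes):

{code:java}
// Illustrative sketch: null inputs are skipped, and if no non-null value was
// ever seen the result is null rather than 0.
public class NullAwareIntSum {

	private Integer sum = null;  // stays null until the first non-null value

	public void aggregate(Integer value) {
		if (value == null) {
			return;  // ignore null values
		}
		sum = (sum == null) ? value : sum + value;
	}

	public Integer getResult() {
		return sum;  // sum(1, 2, null, 4) -> 7; sum(null, null, null) -> null
	}
}
{code}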



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3936) Add MIN / MAX aggregations function for BOOLEAN types

2016-05-25 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-3936:
-
Priority: Critical  (was: Major)

> Add MIN / MAX aggregations function for BOOLEAN types
> -
>
> Key: FLINK-3936
> URL: https://issues.apache.org/jira/browse/FLINK-3936
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.1.0
>
>
> When executing TPC-H Q4, I observed that Calcite generates a MIN aggregate on 
> Boolean literals to translate a decorrelated subquery in an {{EXISTS}} 
> predicate.
> MIN and MAX aggregates on Boolean data types are currently not supported and 
> should be added.
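
For reference, a sketch of the expected semantics with {{FALSE < TRUE}} (helper names are illustrative only): MIN over booleans behaves like a logical AND and MAX like a logical OR over the non-null values.

{code:java}
// Illustrative sketch of MIN/MAX on BOOLEAN, ignoring null inputs.
public class BooleanMinMaxSketch {

	// MIN with FALSE < TRUE is equivalent to AND over the non-null values.
	public static Boolean min(Boolean current, Boolean value) {
		if (value == null) return current;
		if (current == null) return value;
		return current && value;
	}

	// MAX with FALSE < TRUE is equivalent to OR over the non-null values.
	public static Boolean max(Boolean current, Boolean value) {
		if (value == null) return current;
		if (current == null) return value;
		return current || value;
	}
}
{code}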



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3589) Allow setting Operator parallelism to default value

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300154#comment-15300154
 ] 

ASF GitHub Bot commented on FLINK-3589:
---

Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1778#issuecomment-221597445
  
`PARALLELISM_UNKNOWN` is a no-op which leaves the parallelism unchanged. 
This is a useful default for batch algorithms such as `JaccardIndex` for which 
parallelism can be optionally configured. If the user does not specify a 
parallelism we do not want to override the parallelism of the 
`ExecutionEnvironment` as would happen if we used `PARALLELISM_DEFAULT`.


> Allow setting Operator parallelism to default value
> ---
>
> Key: FLINK-3589
> URL: https://issues.apache.org/jira/browse/FLINK-3589
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.1.0
>
>
> Users can override the parallelism for a single operator by calling 
> {{Operator.setParallelism}}, which accepts a positive value. {{Operator}} 
> uses {{-1}} to indicate default parallelism. It would be nice to name and 
> accept this default value.
> This would enable user algorithms to allow configurable parallelism while 
> still chaining operator methods.
> For example, currently:
> {code}
>   private int parallelism;
>   ...
>   public void setParallelism(int parallelism) {
>   this.parallelism = parallelism;
>   }
>   ...
>   MapOperator<Edge<K, EV>, Edge<K, EV>> newEdges = 
> edges
>   .map(new MyMapFunction())
>   .name("My map function");
>   if (parallelism > 0) {
>   newEdges.setParallelism(parallelism);
>   }
> {code}
> Could be simplified to:
> {code}
>   private int parallelism = Operator.DEFAULT_PARALLELISM;
>   ...
>   public void setParallelism(int parallelism) {
>   this.parallelism = parallelism;
>   }
>   ...
>   DataSet<Edge<K, EV>> newEdges = edges
>   .map(new MyMapFunction())
>   .setParallelism(parallelism)
>   .name("My map function");
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3859) Add BigDecimal/BigInteger support to Table API

2016-05-25 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-3859:
-
Priority: Critical  (was: Major)

> Add BigDecimal/BigInteger support to Table API
> --
>
> Key: FLINK-3859
> URL: https://issues.apache.org/jira/browse/FLINK-3859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Since FLINK-3786 has been solved, we can now start integrating 
> BigDecimal/BigInteger into the Table API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300162#comment-15300162
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-221600182
  
Hey @gallenvara,
I was about to merge this, but I see test failures after rebasing on top of 
master.
Can you please (1) rebase on top of the latest master and squash your 
commits and (2) investigate what's wrong with the tests?
Thank you!


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3723) Aggregate Functions and scalar expressions shouldn't be mixed in select

2016-05-25 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-3723:
-
Priority: Critical  (was: Major)

> Aggregate Functions and scalar expressions shouldn't be mixed in select
> ---
>
> Key: FLINK-3723
> URL: https://issues.apache.org/jira/browse/FLINK-3723
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.1
>Reporter: Yijie Shen
>Priority: Critical
>
> When we type {code}select deptno, name, max(age) from dept group by 
> deptno;{code} in Calcite or Oracle, it will complain {code}Expression 'NAME' 
> is not being grouped{code} or {code}Column 'dept.name' is invalid in the 
> select list because it is not contained in either an aggregate function or 
> the GROUP BY clause.{code} because of the nondeterministic result.
> Therefore, I suggest separating the current functionality of `select` into 
> two APIs: the new `select` only handles scalar expressions, and an `agg` accepts 
> Aggregates.
> {code}
> def select(exprs: Expression*)
> def agg(aggs: Aggregation*)
> 
> tbl.groupBy('deptno)
>.agg('age.max, 'age.min)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3152) Support all comparisons for Date type

2016-05-25 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-3152:
-
Priority: Critical  (was: Major)

> Support all comparisons for Date type
> -
>
> Key: FLINK-3152
> URL: https://issues.apache.org/jira/browse/FLINK-3152
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Timo Walther
>Priority: Critical
>
> Currently, the Table API does not support comparisons like "DATE < DATE", 
> "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3936) Add MIN / MAX aggregations function for BOOLEAN types

2016-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300131#comment-15300131
 ] 

ASF GitHub Bot commented on FLINK-3936:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/2035

[FLINK-3936] [tableAPI] Add MIN/MAX aggregation for Boolean.

Adds support for min/max aggregations on boolean columns.

Unit tests for new aggregation included.

- [X] General
- [X] Documentation
- [X] Tests & Build



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableBoolMinMax

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2035.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2035


commit 1e0430bcce0633d48a5ef568624d3934e8723913
Author: Fabian Hueske 
Date:   2016-05-22T13:46:06Z

[FLINK-3936] [tableAPI] Add MIN/MAX aggregation for Boolean.




> Add MIN / MAX aggregations function for BOOLEAN types
> -
>
> Key: FLINK-3936
> URL: https://issues.apache.org/jira/browse/FLINK-3936
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
> Fix For: 1.1.0
>
>
> When executing TPC-H Q4, I observed that Calcite generates a MIN aggregate on 
> Boolean literals to translate a decorrelated subquery in an {{EXISTS}} 
> predicate.
> MIN and MAX aggregates on Boolean data types are currently not supported and 
> should be added.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

