[jira] [Created] (FLINK-3976) Alias fields of physical plan node to avoid name collision with Calcite protected fields
Yijie Shen created FLINK-3976: - Summary: Alias fields of physical plan node to avoid name collision with Calcite protected fields Key: FLINK-3976 URL: https://issues.apache.org/jira/browse/FLINK-3976 Project: Flink Issue Type: Improvement Components: Table API Reporter: Yijie Shen Priority: Minor The field name collisions in the physical plan nodes make it hard for the IDE to determine a field's type and to give accurate hints while developing; I suggest we rename them :) The collisions are: {{rowType}}, {{input}}, {{table}}, {{left}}, {{right}}, {{tuples}}, -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3976) Alias fields of physical plan node to avoid name collision with Calcite protected fields
[ https://issues.apache.org/jira/browse/FLINK-3976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yijie Shen updated FLINK-3976: -- Description: The field name collisions in the physical plan nodes make it hard for the IDE to determine a field's type and to give accurate hints while developing; I suggest we rename them :) The collisions are: {{rowType}}, {{input}}, {{table}}, {{left}}, {{right}}, {{tuples}}. was: The field name collisions in the physical plan nodes make it hard for the IDE to determine a field's type and to give accurate hints while developing; I suggest we rename them :) The collisions are: {{rowType}}, {{input}}, {{table}}, {{left}}, {{right}}, {{tuples}}, > Alias fields of physical plan node to avoid name collision with Calcite > protected fields > > > Key: FLINK-3976 > URL: https://issues.apache.org/jira/browse/FLINK-3976 > Project: Flink > Issue Type: Improvement > Components: Table API > Reporter: Yijie Shen > Priority: Minor > > The field name collisions in the physical plan nodes make it hard for the IDE to > determine a field's type and to give accurate hints while developing; I suggest > we rename them :) > The collisions are: {{rowType}}, {{input}}, {{table}}, {{left}}, {{right}}, > {{tuples}}.
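The problem the issue describes is Java/Scala field hiding. A minimal sketch (with hypothetical stand-in classes, not the actual Calcite or Flink types) of why a subclass re-declaring a field that the superclass already defines confuses tooling:

```java
// The subclass field *shadows* the inherited one; the static type of the
// reference decides which field is read, so an IDE cannot give one answer
// for what `rowType` means at a given use site.
class RelNodeBase {                      // stand-in for a Calcite base class
    protected Object rowType = "calcite-row-type";
}

class DataSetJoin extends RelNodeBase {  // stand-in for a Flink plan node
    // Shadows RelNodeBase.rowType (same name, different declared type).
    protected String rowType = "flink-row-type";

    String ownField()       { return this.rowType; }                  // subclass field
    Object inheritedField() { return ((RelNodeBase) this).rowType; }  // hidden field
}

public class ShadowingDemo {
    public static void main(String[] args) {
        DataSetJoin join = new DataSetJoin();
        System.out.println(join.ownField());
        System.out.println(join.inheritedField());
    }
}
```

Renaming the subclass fields, as the issue proposes, removes the ambiguity entirely.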
[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)
[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301332#comment-15301332 ] ASF GitHub Bot commented on FLINK-3941: --- Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/2025#issuecomment-221762766 Hi @fhueske , I've updated the `expectedType` fix and reverted the changes in `DataSetUnion` and `DataSetUnionRule` to keep the implementation clear and simple. What do you think? > Add support for UNION (with duplicate elimination) > -- > > Key: FLINK-3941 > URL: https://issues.apache.org/jira/browse/FLINK-3941 > Project: Flink > Issue Type: New Feature > Components: Table API > Affects Versions: 1.1.0 > Reporter: Fabian Hueske > Assignee: Yijie Shen > Priority: Minor > > Currently, only UNION ALL is supported by the Table API and SQL. > UNION (with duplicate elimination) can be supported by applying > {{DataSet.distinct()}} after the union on all fields. This issue includes: > - Extending {{DataSetUnion}} > - Relaxing {{DataSetUnionRule}} to translate non-all unions. > - Extending the Table API with a union() method.
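The semantics the issue asks for can be illustrated in plain Java without any Flink dependency (class and method names here are illustrative): UNION ALL keeps duplicates, while UNION is UNION ALL followed by a distinct over all fields, mirroring the DataSet.distinct()-after-union plan the description proposes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnionSemantics {
    // UNION ALL: simple concatenation, duplicates survive.
    static <T> List<T> unionAll(List<T> left, List<T> right) {
        return Stream.concat(left.stream(), right.stream())
                     .collect(Collectors.toList());
    }

    // UNION: duplicate elimination over the full row, like distinct() after union.
    static <T> List<T> union(List<T> left, List<T> right) {
        return new ArrayList<>(new LinkedHashSet<>(unionAll(left, right)));
    }

    public static void main(String[] args) {
        List<String> l = Arrays.asList("a", "b");
        List<String> r = Arrays.asList("b", "c");
        System.out.println(unionAll(l, r)); // [a, b, b, c]
        System.out.println(union(l, r));    // [a, b, c]
    }
}
```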
[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...
Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/2025#issuecomment-221762766 Hi @fhueske , I've updated the `expectedType` fix and reverted the changes in `DataSetUnion` and `DataSetUnionRule` to keep the implementation clear and simple. What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-2765) Upgrade hbase version for hadoop-2 to 1.2 release
[ https://issues.apache.org/jira/browse/FLINK-2765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-2765: -- Description: Currently 0.98.11 is used: {code} 0.98.11-hadoop2 {code} The stable release line for hadoop-2 is 1.1.x. We should upgrade to 1.2. was: Currently 0.98.11 is used: {code} 0.98.11-hadoop2 {code} Stable release for hadoop-2 is 1.1.x line We should upgrade to 1.2 > Upgrade hbase version for hadoop-2 to 1.2 release > - > > Key: FLINK-2765 > URL: https://issues.apache.org/jira/browse/FLINK-2765 > Project: Flink > Issue Type: Improvement > Reporter: Ted Yu > Priority: Minor > > Currently 0.98.11 is used: > {code} > 0.98.11-hadoop2 > {code} > The stable release line for hadoop-2 is 1.1.x. > We should upgrade to 1.2.
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301143#comment-15301143 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-221739936 Hi @ggevay as promised yesterday, I added my comments and suggestions. Let me know what you think. @greghogan, would you like to take a look as well? Another pair of eyes is always a good idea. > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime > Reporter: Fabian Hueske > Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is applied incrementally to compute a final > aggregated value. This allows holding the preaggregated value in a hash table > and updating it with each function call. > The hash-based strategy requires a special implementation of an in-memory hash > table. The hash table should support in-place updates of elements (if the new > value has the same binary length as the old value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out of memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy.
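The core idea in the issue description can be sketched in a few lines of plain Java (class and method names here are illustrative, not the actual runtime code): because reduce(T, T) -> T maps two values of one type to one value of the same type, the pre-aggregated value per key can live in a hash table and be updated with each incoming record.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class HashCombiner<K, T> {
    private final Map<K, T> table = new HashMap<>();
    private final Function<T, K> keyOf;
    private final BinaryOperator<T> reduce;

    HashCombiner(Function<T, K> keyOf, BinaryOperator<T> reduce) {
        this.keyOf = keyOf;
        this.reduce = reduce;
    }

    // One reduce step per record: merge the record into the stored aggregate.
    void process(T record) {
        table.merge(keyOf.apply(record), record, reduce);
    }

    // In the real operator this would also be triggered on memory pressure.
    Collection<T> emit() {
        return table.values();
    }

    public static void main(String[] args) {
        // Records are (key, value) pairs encoded as int[2]; reduce sums values.
        HashCombiner<Integer, int[]> c =
            new HashCombiner<>(r -> r[0], (a, b) -> new int[]{a[0], a[1] + b[1]});
        c.process(new int[]{1, 10});
        c.process(new int[]{1, 5});
        c.process(new int[]{2, 7});
        System.out.println(c.emit().size() + " groups");
    }
}
```

The hard parts the PR addresses, which this sketch ignores, are fixed-size managed memory, variable-length binary records, and eviction when the table is full.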
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64667132 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala --- @@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag]( } /** - * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key) - * using an associative reduce function. - */ +* Creates a new [[DataSet]] by merging the elements of each group (elements with the same key) +* using an associative reduce function. +*/ def reduce(fun: (T, T) => T): DataSet[T] = { +reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES) + } + + /** + * Special [[reduce]] operation for explicitly telling the system what strategy to use for the + * combine phase. + * If null is given as the strategy, then the optimizer will pick the strategy. + */ + def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = { --- End diff -- The overloaded methods can be removed if `ReduceOperator` has a `setCombineHint()` method
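The API shape the review suggests can be sketched in a few lines (the names mirror the comment, but this is a hypothetical stand-in, not the actual Flink API): instead of one reduce(...) overload per strategy, the operator returned by reduce() exposes a setCombineHint() setter, so callers chain it only when they want to override the optimizer's choice.

```java
enum CombineHint { OPTIMIZER_CHOOSES, SORT, HASH }

class ReduceOperator<T> {
    private CombineHint hint = CombineHint.OPTIMIZER_CHOOSES;

    // Fluent setter replaces the extra reduce(...) overloads.
    ReduceOperator<T> setCombineHint(CombineHint hint) {
        this.hint = hint;
        return this;
    }

    CombineHint combineHint() { return hint; }
}

public class CombineHintDemo {
    public static void main(String[] args) {
        ReduceOperator<Integer> op =
            new ReduceOperator<Integer>().setCombineHint(CombineHint.HASH);
        System.out.println(op.combineHint());
    }
}
```

The default value also removes the need for the "null means optimizer chooses" convention in the overload's javadoc.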
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-221740318 This PR also fixes FLINK-2246 (add ChainedReduceCombineDriver). It would be nice to have that change in a separate commit before we merge.
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301148#comment-15301148 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-221740318 This PR also fixes FLINK-2246 (add ChainedReduceCombineDriver). It would be nice to have that change in a separate commit before we merge.
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64669221 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java --- @@ -252,4 +262,81 @@ private static String getLongString(int length) { } return bld.toString(); } + + --- End diff -- many blank lines here
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64670146 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.hash; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeutils.SameTypePairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.RandomAccessInputView; +import org.apache.flink.runtime.memory.AbstractPagedOutputView; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * This hash table supports updating elements, and it also has processRecordWithReduce, + * which performs one reduce step with the given record. + * + * The memory is divided into three areas: + * - Bucket area: contains the bucket heads: an 8 byte pointer to the first link of a linked list in the record area + * - Record area: contains the actual data in linked list elements. A linked list element starts with an 8 byte pointer to the next element, and then the record follows. + * - Staging area: a small, temporary storage area for writing updated records. This is needed because, before serializing a record, there is no way to know in advance how large it will be. Therefore, we can't serialize directly into the record area when we are doing an update, because if it turns out to be larger than the old record, it would overwrite some other record that happens to be after the old one in memory. The solution is to serialize to the staging area first, and then copy it to the place of the original if it has the same size; otherwise allocate a new linked list element at the end of the record area and mark the old one as abandoned. This creates "holes" in the record area, so compactions are eventually needed. + * + * Compaction happens by deleting everything in the bucket area and then reinserting all elements. The reinsertion works by forgetting the structure (the linked lists) of the record area, reading it sequentially, and inserting all non-abandoned records, starting from the beginning of the record area. Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because both the insertions and the readings happen sequentially in the record area, and the insertions obviously never overtake the reading sweep. + * + * Note: we have to abandon the old linked list element even when the updated record has a smaller size than the original, because otherwise we wouldn't know where the next record starts during a reinsertion sweep. + * + * The number of buckets depends on how large the records are. The serializer might be able to tell us this, in which case we calculate the number of buckets upfront and won't do resizes. If the serializer doesn't know the size, then we start with a small number of buckets, and resize when more elements have been inserted than there are buckets. + * + * The number of memory segments given to the staging area is usually one, because it just needs to hold one record. + *
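The staging-area trick described in the javadoc above can be reduced to a small sketch (illustrative only; names and the byte-array memory model are simplifications of the real segment-based code): serialize the updated record to a staging buffer first; if it is exactly as long as the old record, overwrite in place, otherwise append at the end of the record area and abandon the old slot, leaving a "hole" for a later compaction.

```java
import java.util.Arrays;

public class StagingUpdate {
    static final byte ABANDONED = (byte) 0xFF;  // marker for holes

    /**
     * Applies an update that was first serialized into {@code staging}.
     * Returns the (possibly new) offset of the record; appendPos[0] tracks
     * the end of the record area.
     */
    static int update(byte[] recordArea, int offset, int oldLen,
                      byte[] staging, int[] appendPos) {
        if (staging.length == oldLen) {
            // Same size: safe to overwrite in place.
            System.arraycopy(staging, 0, recordArea, offset, oldLen);
            return offset;
        }
        // Different size (even smaller!): abandon the old slot and append,
        // so a sequential reinsertion sweep can still find record boundaries.
        Arrays.fill(recordArea, offset, offset + oldLen, ABANDONED);
        int newOffset = appendPos[0];
        System.arraycopy(staging, 0, recordArea, newOffset, staging.length);
        appendPos[0] += staging.length;
        return newOffset;
    }
}
```

Without the staging copy, serializing a grown record directly at its old offset would run into the record stored after it.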
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301128#comment-15301128 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64671543 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64671483 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64669166 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTestCommon.java --- @@ -0,0 +1,666 @@ +public class MemoryHashTableTestCommon { --- End diff -- Can we make this class `abstract` and rename it to `MutableHashTableTestBase`? Then `CompactingHashTableTest` and `ReduceHashTableTest` can extend this class and do not need to implement the shared tests. `MutableHashTableTestBase` could have an abstract method `getHashTable`. `CompactingHashTableTest`'s implementation would return a `CompactingHashTable` and `ReduceHashTableTest` a `ReduceHashTable`.
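The refactoring suggested in that review comment is the standard abstract-test-base pattern. A sketch (class and method names follow the suggestion, but `Map` stands in for the real hash-table types): shared tests live in an abstract base, and each concrete test class only supplies the table under test via an abstract factory method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

abstract class MutableHashTableTestBase {
    // Abstract factory: each subclass returns its own table implementation.
    abstract Map<String, Integer> getHashTable();

    // A shared test, written once, runs against every implementation.
    void testInsertAndLookup() {
        Map<String, Integer> t = getHashTable();
        t.put("k", 42);
        if (t.get("k") != 42) throw new AssertionError("lookup failed");
    }
}

// One small subclass per implementation; no duplicated test code.
class CompactingHashTableTest extends MutableHashTableTestBase {
    @Override Map<String, Integer> getHashTable() { return new HashMap<>(); }
}

class ReduceHashTableTest extends MutableHashTableTestBase {
    @Override Map<String, Integer> getHashTable() { return new TreeMap<>(); }
}
```

With JUnit, the `@Test` methods would sit on the base class and run once per subclass automatically.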
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-221739936 Hi @ggevay as promised yesterday, I added my comments and suggestions. Let me know what you think. @greghogan, would you like to take a look as well? Another pair of eyes is always a good idea.
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301126#comment-15301126 ]

ASF GitHub Bot commented on FLINK-3477:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1517#discussion_r64671483

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
    @@ -0,0 +1,1048 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.operators.hash;
    +
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.typeutils.SameTypePairComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypePairComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.runtime.io.disk.RandomAccessInputView;
    +import org.apache.flink.runtime.memory.AbstractPagedOutputView;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.MutableObjectIterator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +/**
    + * This hash table supports updating elements, and it also has processRecordWithReduce,
    + * which makes one reduce step with the given record.
    + *
    + * The memory is divided into three areas:
    + *  - Bucket area: contains the bucket heads:
    + *    an 8 byte pointer to the first link of a linked list in the record area
    + *  - Record area: this contains the actual data in linked list elements. A linked list element starts
    + *    with an 8 byte pointer to the next element, and then the record follows.
    + *  - Staging area: a small, temporary storage area for writing updated records. This is needed,
    + *    because before serializing a record, there is no way to know in advance how large it will be.
    + *    Therefore, we can't serialize directly into the record area when we are doing an update, because
    + *    if it turns out to be larger than the old record, it would override some other record
    + *    that happens to be after the old one in memory. The solution is to serialize to the staging
    + *    area first, and then copy it to the place of the original if it has the same size; otherwise,
    + *    allocate a new linked list element at the end of the record area, and mark the old one as
    + *    abandoned. This creates "holes" in the record area, so compactions are eventually needed.
    + *
    + * Compaction happens by deleting everything in the bucket area, and then reinserting all elements.
    + * The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it
    + * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area.
    + * Note that insertions never override a record that has not been read by the reinsertion sweep, because
    + * both the insertions and the readings happen sequentially in the record area, and the insertions
    + * obviously never overtake the reading sweep.
    + *
    + * Note: we have to abandon the old linked list element even when the updated record has a smaller size
    + * than the original, because otherwise we wouldn't know where the next record starts during a
    + * reinsertion sweep.
    + *
    + * The number of buckets depends on how large the records are. The serializer might be able to tell us
    + * this, in which case we will calculate the number of buckets upfront and won't do resizes.
    + * If the serializer doesn't know the size, then we start with a small number of buckets, and do resizes
    + * as more elements are inserted than the number of buckets.
    + *
    + * The number of memory segments given to the staging area is usually one, because it just needs to
    + * hold one record.
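The update-and-compaction scheme described in the quoted javadoc can be sketched in miniature. The following Python model is an illustrative assumption, not Flink's actual MemorySegment-based implementation: a Python list stands in for the record area, a dict stands in for the bucket area, and a length check stands in for comparing the serialized size in the staging buffer against the old record. All names (`CompactingStore`, `insert_or_update`, `compact`) are hypothetical.

```python
class CompactingStore:
    """Toy model of an append-only record area with in-place updates
    via a staging copy, and compaction by a sequential reinsertion sweep."""

    ABANDONED = object()  # marker for the "holes" left by records that grew

    def __init__(self):
        self.records = []  # record area: list of [key, value] cells
        self.index = {}    # bucket area: key -> position in the record area

    @staticmethod
    def _size(value):
        # stand-in for the serialized byte length measured in the staging area
        return len(str(value))

    def insert_or_update(self, key, value):
        pos = self.index.get(key)
        if pos is None:
            # fresh insert at the end of the record area
            self.index[key] = len(self.records)
            self.records.append([key, value])
            return
        old = self.records[pos]
        if self._size(value) == self._size(old[1]):
            # same size: copy over the original in place
            old[1] = value
        else:
            # size changed: abandon the old cell and append a new one,
            # leaving a hole that a later compaction reclaims
            self.records[pos] = CompactingStore.ABANDONED
            self.index[key] = len(self.records)
            self.records.append([key, value])

    def compact(self):
        # forget the bucket structure, sweep the record area sequentially,
        # and reinsert every non-abandoned record from the beginning
        live = [c for c in self.records if c is not CompactingStore.ABANDONED]
        self.records = live
        self.index = {cell[0]: i for i, cell in enumerate(live)}

    def get(self, key):
        return self.records[self.index[key]][1]
```

The key properties match the javadoc: a same-size update overwrites in place, a size change abandons the old cell and appends at the end, and `compact()` keeps only live records in sequential order, so reinsertions can never overtake the reading sweep.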
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64671403 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quoted diff identical to the excerpt above; omitted)
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64671543 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quoted diff identical to the excerpt above; omitted)
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64671325 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quoted diff identical to the excerpt above; omitted)
[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db
Github user mans2singh commented on the pull request:

    https://github.com/apache/flink/pull/2031#issuecomment-221737718

    Hey @zentol - Thanks for your feedback.
    - I've renamed the sink as you had recommended.
    - Currently the driver from rethinkdb supports java 8 only. There is one supporting java 6, but it is no longer being maintained.
    - The current travis failures for jdk 7 are because of the version issue you had mentioned, and those for java 8 are due to failures in the yarn-tests module.
    - I've not done any performance tests. If you have any suggestions for throughput numbers that might be valuable, or if there is any other sink that I can use for setting up a baseline test, please let me know.

    If you have any other advice/recommendation, please let me know. Thanks again

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64670534 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quoted diff identical to the excerpt above; omitted)
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64669861 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quoted diff identical to the excerpt above; omitted)
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64670218 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quoted diff identical to the excerpt above; omitted)
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301122#comment-15301122 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64671325 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.hash; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeutils.SameTypePairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.RandomAccessInputView; +import org.apache.flink.runtime.memory.AbstractPagedOutputView; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * This hash table supports updating elements, and it also has processRecordWithReduce, + * which makes one reduce step with the given record. + * + * The memory is divided into three areas: + * - Bucket area: this contains the bucket heads: + * an 8 byte pointer to the first link of a linked list in the record area + * - Record area: this contains the actual data in linked list elements. A linked list element starts + * with an 8 byte pointer to the next element, and then the record follows. + * - Staging area: This is a small, temporary storage area for writing updated records. This is needed, + * because before serializing a record, there is no way to know in advance how large it will be. + * Therefore, we can't serialize directly into the record area when we are doing an update, because + * if it turns out to be larger than the old record, then it would overwrite some other record + * that happens to be after the old one in memory.
The solution is to serialize to the staging area first, + * and then copy it to the place of the original if it has the same size; otherwise allocate a new linked + * list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in + * the record area, so compactions are eventually needed. + * + * Compaction happens by deleting everything in the bucket area, and then reinserting all elements. + * The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it + * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area. + * Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because + * both the insertions and readings happen sequentially in the record area, and the insertions obviously + * never overtake the reading sweep. + * + * Note: we have to abandon the old linked list element even when the updated record has a smaller size + * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion + * sweep. + * + * The number of buckets depends on how large the records are. The serializer might be able to tell us this, + * so in this case, we will calculate the number of buckets upfront, and won't do resizes. + * If the serializer doesn't know the size, then we start with a small number of
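The update-and-compaction rule described in this Javadoc can be sketched with a toy model. This is illustrative only: records are Strings in a list of slots rather than serialized bytes in MemorySegments, and `RecordAreaModel` and all its members are invented names, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the record-area update rule described in the Javadoc above.
 *  Records are modeled as String payloads and "abandoned" is a boolean flag;
 *  the real table works on serialized bytes in MemorySegments. */
class RecordAreaModel {
    static class Slot {
        String payload;
        boolean abandoned;
        Slot(String p) { payload = p; }
    }

    final List<Slot> area = new ArrayList<>();

    /** Append a record at the end of the record area; returns its index. */
    int insert(String payload) {
        area.add(new Slot(payload));
        return area.size() - 1;
    }

    /** Same size: overwrite in place. Different size: append a new slot and
     *  abandon the old one -- this is what creates the "holes". */
    int update(int index, String newPayload) {
        Slot old = area.get(index);
        if (newPayload.length() == old.payload.length()) {
            old.payload = newPayload;
            return index;
        }
        old.abandoned = true;
        return insert(newPayload);
    }

    /** Compaction: sequential sweep that keeps only non-abandoned records.
     *  The writing position never overtakes the reading position, as the
     *  Javadoc notes. */
    void compact() {
        List<Slot> live = new ArrayList<>();
        for (Slot s : area) {
            if (!s.abandoned) {
                live.add(s);
            }
        }
        area.clear();
        area.addAll(live);
    }
}
```

Note that, matching the Javadoc, even a differently-sized (including smaller) update abandons the old slot, since the sequential compaction sweep must be able to find where the next record starts.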
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301124#comment-15301124 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64671403 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quoted license header and class Javadoc identical to the first excerpt above; trimmed)
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64668757 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java --- @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.runtime.operators.chaining; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.runtime.operators.hash.ReduceHashTable; +import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter; +import org.apache.flink.runtime.operators.sort.InMemorySorter; +import org.apache.flink.runtime.operators.sort.NormalizedKeySorter; +import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +/** + * Chained version of ReduceCombineDriver. + */ +public class ChainedReduceCombineDriver extends ChainedDriver{ + + private static final Logger LOG = LoggerFactory.getLogger(ChainedReduceCombineDriver.class); + + /** Fix length records with a length below this threshold will be in-place sorted, if possible. 
*/ + private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; + + + private AbstractInvokable parent; + + private TypeSerializer serializer; + + private TypeComparator comparator; + + private ReduceFunction reducer; + + private DriverStrategy strategy; + + private InMemorySorter sorter; + + private QuickSort sortAlgo = new QuickSort(); + + private ReduceHashTable table; + + private List memory; + + private volatile boolean canceled; + + // + + @Override + public Function getStub() { + return reducer; + } + + @Override + public String getTaskName() { + return taskName; + } + + @Override + public void setup(AbstractInvokable parent) { + this.parent = parent; + canceled = false; + + strategy = config.getDriverStrategy(); + + reducer = BatchTask.instantiateUserCode(config, userCodeClassLoader, ReduceFunction.class); + FunctionUtils.setFunctionRuntimeContext(reducer, getUdfRuntimeContext()); + } + + @Override + public void openTask() throws Exception { + // open the stub first + final Configuration stubConfig = config.getStubParameters(); + BatchTask.openUserCode(reducer, stubConfig); + + // instantiate the serializer / comparator + serializer = config.getInputSerializer(0, userCodeClassLoader).getSerializer(); + comparator = config.getDriverComparator(0, userCodeClassLoader).createComparator(); + + MemoryManager memManager = parent.getEnvironment().getMemoryManager(); + final int numMemoryPages = memManager.computeNumberOfPages(config.getRelativeMemoryDriver()); + memory = memManager.allocatePages(parent, numMemoryPages);
[jira] [Commented] (FLINK-3967) Provide RethinkDB Sink for Flink
[ https://issues.apache.org/jira/browse/FLINK-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301112#comment-15301112 ] ASF GitHub Bot commented on FLINK-3967: --- Github user mans2singh commented on the pull request: https://github.com/apache/flink/pull/2031#issuecomment-221737718 Hey @zentol - Thanks for your feedback. - I've renamed the sink as you had recommended. - Currently the driver from RethinkDB supports Java 8 only. There is one supporting Java 6, but it is no longer being maintained. - The current Travis failures for JDK 7 are because of the version issue you mentioned, and those for Java 8 are due to failures in the yarn-tests module. - I've not done any performance tests. If you have any suggestions for throughput numbers that might be valuable, or if there is any other sink that I can use for setting up a baseline test, please let me know. If you have any other advice/recommendations, please let me know. Thanks again > Provide RethinkDB Sink for Flink > > > Key: FLINK-3967 > URL: https://issues.apache.org/jira/browse/FLINK-3967 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors >Affects Versions: 1.0.3 > Environment: All >Reporter: Mans Singh >Assignee: Mans Singh >Priority: Minor > Labels: features > Fix For: 1.1.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Provide Sink to stream data from flink to rethink db. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301108#comment-15301108 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64670534 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quoted license header and class Javadoc identical to the first excerpt above; trimmed)
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64668798 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java --- @@ -114,85 +118,133 @@ public void prepare() throws Exception { MemoryManager memManager = this.taskContext.getMemoryManager(); final int numMemoryPages = memManager.computeNumberOfPages( - this.taskContext.getTaskConfig().getRelativeMemoryDriver()); + this.taskContext.getTaskConfig().getRelativeMemoryDriver()); this.memory = memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages); - // instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter - if (this.comparator.supportsSerializationWithKeyNormalization() && - this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING) - { - this.sorter = new FixedLengthRecordSorter(this.serializer, this.comparator, memory); - } else { - this.sorter = new NormalizedKeySorter(this.serializer, this.comparator.duplicate(), memory); - } - ExecutionConfig executionConfig = taskContext.getExecutionConfig(); this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); if (LOG.isDebugEnabled()) { LOG.debug("ReduceCombineDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + "."); } + + switch (strategy) { + case SORTED_PARTIAL_REDUCE: + // instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter + if (this.comparator.supportsSerializationWithKeyNormalization() && + this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING) { + this.sorter = new FixedLengthRecordSorter(this.serializer, this.comparator, memory); --- End diff -- Use a duplicated comparator. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
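The point behind "use a duplicated comparator" is that comparator instances carry mutable per-instance state (such as a reference element for `compareToReference`-style calls), so two consumers sharing one instance can clobber each other. A toy sketch of the pattern, with invented names rather than the actual Flink `TypeComparator` API:

```java
/** Toy stateful comparator showing why each sorter should receive its own
 *  duplicate(): the reference element is mutable per-instance state, so a
 *  shared instance would be clobbered by a second user. */
class ToyComparator {
    private int reference;                  // mutable state, like a setReference() target

    void setReference(int r) { reference = r; }

    /** Compares a candidate against the currently set reference. */
    int compareToReference(int candidate) { return Integer.compare(candidate, reference); }

    /** Fresh instance with its own, independent reference state. */
    ToyComparator duplicate() { return new ToyComparator(); }
}
```

With a duplicate, a second consumer setting its own reference leaves the first consumer's comparisons unaffected.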
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64667955 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java --- @@ -45,7 +45,12 @@ public RandomAccessInputView(ArrayList segments, int segmentSize) { this(segments, segmentSize, segmentSize); } - + + public RandomAccessInputView(ArrayList segments, int segmentSize, boolean dummy) --- End diff -- This constructor is called twice from the constructor of the `ReduceHashTable`: once to initialize the input view of the `RecordArea` and once for the `StagingArea`. Both areas will need at least one buffer. Maybe I am wrong, but if we give both one initial buffer, we do not need this additional constructor with the `dummy` flag. As a second alternative to the constructor with the `dummy` flag, we could also implement a constructor without the `ArrayList`, create it in the constructor, and add a `getSegmentList()` method to access the created list. What do you think, @ggevay? ---
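The second alternative the reviewer describes could look roughly like the following toy model. Everything here is an assumption for illustration: String elements stand in for `MemorySegment`, and `OwnedListView` is an invented name, not the real `RandomAccessInputView`.

```java
import java.util.ArrayList;

/** Minimal model of the suggested alternative: the view creates its own
 *  backing list and exposes it via a getter, removing the need for an
 *  extra dummy-flag constructor. */
class OwnedListView {
    private final ArrayList<String> segments;

    OwnedListView() {
        this.segments = new ArrayList<>(); // view owns and creates the list
    }

    /** Callers that need to fill or inspect the backing list use this. */
    ArrayList<String> getSegmentList() {
        return segments;
    }
}
```

The design trade-off is ownership: instead of the caller constructing an empty list and passing it in (plus a flag to distinguish the overload), the view owns list creation and hands out access.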
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301104#comment-15301104 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64670218 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- (quoted license header and class Javadoc identical to the first excerpt above; trimmed)
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64668428 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java --- @@ -42,34 +44,38 @@ * Combine operator for Reduce functions, standalone (not chained). * Sorts and groups and reduces data, but never spills the sort. May produce multiple * partially aggregated groups. - * + * * @param The data type consumed and produced by the combiner. */ public class ReduceCombineDriver implements Driver{ - + private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class); /** Fix length records with a length below this threshold will be in-place sorted, if possible. */ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - - + + private TaskContext taskContext; private TypeSerializer serializer; private TypeComparator comparator; - + private ReduceFunction reducer; - + private Collector output; - + + private DriverStrategy strategy; + private InMemorySorter sorter; - + private QuickSort sortAlgo = new QuickSort(); + private ReduceHashTable table; + private List memory; - private boolean running; + private volatile boolean canceled; --- End diff -- Can we keep the name of the flag as it is for now? All other drivers use a `running` flag as well. I would rather open a separate JIRA to fix the name in all drivers. I don't think that `volatile` is required here. Accesses to primitive `boolean` are always atomic. ---
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64668630 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java --- (quoted license header and class preamble identical to the excerpt quoted above for discussion_r64668757; trimmed)
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301102#comment-15301102 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64670146 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.hash; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeutils.SameTypePairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.RandomAccessInputView; +import org.apache.flink.runtime.memory.AbstractPagedOutputView; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * This hash table supports updating elements, and it also has processRecordWithReduce, + * which makes one reduce step with the given record. + * + * The memory is divided into three areas: + * - Bucket area: this contains the bucket heads: + *an 8 byte pointer to the first link of a linked list in the record area + * - Record area: this contains the actual data in linked list elements. A linked list element starts + *with an 8 byte pointer to the next element, and then the record follows. + * - Staging area: This is a small, temporary storage area for writing updated records. This is needed, + *because before serializing a record, there is no way to know in advance how large it will be. + *Therefore, we can't serialize directly into the record area when we are doing an update, because + *if it turns out to be larger than the old record, then it would overwrite some other record + *that happens to be after the old one in memory. 
The solution is to serialize to the staging area first, + *and then copy it to the place of the original if it has the same size; otherwise allocate a new linked + *list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in + *the record area, so compactions are eventually needed. + * + * Compaction happens by deleting everything in the bucket area, and then reinserting all elements. + * The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it + * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area. + * Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because + * both the insertions and readings happen sequentially in the record area, and the insertions obviously + * never overtake the reading sweep. + * + * Note: we have to abandon the old linked list element even when the updated record has a smaller size + * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion + * sweep. + * + * The number of buckets depends on how large the records are. The serializer might be able to tell us this; + * in that case, we will calculate the number of buckets upfront and won't do resizes. + * If the serializer doesn't know the size, then we start with a small number of
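The update path the javadoc above describes (serialize to the staging area first, then either overwrite in place or append-and-abandon) can be modeled in a few lines. This is a simplified sketch only: byte arrays stand in for serialized records, a plain list stands in for the record area, and all names are illustrative rather than `ReduceHashTable`'s actual internals.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the staging-area update logic: same-size updates are
 *  written in place, differently-sized updates are appended at the end and
 *  the old slot is marked abandoned (the "holes" that compaction removes). */
class StagedUpdateModel {
    static class Slot {
        byte[] data;
        boolean abandoned;
        Slot(byte[] d) { data = d; }
    }

    final List<Slot> recordArea = new ArrayList<>();

    int insert(byte[] record) {
        recordArea.add(new Slot(record));
        return recordArea.size() - 1;
    }

    /** Returns the index of the slot that holds the updated record. */
    int update(int slot, byte[] updated) {
        byte[] staged = updated.clone();        // serialize to the "staging area" first
        Slot old = recordArea.get(slot);
        if (staged.length == old.data.length) { // same size: safe to overwrite in place
            old.data = staged;
            return slot;
        }
        old.abandoned = true;                   // differing size: append and abandon
        return insert(staged);
    }
}
```
A same-size update keeps its slot index; a larger (or smaller) update moves to a fresh slot at the end, which is why compaction is eventually needed.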
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64667048 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java --- @@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) { * @see DataSet */ public ReduceOperator reduce(ReduceFunction reducer) { + return reduce(Utils.getCallLocationName(), reducer, CombineHint.OPTIMIZER_CHOOSES); + } + + /** +* Applies a Reduce transformation on a grouped {@link DataSet}. +* For each group, the transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction} +* until only a single element for each group remains. +* A ReduceFunction combines two elements into one new element of the same type. +* +* @param reducer The ReduceFunction that is applied on each group of the DataSet. +* @param strategy The strategy that should be used to execute the combine phase of the reduce. +* If {@code null} is given, then the optimizer will pick the strategy. +* @return A ReduceOperator that represents the reduced DataSet. +* +* @see org.apache.flink.api.common.functions.RichReduceFunction +* @see ReduceOperator +* @see DataSet +*/ + public ReduceOperator reduce(ReduceFunction reducer, CombineHint strategy) { --- End diff -- The `ReduceOperator.setCombineHint(CombineHint)` method should be annotated with `@PublicEvolving`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64668546 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java --- @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.runtime.operators.chaining; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.runtime.operators.hash.ReduceHashTable; +import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter; +import org.apache.flink.runtime.operators.sort.InMemorySorter; +import org.apache.flink.runtime.operators.sort.NormalizedKeySorter; +import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +/** + * Chained version of ReduceCombineDriver. + */ +public class ChainedReduceCombineDriver extends ChainedDriver{ + + private static final Logger LOG = LoggerFactory.getLogger(ChainedReduceCombineDriver.class); + + /** Fix length records with a length below this threshold will be in-place sorted, if possible. 
*/ + private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; + + + private AbstractInvokable parent; + + private TypeSerializer serializer; + + private TypeComparator comparator; + + private ReduceFunction reducer; + + private DriverStrategy strategy; + + private InMemorySorter sorter; + + private QuickSort sortAlgo = new QuickSort(); + + private ReduceHashTable table; + + private List memory; + + private volatile boolean canceled; --- End diff -- Can we use `running` instead of `canceled` for the sake of consistency?
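The `THRESHOLD_FOR_IN_PLACE_SORTING` constant quoted above drives a choice between two in-memory sorters. A standalone sketch of that selection rule, assuming (as the comment suggests) that only sufficiently short fixed-length records qualify for in-place sorting; the real driver additionally checks comparator capabilities, so treat this as illustrative:

```java
/** Hypothetical standalone version of the sorter-selection rule implied by
 *  THRESHOLD_FOR_IN_PLACE_SORTING: short fixed-length records can be sorted
 *  in place; everything else goes through a normalized-key sorter. */
class SorterChoice {
    static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

    /** recordLen <= 0 means variable-length records (the serializer doesn't know the size). */
    static String chooseSorter(int recordLen) {
        if (recordLen > 0 && recordLen <= THRESHOLD_FOR_IN_PLACE_SORTING) {
            return "FixedLengthRecordSorter";   // sorts the records themselves in place
        }
        return "NormalizedKeySorter";           // sorts an index of pointers to the records
    }
}
```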
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64666960 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java --- @@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) { * @see DataSet */ public ReduceOperator reduce(ReduceFunction reducer) { + return reduce(Utils.getCallLocationName(), reducer, CombineHint.OPTIMIZER_CHOOSES); + } + + /** +* Applies a Reduce transformation on a grouped {@link DataSet}. +* For each group, the transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction} +* until only a single element for each group remains. +* A ReduceFunction combines two elements into one new element of the same type. +* +* @param reducer The ReduceFunction that is applied on each group of the DataSet. +* @param strategy The strategy that should be used to execute the combine phase of the reduce. +* If {@code null} is given, then the optimizer will pick the strategy. +* @return A ReduceOperator that represents the reduced DataSet. +* +* @see org.apache.flink.api.common.functions.RichReduceFunction +* @see ReduceOperator +* @see DataSet +*/ + public ReduceOperator reduce(ReduceFunction reducer, CombineHint strategy) { --- End diff -- I don't think we should overload all reduce (or reduce-based) methods in `UnsortedGrouping` but rather add a method `setCombineHint(CombineHint)` to `ReduceOperator`. That way the API doesn't grow too much. This is also more modular once we add a hash-based strategy for the final Reduce.
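The reviewer's proposal is to keep the `reduce(...)` surface small and carry the hint on the returned operator instead. A minimal model of that API shape, with illustrative stand-in classes (not Flink's actual `ReduceOperator`, whose final signature is decided in the PR):

```java
/** Minimal model of the proposed design: a fluent setCombineHint(...) setter
 *  on the operator object, instead of a CombineHint overload of reduce(...). */
enum CombineHint { OPTIMIZER_CHOOSES, SORT, HASH }

class ReduceOperatorModel {
    private CombineHint hint = CombineHint.OPTIMIZER_CHOOSES;  // default: optimizer picks

    ReduceOperatorModel setCombineHint(CombineHint hint) {
        this.hint = hint;   // returning this keeps call sites fluent
        return this;
    }

    CombineHint getCombineHint() {
        return hint;
    }
}
```
With this shape, adding a hash-based strategy for the final Reduce later needs no new `reduce(...)` overloads, only another enum value consumer.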
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301097#comment-15301097 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64669861 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.hash; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeutils.SameTypePairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.RandomAccessInputView; +import org.apache.flink.runtime.memory.AbstractPagedOutputView; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * This hash table supports updating elements, and it also has processRecordWithReduce, + * which makes one reduce step with the given record. + * + * The memory is divided into three areas: + * - Bucket area: this contains the bucket heads: + *an 8 byte pointer to the first link of a linked list in the record area + * - Record area: this contains the actual data in linked list elements. A linked list element starts + *with an 8 byte pointer to the next element, and then the record follows. + * - Staging area: This is a small, temporary storage area for writing updated records. This is needed, + *because before serializing a record, there is no way to know in advance how large it will be. + *Therefore, we can't serialize directly into the record area when we are doing an update, because + *if it turns out to be larger than the old record, then it would overwrite some other record + *that happens to be after the old one in memory. 
The solution is to serialize to the staging area first, + *and then copy it to the place of the original if it has the same size; otherwise allocate a new linked + *list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in + *the record area, so compactions are eventually needed. + * + * Compaction happens by deleting everything in the bucket area, and then reinserting all elements. + * The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it + * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area. + * Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because + * both the insertions and readings happen sequentially in the record area, and the insertions obviously + * never overtake the reading sweep. + * + * Note: we have to abandon the old linked list element even when the updated record has a smaller size + * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion + * sweep. + * + * The number of buckets depends on how large the records are. The serializer might be able to tell us this; + * in that case, we will calculate the number of buckets upfront and won't do resizes. + * If the serializer doesn't know the size, then we start with a small number of
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301083#comment-15301083 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64669221 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java --- @@ -252,4 +262,81 @@ private static String getLongString(int length) { } return bld.toString(); } + + --- End diff -- many blank lines here > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
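The issue description above is the core of the feature: because `reduce(T v1, T v2)` has identical input and output types, the pre-aggregated value per key can live in a hash table and be folded in place on every call. A toy model of that combine loop, using `java.util.HashMap` in place of the special-purpose in-memory table (method names here are illustrative, echoing but not reproducing `processRecordWithReduce`):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Toy model of the hash-based combine strategy: one pre-aggregated value per
 *  key, updated incrementally with the ReduceFunction on every record. */
class HashCombiner<K, V> {
    private final Map<K, V> table = new HashMap<>();
    private final BinaryOperator<V> reducer;   // stands in for ReduceFunction<V>

    HashCombiner(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    /** One combine step: fold the incoming value into its key's pre-aggregate. */
    void processRecord(K key, V value) {
        // reduce(v1, v2) returns the same type, so the entry is updated in place
        table.merge(key, value, reducer);
    }

    Map<K, V> results() {
        return table;
    }
}
```
The real table additionally has to manage its own memory segments and evict-and-emit everything when memory runs out, which this sketch omits.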
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301082#comment-15301082 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64669166 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTestCommon.java --- @@ -0,0 +1,666 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.hash; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator; +import org.apache.flink.runtime.operators.testutils.types.IntList; +import org.apache.flink.runtime.operators.testutils.types.IntListComparator; +import org.apache.flink.runtime.operators.testutils.types.IntListPairComparator; +import org.apache.flink.runtime.operators.testutils.types.IntListSerializer; +import org.apache.flink.runtime.operators.testutils.types.IntPair; +import org.apache.flink.runtime.operators.testutils.types.IntPairComparator; +import org.apache.flink.runtime.operators.testutils.types.IntPairListPairComparator; +import org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator; +import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer; +import org.apache.flink.runtime.operators.testutils.types.StringPair; +import org.apache.flink.runtime.operators.testutils.types.StringPairComparator; +import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator; +import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer; +import org.apache.flink.util.MutableObjectIterator; + +import org.junit.Test; + +import org.powermock.reflect.Whitebox; + +import static org.junit.Assert.*; + + +public class MemoryHashTableTestCommon { --- End diff -- Can we make this class `abstract` and rename it to `MutableHashTableTestBase`? 
Then `CompactingHashTableTest` and `ReduceHashTableTest` can extend this class and do not need to implement the shared tests. `MutableHashTableTestBase` could have an abstract method `getHashTable`. `CompactingHashTableTest`'s implementation would return a `CompactingHashTable` and `ReduceHashTableTest` a `ReduceHashTable`. > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash
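The refactoring proposed above (an abstract base class with an abstract `getHashTable` factory, so both hash table tests inherit the shared cases) can be sketched as follows. The table type is simplified to `Map` here for illustration; the real base class would use the Flink table types and JUnit `@Test` methods.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the proposed MutableHashTableTestBase: shared tests live here,
 *  subclasses only supply the table implementation under test. */
abstract class MutableHashTableTestBase {
    /** Abstract factory, as suggested in the review. */
    abstract Map<Integer, String> getHashTable();

    /** A shared test every subclass inherits unchanged. */
    boolean insertAndLookupWorks() {
        Map<Integer, String> t = getHashTable();
        t.put(42, "answer");
        return "answer".equals(t.get(42));
    }
}

/** Stand-in for ReduceHashTableTest; CompactingHashTableTest would look the same
 *  but return a CompactingHashTable from getHashTable(). */
class ReduceHashTableTest extends MutableHashTableTestBase {
    @Override
    Map<Integer, String> getHashTable() {
        return new HashMap<>();
    }
}
```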
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301073#comment-15301073 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64668757 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java --- @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.runtime.operators.chaining; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.runtime.operators.hash.ReduceHashTable; +import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter; +import org.apache.flink.runtime.operators.sort.InMemorySorter; +import org.apache.flink.runtime.operators.sort.NormalizedKeySorter; +import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +/** + * Chained version of ReduceCombineDriver. + */ +public class ChainedReduceCombineDriver extends ChainedDriver{ + + private static final Logger LOG = LoggerFactory.getLogger(ChainedReduceCombineDriver.class); + + /** Fix length records with a length below this threshold will be in-place sorted, if possible. 
*/ + private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; + + + private AbstractInvokable parent; + + private TypeSerializer serializer; + + private TypeComparator comparator; + + private ReduceFunction reducer; + + private DriverStrategy strategy; + + private InMemorySorter sorter; + + private QuickSort sortAlgo = new QuickSort(); + + private ReduceHashTable table; + + private List memory; + + private volatile boolean canceled; + + // + + @Override + public Function getStub() { + return reducer; + } + + @Override + public String getTaskName() { + return taskName; + } + + @Override + public void setup(AbstractInvokable parent) { + this.parent = parent; + canceled = false; + + strategy = config.getDriverStrategy(); + + reducer = BatchTask.instantiateUserCode(config, userCodeClassLoader, ReduceFunction.class); + FunctionUtils.setFunctionRuntimeContext(reducer, getUdfRuntimeContext()); + } + + @Override + public void openTask() throws Exception { + // open the stub first + final Configuration stubConfig = config.getStubParameters(); + BatchTask.openUserCode(reducer, stubConfig); + + // instantiate the serializer / comparator + serializer = config.getInputSerializer(0, userCodeClassLoader).getSerializer(); + comparator = config.getDriverComparator(0, userCodeClassLoader).createComparator(); + + MemoryManager
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301061#comment-15301061 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64667955 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java --- @@ -45,7 +45,12 @@ public RandomAccessInputView(ArrayList segments, int segmentSize) { this(segments, segmentSize, segmentSize); } - + + public RandomAccessInputView(ArrayList segments, int segmentSize, boolean dummy) --- End diff -- This constructor is called twice from the constructor of the `ReduceHashTable`. Once to initialize the input view of the `RecordArea` and once for the `StagingArea`. Both areas will need at least one buffer. Maybe I am wrong, but if we give both one initial buffer, we do not need this additional constructor with the `dummy` flag. As a second alternative to the constructor with the `dummy` flag, we could also implement a constructor without the `ArrayList`, create it in the constructor, and add a `getSegmentList()` method to access the created list. What do you think, @ggevay? > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy.
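The review comment above sketches two ways to drop the `dummy`-flag constructor; the second is a constructor that creates the segment list itself and exposes it via a getter. A minimal illustration of that shape, with a stand-in class (not the real `RandomAccessInputView`, and `byte[]` standing in for `MemorySegment`):

```java
import java.util.ArrayList;

/** Illustrative stand-in for the reviewer's second alternative: the view owns
 *  its segment list and hands it out via getSegmentList(), so no dummy-flag
 *  constructor is needed to signal "don't touch the passed-in list yet". */
class OwnedSegmentListView {
    private final ArrayList<byte[]> segments = new ArrayList<>(); // created here, not passed in
    private final int segmentSize;

    OwnedSegmentListView(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    /** Callers that previously shared the ArrayList obtain it from the view instead. */
    ArrayList<byte[]> getSegmentList() {
        return segments;
    }

    int getSegmentSize() {
        return segmentSize;
    }
}
```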
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301070#comment-15301070 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64668630 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java --- @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.runtime.operators.chaining; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.runtime.operators.hash.ReduceHashTable; +import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter; +import org.apache.flink.runtime.operators.sort.InMemorySorter; +import org.apache.flink.runtime.operators.sort.NormalizedKeySorter; +import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +/** + * Chained version of ReduceCombineDriver. + */ +public class ChainedReduceCombineDriver extends ChainedDriver{ + + private static final Logger LOG = LoggerFactory.getLogger(ChainedReduceCombineDriver.class); + + /** Fix length records with a length below this threshold will be in-place sorted, if possible. 
*/ + private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; + + + private AbstractInvokable parent; + + private TypeSerializer serializer; + + private TypeComparator comparator; + + private ReduceFunction reducer; + + private DriverStrategy strategy; + + private InMemorySorter sorter; + + private QuickSort sortAlgo = new QuickSort(); + + private ReduceHashTable table; + + private List memory; + + private volatile boolean canceled; + + // + + @Override + public Function getStub() { + return reducer; + } + + @Override + public String getTaskName() { + return taskName; + } + + @Override + public void setup(AbstractInvokable parent) { + this.parent = parent; + canceled = false; + + strategy = config.getDriverStrategy(); + + reducer = BatchTask.instantiateUserCode(config, userCodeClassLoader, ReduceFunction.class); + FunctionUtils.setFunctionRuntimeContext(reducer, getUdfRuntimeContext()); + } + + @Override + public void openTask() throws Exception { + // open the stub first + final Configuration stubConfig = config.getStubParameters(); + BatchTask.openUserCode(reducer, stubConfig); + + // instantiate the serializer / comparator + serializer = config.getInputSerializer(0, userCodeClassLoader).getSerializer(); + comparator = config.getDriverComparator(0, userCodeClassLoader).createComparator(); + + MemoryManager
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301069#comment-15301069 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64668546 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java --- @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.runtime.operators.chaining; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.runtime.operators.hash.ReduceHashTable; +import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter; +import org.apache.flink.runtime.operators.sort.InMemorySorter; +import org.apache.flink.runtime.operators.sort.NormalizedKeySorter; +import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +/** + * Chained version of ReduceCombineDriver. + */ +public class ChainedReduceCombineDriver extends ChainedDriver{ + + private static final Logger LOG = LoggerFactory.getLogger(ChainedReduceCombineDriver.class); + + /** Fix length records with a length below this threshold will be in-place sorted, if possible. 
*/ + private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; + + + private AbstractInvokable parent; + + private TypeSerializer serializer; + + private TypeComparator comparator; + + private ReduceFunction reducer; + + private DriverStrategy strategy; + + private InMemorySorter sorter; + + private QuickSort sortAlgo = new QuickSort(); + + private ReduceHashTable table; + + private List memory; + + private volatile boolean canceled; --- End diff -- Can we use `running` instead of `canceled` for the sake of consistency? > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The
> hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301065#comment-15301065 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64668428 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java --- @@ -42,34 +44,38 @@ * Combine operator for Reduce functions, standalone (not chained). * Sorts and groups and reduces data, but never spills the sort. May produce multiple * partially aggregated groups. - * + * * @param The data type consumed and produced by the combiner. */ public class ReduceCombineDriver implements Driver{ - + private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class); /** Fix length records with a length below this threshold will be in-place sorted, if possible. */ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - - + + private TaskContext taskContext; private TypeSerializer serializer; private TypeComparator comparator; - + private ReduceFunction reducer; - + private Collector output; - + + private DriverStrategy strategy; + private InMemorySorter sorter; - + private QuickSort sortAlgo = new QuickSort(); + private ReduceHashTable table; + private List memory; - private boolean running; + private volatile boolean canceled; --- End diff -- Can we keep the name of the flag as it is for now? All other drivers use a `running` flag as well. I would rather open a separate JIRA to fix the name in all drivers. I don't think that `volatile` is required here. Accesses to primitive `boolean` are always atomic. 
> Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
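The hash-based combine described in this issue can be reduced to a toy model: keep one pre-aggregated value per key in a hash table, and fold every incoming record into it with the reduce function. Below is a self-contained sketch of just that idea (plain `HashMap`, hypothetical `HashCombiner` name) — Flink's actual `ReduceHashTable` additionally works on binary records in managed memory, distinguishes in-place from appending updates, and must evict when full:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Toy model of a hash-based combiner: one pre-aggregated value per key. */
class HashCombiner<K, T> {
    private final Map<K, T> table = new HashMap<>();
    private final BinaryOperator<T> reducer; // stands in for ReduceFunction.reduce(v1, v2)

    HashCombiner(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    /** Fold one record into the running aggregate for its key. */
    void insert(K key, T value) {
        // merge() applies the reducer when the key is already present,
        // which is exactly the incremental update the issue description relies on.
        table.merge(key, value, reducer);
    }

    /** Emit the partially aggregated groups, as a combiner does. */
    Map<K, T> emit() {
        return table;
    }
}
```

Because `reduce(T, T)` returns a single value of the same type, the table grows with the number of distinct keys rather than the number of records — which is why the hash strategy can beat sort-based combining when keys repeat often.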
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301046#comment-15301046 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64667048 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java --- @@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) { * @see DataSet */ public ReduceOperator reduce(ReduceFunction reducer) { + return reduce(Utils.getCallLocationName(), reducer, CombineHint.OPTIMIZER_CHOOSES); + } + + /** +* Applies a Reduce transformation on a grouped {@link DataSet}. +* For each group, the transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction} +* until only a single element for each group remains. +* A ReduceFunction combines two elements into one new element of the same type. +* +* @param reducer The ReduceFunction that is applied on each group of the DataSet. +* @param strategy The strategy that should be used to execute the combine phase of the reduce. +* If {@code null} is given, then the optimizer will pick the strategy. +* @return A ReduceOperator that represents the reduced DataSet. +* +* @see org.apache.flink.api.common.functions.RichReduceFunction +* @see ReduceOperator +* @see DataSet +*/ + public ReduceOperator reduce(ReduceFunction reducer, CombineHint strategy) { --- End diff -- The `ReduceOperator.setCombineHint(CombineHint)` method should be annotated with `@PublicEvolving`. > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. 
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301050#comment-15301050 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64667132 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala --- @@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag]( } /** - * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key) - * using an associative reduce function. - */ +* Creates a new [[DataSet]] by merging the elements of each group (elements with the same key) +* using an associative reduce function. +*/ def reduce(fun: (T, T) => T): DataSet[T] = { +reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES) + } + + /** + * Special [[reduce]] operation for explicitly telling the system what strategy to use for the + * combine phase. + * If null is given as the strategy, then the optimizer will pick the strategy. + */ + def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = { --- End diff -- The overloaded methods can be removed if `ReduceOperator` has a `setCombineHint()` method > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. 
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15301042#comment-15301042 ] ASF GitHub Bot commented on FLINK-3477: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r64666960 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java --- @@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) { * @see DataSet */ public ReduceOperator reduce(ReduceFunction reducer) { + return reduce(Utils.getCallLocationName(), reducer, CombineHint.OPTIMIZER_CHOOSES); + } + + /** +* Applies a Reduce transformation on a grouped {@link DataSet}. +* For each group, the transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction} +* until only a single element for each group remains. +* A ReduceFunction combines two elements into one new element of the same type. +* +* @param reducer The ReduceFunction that is applied on each group of the DataSet. +* @param strategy The strategy that should be used to execute the combine phase of the reduce. +* If {@code null} is given, then the optimizer will pick the strategy. +* @return A ReduceOperator that represents the reduced DataSet. +* +* @see org.apache.flink.api.common.functions.RichReduceFunction +* @see ReduceOperator +* @see DataSet +*/ + public ReduceOperator reduce(ReduceFunction reducer, CombineHint strategy) { --- End diff -- I don't think we should overload all reduce (or reduce-based) methods in `UnsortedGrouping` but rather add a method `setCombineHint(CombineHint)` to `ReduceOperator`. That way the API doesn't grow too much. This is also more modular once we add a hash-based strategy for the final Reduce. 
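The setter-based design suggested in this review keeps the API surface small: one fluent `setCombineHint()` on the returned operator instead of an overload for every `reduce()` variant. A minimal sketch of that shape (a hypothetical stand-in class, not the real `ReduceOperator`):

```java
/** Sketch of the combine-hint enum discussed in the PR. */
enum CombineHint { OPTIMIZER_CHOOSES, SORT, HASH }

/** Hypothetical stand-in for ReduceOperator, showing the proposed fluent setter. */
class ReduceOperatorSketch {
    private CombineHint hint = CombineHint.OPTIMIZER_CHOOSES;

    /** The single setter fhueske suggests instead of overloading every reduce() method. */
    ReduceOperatorSketch setCombineHint(CombineHint hint) {
        this.hint = hint;
        return this; // fluent, so it chains directly after reduce(...)
    }

    CombineHint getCombineHint() {
        return hint;
    }
}
```

At the call site this would read as `input.groupBy(0).reduce(fn).setCombineHint(CombineHint.HASH)` — assuming the setter is added to `ReduceOperator` as proposed.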
[jira] [Updated] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] B Wyatt updated FLINK-3974: --- Attachment: bwyatt-FLINK3974.1.patch I was able to fix this for my topology and the test case by making a very small change to {{ChainingOutput.collect()}} in OperatorChain.java. When object reuse is disabled, {{CopyingChainingOutput.collect()}} is used to make a full copy of the {{StreamRecord}}: {code:java} StreamRecord copy = record.copy(serializer.copy(record.getValue())); {code} If object reuse is enabled, {{ChainingOutput.collect()}} is used instead. As the internal code will mutate the {{StreamRecord<>}} downstream, I thought the best good-faith effort to support object reuse was to copy the {{StreamRecord<>}} without copying the value, sending this lighter copy downstream instead of the original {{StreamRecord<>}}: {code:java} StreamRecord shallowCopy = record.copy(record.getValue()); {code} See attached bwyatt-FLINK3974.1.patch. > enableObjectReuse fails when an operator chains to multiple downstream > operators > > > Key: FLINK-3974 > URL: https://issues.apache.org/jira/browse/FLINK-3974 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.0.3 >Reporter: B Wyatt > Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch > > > Given a topology that looks like this: > {code:java} > DataStream input = ... > input > .map(MapFunction...) > .addSink(...); > input > .map(MapFunction...) > .addSink(...); > {code} > enableObjectReuse() will cause an exception in the form of > {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown. > It looks like the input operator calls {{Output.collect}} > which attempts to loop over the downstream operators and process them. > However, the first map operation will call {{StreamRecord<>.replace}} which > mutates the value stored in the StreamRecord<>. 
> As a result, when the {{Output .collect}} call passes the > {{StreamRecord}} to the second map operation it is actually a > {{StreamRecord}} and behaves as if the two map operations were serial > instead of parallel. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
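The difference between the two copies can be shown without Flink: the full copy duplicates both the record wrapper and the value, while the proposed shallow copy duplicates only the wrapper and shares the value, so a later `replace()` on one branch's record no longer disturbs the record handed to the other branch. A self-contained sketch (hypothetical `ReusableRecord` class standing in for `StreamRecord`):

```java
/** Minimal stand-in for a record wrapper like StreamRecord. */
class ReusableRecord<T> {
    private T value;

    ReusableRecord(T value) {
        this.value = value;
    }

    T getValue() {
        return value;
    }

    /** Mutating replace, as downstream operators do under object reuse. */
    void replace(T newValue) {
        this.value = newValue;
    }

    /** Copy the wrapper around a given value; shallow when the value is shared. */
    ReusableRecord<T> copy(T value) {
        return new ReusableRecord<>(value);
    }
}
```

A full (deep) copy would pass a serializer-made duplicate of the value into `copy()`; the patch's shallow variant passes the original value, which is cheaper but still gives each downstream operator its own wrapper to mutate.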
[jira] [Commented] (FLINK-3970) How to deal with "resouce isolation" problem
[ https://issues.apache.org/jira/browse/FLINK-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300746#comment-15300746 ] Stephan Ewen commented on FLINK-3970: - I think the best way to completely isolate jobs against each other (CPU, network, etc.) is to run them in different processes. The best way to do that is to start a Flink cluster per job. That is quite simple with YARN: submitting the job in per-job-cluster mode brings up the Flink cluster on YARN, runs the job, and shuts the cluster down. You can do something similar in other containerized environments. Hope that solution works for you. > How to deal with "resouce isolation" problem > > > Key: FLINK-3970 > URL: https://issues.apache.org/jira/browse/FLINK-3970 > Project: Flink > Issue Type: Wish >Reporter: ZhengBowen > > For example, when a 'big query' and a 'small query' are executed at the same time, you > need to isolate the 'big query' from the 'small query' to prevent the 'big query' from exhausting > resources (including I/O, memory, and network) so that the 'small query' can complete > quickly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
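Assuming a Flink 1.x installation on a YARN cluster, the per-job mode described above boils down to a single command (flag names from the 1.x CLI; the jar path is a placeholder, adjust to your setup):

```shell
# Start a fresh per-job Flink cluster on YARN, run the job, then tear the cluster down.
#   -m yarn-cluster : per-job-cluster mode instead of submitting to a running cluster
#   -yn 4           : number of YARN containers (TaskManagers) to request
./bin/flink run -m yarn-cluster -yn 4 ./path/to/your-job.jar
```

Because each job gets its own processes, a heavy job cannot exhaust the CPU, memory, or network of a lighter one running at the same time.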
[GitHub] flink pull request: [FLINK-3806] [gelly] Revert use of DataSet.cou...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2036 [FLINK-3806] [gelly] Revert use of DataSet.count() This leaves the Graph API unchanged but GatherSumApplyIteration and ScatterGatherIteration now use broadcast variables to share the numberOfVertices count. The PageRanks have been updated to use this feature. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 3806_revert_use_of_dataset_count Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2036.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2036 commit c56cbf997ceaec7e54f5d0847e960337ca1d84ba Author: Greg HoganDate: 2016-05-25T15:06:01Z [FLINK-3806] [gelly] Revert use of DataSet.count() --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-3975) docs build script isn't serving the preview on the correct base url
Dyana Rose created FLINK-3975: - Summary: docs build script isn't serving the preview on the correct base url Key: FLINK-3975 URL: https://issues.apache.org/jira/browse/FLINK-3975 Project: Flink Issue Type: Bug Components: Documentation Environment: centos 7.2, jekyll 3.1.6, ruby 2.3.1 Reporter: Dyana Rose Priority: Trivial When running the documentation build script as {{./build_docs.sh -p}}, the docs are built using the correctly overridden baseurl, but they are then served from the wrong URL, making it a wee bit more difficult than intended to review changes locally. The following is the output from running the script: {quote} Configuration file: _config.yml Configuration file: _local_preview_conf.yml Source: /vagrant/flink/docs Destination: /vagrant/flink/docs/target Incremental build: disabled. Enable with --incremental Generating... /home/vagrant/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/pygments.rb-0.6.3/lib/pygments/popen.rb:57: warning: Insecure world writable dir /opt/scala-2.11.8/bin in PATH, mode 040777 done in 16.736 seconds. Auto-regeneration: enabled for '/vagrant/flink/docs' Configuration file: /vagrant/flink/docs/_config.yml Server address: http://0.0.0.0:4000//ci.apache.org/projects/flink/flink-docs-master/ Server running... press ctrl-c to stop. {quote} As you can see, it looks to be using both config files to build, but only the default config file to serve. This can be fixed by removing the {{_local_preview_conf.yml}} file and instead specifying the baseurl as an option to the serve command, so it becomes {{serve --config _config.yml --baseurl= --watch}}. Giving an output of: {quote} Configuration file: _config.yml Source: /vagrant/flink/docs Destination: /vagrant/flink/docs/target Incremental build: disabled. Enable with --incremental Generating... 
/home/vagrant/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/pygments.rb-0.6.3/lib/pygments/popen.rb:57: warning: Insecure world writable dir /opt/scala-2.11.8/bin in PATH, mode 040777 done in 15.928 seconds. Auto-regeneration: enabled for '/vagrant/flink/docs' Configuration file: /vagrant/flink/docs/_config.yml Server address: http://0.0.0.0:4000/ Server running... press ctrl-c to stop. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3801) Upgrade Joda-Time library to 2.9.3
[ https://issues.apache.org/jira/browse/FLINK-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-3801: -- Description: Currently Joda-Time 2.5 is used, which is very old. We should upgrade to 2.9.3. was: Currently Joda-Time 2.5 is used, which is very old. We should upgrade to 2.9.3. > Upgrade Joda-Time library to 2.9.3 > -- > > Key: FLINK-3801 > URL: https://issues.apache.org/jira/browse/FLINK-3801 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Priority: Minor > > Currently Joda-Time 2.5 is used, which is very old. > We should upgrade to 2.9.3. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3806) Revert use of DataSet.count() in Gelly
[ https://issues.apache.org/jira/browse/FLINK-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300691#comment-15300691 ] ASF GitHub Bot commented on FLINK-3806: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2036 [FLINK-3806] [gelly] Revert use of DataSet.count() This leaves the Graph API unchanged but GatherSumApplyIteration and ScatterGatherIteration now use broadcast variables to share the numberOfVertices count. The PageRanks have been updated to use this feature. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 3806_revert_use_of_dataset_count Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2036.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2036 commit c56cbf997ceaec7e54f5d0847e960337ca1d84ba Author: Greg HoganDate: 2016-05-25T15:06:01Z [FLINK-3806] [gelly] Revert use of DataSet.count() > Revert use of DataSet.count() in Gelly > -- > > Key: FLINK-3806 > URL: https://issues.apache.org/jira/browse/FLINK-3806 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Critical > Fix For: 1.1.0 > > > FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The > former returns a {{DataSet}} while the latter executes a job to return a Java > value. > {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and > {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and > {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the > user does not pass the number of vertices as a parameter. > As noted in FLINK-1632, this does make the code simpler but if my > understanding is correct will materialize the Graph twice. 
The Graph will > need to be reread from input, regenerated, or recomputed by preceding > algorithms. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-221670393 And @th0br0 is working on CombineHint.NONE. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300638#comment-15300638 ] ASF GitHub Bot commented on FLINK-3477: --- Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-221670393 And @th0br0 is working on CombineHint.NONE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300642#comment-15300642 ] B Wyatt commented on FLINK-3974: After attempting to produce a minimal repro, I realized that there needs to be an operation between the source and the parallel maps: {code:java} DataStream input = ... input = input .map(MapFunction...); input .map(MapFunction...) .addSink(...); input .map(MapFunction...) .addSink(...); {code} see attached repro case -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Issue Comment Deleted] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] B Wyatt updated FLINK-3974: --- Comment: was deleted (was: Simple repro program.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] B Wyatt updated FLINK-3974: --- Attachment: ReproFLINK3974.java Simple repro program. > enableObjectReuse fails when an operator chains to multiple downstream > operators > > > Key: FLINK-3974 > URL: https://issues.apache.org/jira/browse/FLINK-3974 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.0.3 >Reporter: B Wyatt > Attachments: ReproFLINK3974.java > > > Given a topology that looks like this: > {code:java} > DataStream<A> input = ... > input > .map(MapFunction...) > .addSink(...); > input > .map(MapFunction...) > .addSink(...); > {code} > enableObjectReuse() will cause an exception in the form of > {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown. > It looks like the input operator calls {{Output.collect}} > which attempts to loop over the downstream operators and process them. > However, the first map operation will call {{StreamRecord<>.replace}} which > mutates the value stored in the StreamRecord<>. > As a result, when the {{Output.collect}} call passes the > {{StreamRecord<A>}} to the second map operation it is actually a > {{StreamRecord<B>}} and behaves as if the two map operations were serial > instead of parallel. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300456#comment-15300456 ] ASF GitHub Bot commented on FLINK-3477: --- Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-221644628 We talked about this yesterday with @fhueske at the Flink meetup, and the conclusion was that we will get it into 1.1. @fhueske has several comments that he will write here in a few days, and then I will make the (hopefully) final adjustments. > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
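The strategy sketched in the ticket — keep one pre-aggregated value per key in a hash table and fold each incoming record into it with the user's {{reduce(v1, v2)}} — can be illustrated with a plain Java hash map (a sketch only; Flink's combiner would use its managed-memory hash table with in-place updates and eviction):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class HashCombineSketch {

    // Fold each (key, value) record into the table; Map.merge applies the
    // reduce function incrementally, mirroring how a hash-based combiner
    // updates the pre-aggregate on every call.
    static <K, T> Map<K, T> combine(Iterable<Map.Entry<K, T>> records,
                                    BinaryOperator<T> reduce) {
        Map<K, T> table = new HashMap<>();
        for (Map.Entry<K, T> record : records) {
            table.merge(record.getKey(), record.getValue(), reduce);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, Integer> sums = combine(
                List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3)),
                Integer::sum);
        System.out.println(sums); // per-key sums: a=4, b=2
    }
}
```

Note what this toy omits: the real combiner must also evict and emit all entries when it runs out of memory, and handle updated values whose binary length changed — neither of which a JDK HashMap models.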
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-221644628 We talked about this yesterday with @fhueske at the Flink meetup, and the conclusion was that we will get it into 1.1. @fhueske has several comments that he will write here in a few days, and then I will make the (hopefully) final adjustments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300453#comment-15300453 ] ASF GitHub Bot commented on FLINK-3477: --- Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-221643813 What is the status of this PR? Can we get this in and tested for the 1.1 release? > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine ...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1517#issuecomment-221643813 What is the status of this PR? Can we get this in and tested for the 1.1 release? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Resolved] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri resolved FLINK-2044. -- Resolution: Implemented Fix Version/s: 1.1.0 > Implementation of Gelly HITS Algorithm > -- > > Key: FLINK-2044 > URL: https://issues.apache.org/jira/browse/FLINK-2044 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Ahamd Javid >Assignee: GaoLun >Priority: Minor > Fix For: 1.1.0 > > > Implementation of Hits Algorithm in Gelly API using Java. the feature branch > can be found here: (https://github.com/JavidMayar/flink/commits/HITS) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
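As a reminder of what the merged algorithm computes, here is one HITS iteration in plain Java (a sketch of the textbook update, not the Gelly implementation): each authority score is the sum of hub scores over in-edges, each hub score the sum of the updated authority scores over out-edges, and both vectors are L2-normalized afterwards.

```java
public class HitsSketch {

    // edges[i] = {source, target}; hub and auth are updated in place.
    static void iterate(int[][] edges, double[] hub, double[] auth) {
        double[] newAuth = new double[auth.length];
        for (int[] e : edges) {
            newAuth[e[1]] += hub[e[0]];    // authority: sum of in-neighbor hubs
        }
        double[] newHub = new double[hub.length];
        for (int[] e : edges) {
            newHub[e[0]] += newAuth[e[1]]; // hub: sum of out-neighbor authorities
        }
        normalize(newAuth);
        normalize(newHub);
        System.arraycopy(newAuth, 0, auth, 0, auth.length);
        System.arraycopy(newHub, 0, hub, 0, hub.length);
    }

    // Scale the vector to unit L2 norm (no-op for the zero vector).
    static void normalize(double[] v) {
        double norm = 0;
        for (double x : v) {
            norm += x * x;
        }
        norm = Math.sqrt(norm);
        if (norm > 0) {
            for (int i = 0; i < v.length; i++) {
                v[i] /= norm;
            }
        }
    }

    public static void main(String[] args) {
        // Tiny graph: 0 -> 1 and 2 -> 1, so vertex 1 is the sole authority.
        double[] hub = {1, 1, 1};
        double[] auth = {0, 0, 0};
        iterate(new int[][] {{0, 1}, {2, 1}}, hub, auth);
        System.out.println(auth[1]); // 1.0 after normalization
    }
}
```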
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300415#comment-15300415 ] ASF GitHub Bot commented on FLINK-2044: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1956 > Implementation of Gelly HITS Algorithm > -- > > Key: FLINK-2044 > URL: https://issues.apache.org/jira/browse/FLINK-2044 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Ahamd Javid >Assignee: GaoLun >Priority: Minor > Fix For: 1.1.0 > > > Implementation of Hits Algorithm in Gelly API using Java. the feature branch > can be found here: (https://github.com/JavidMayar/flink/commits/HITS) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1956 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300409#comment-15300409 ] ASF GitHub Bot commented on FLINK-2044: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-221637781 Thanks @gallenvara! All tests pass now. I will merge :) > Implementation of Gelly HITS Algorithm > -- > > Key: FLINK-2044 > URL: https://issues.apache.org/jira/browse/FLINK-2044 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Ahamd Javid >Assignee: GaoLun >Priority: Minor > > Implementation of Hits Algorithm in Gelly API using Java. the feature branch > can be found here: (https://github.com/JavidMayar/flink/commits/HITS) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-221637781 Thanks @gallenvara! All tests pass now. I will merge :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
B Wyatt created FLINK-3974: -- Summary: enableObjectReuse fails when an operator chains to multiple downstream operators Key: FLINK-3974 URL: https://issues.apache.org/jira/browse/FLINK-3974 Project: Flink Issue Type: Bug Components: DataStream API Affects Versions: 1.0.3 Reporter: B Wyatt Given a topology that looks like this: {code:java} DataStream<A> input = ... input .map(MapFunction...) .addSink(...); input .map(MapFunction...) .addSink(...); {code} enableObjectReuse() will cause an exception in the form of {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown. It looks like the input operator calls {{Output.collect}} which attempts to loop over the downstream operators and process them. However, the first map operation will call {{StreamRecord<>.replace}} which mutates the value stored in the StreamRecord<>. As a result, when the {{Output.collect}} call passes the {{StreamRecord<A>}} to the second map operation it is actually a {{StreamRecord<B>}} and behaves as if the two map operations were serial instead of parallel. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3973) Table API documentation is "hidden" in Programming Guide menu list
Fabian Hueske created FLINK-3973: Summary: Table API documentation is "hidden" in Programming Guide menu list Key: FLINK-3973 URL: https://issues.apache.org/jira/browse/FLINK-3973 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.1.0 Reporter: Fabian Hueske Fix For: 1.1.0 The Table API / SQL documentation is hard to find in the drop-down list of the "Programming Guide" menu entry. We should either move it into the "Libraries" menu, or move it up in the "Programming Guide" entry and make it bold like the other top entries. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-3714) Add Support for "Allowed Lateness"
[ https://issues.apache.org/jira/browse/FLINK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-3714: - Assignee: Kostas Kloudas > Add Support for "Allowed Lateness" > -- > > Key: FLINK-3714 > URL: https://issues.apache.org/jira/browse/FLINK-3714 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas > > As mentioned in > https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit# > we should add support for an allowed lateness setting. > This includes several things: > - API for setting allowed lateness > - Dropping of late elements > - Garbage collection of event-time windows > Lateness only makes sense for event-time windows. So we also have to figure > out what the API for this should look like and especially what should happen > with the "stream-time characteristic" switch. For example in this: > {code} > env.setStreamTimeCharacteristic(ProcessingTime) > ... > DataStream in = ... > result = in > .keyBy() > .timeWindow() > .allowedLateness() > .apply() > {code} > I think the setting can be silently ignored when doing processing-time > windowing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
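The garbage-collection rule the ticket asks for can be stated in one line: a window's state stays alive until the watermark passes the window end plus the allowed lateness. A sketch (hypothetical helper, not Flink's API):

```java
public class LatenessSketch {

    // True once the window's state may be garbage-collected and any element
    // still arriving for it dropped as late.
    static boolean isPastAllowedLateness(long windowEnd, long allowedLateness,
                                         long watermark) {
        return watermark > windowEnd + allowedLateness;
    }

    public static void main(String[] args) {
        // Window ending at t=100 with allowed lateness 10:
        System.out.println(isPastAllowedLateness(100, 10, 105)); // false: state kept
        System.out.println(isPastAllowedLateness(100, 10, 111)); // true: dropped/GC'd
    }
}
```

Under the processing-time characteristic there is no watermark, which is why the setting can be silently ignored there, as the ticket suggests.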
[jira] [Closed] (FLINK-1673) Colocate Flink Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-1673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-1673. --- Resolution: Won't Fix > Colocate Flink Kafka consumer > - > > Key: FLINK-1673 > URL: https://issues.apache.org/jira/browse/FLINK-1673 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Márton Balassi >Assignee: Tamás Krutki > > Kafka exposes the location of the replicas. To make the Flink Kafka Consumers > more effective we could do a best effort colocation for the sources with the > Kafka brokers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300238#comment-15300238 ] ASF GitHub Bot commented on FLINK-2044: --- Github user gallenvara commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-221616393 @vasia In the Travis failure reports, the failures are related to the flink-yarn-tests module. I have merged the latest code from master and rebased all my commits in this PR. And the `HITSAlgorithmITCase` runs successfully locally. > Implementation of Gelly HITS Algorithm > -- > > Key: FLINK-2044 > URL: https://issues.apache.org/jira/browse/FLINK-2044 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Ahamd Javid >Assignee: GaoLun >Priority: Minor > > Implementation of Hits Algorithm in Gelly API using Java. The feature branch > can be found here: (https://github.com/JavidMayar/flink/commits/HITS) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1673) Colocate Flink Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-1673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300236#comment-15300236 ] Robert Metzger commented on FLINK-1673: --- I can not remember any user asking for this feature. I think we can close it as won't fix for now. > Colocate Flink Kafka consumer > - > > Key: FLINK-1673 > URL: https://issues.apache.org/jira/browse/FLINK-1673 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Márton Balassi >Assignee: Tamás Krutki > > Kafka exposes the location of the replicas. To make the Flink Kafka Consumers > more effective we could do a best effort colocation for the sources with the > Kafka brokers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...
Github user gallenvara commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-221616393 @vasia In the Travis failure reports, the failures are related to the flink-yarn-tests module. I have merged the latest code from master and rebased all my commits in this PR. And the `HITSAlgorithmITCase` runs successfully locally. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3945) Degree annotation for directed graphs
[ https://issues.apache.org/jira/browse/FLINK-3945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300223#comment-15300223 ] ASF GitHub Bot commented on FLINK-3945: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2021#discussion_r64594890 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.asm.degree.annotate.directed; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; + +/** + * Annotates edges of a directed graph with the degree, out-degree, and + * in-degree of both the source and target vertices. + * + * @param ID type + * @param vertex value type + * @param edge value type + */ +public class EdgeDegreesPair+implements GraphAlgorithm setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + public DataSet >> run(Graph input) + throws Exception { + // s, t, d(s) + DataSet >> edgeSourceDegrees = input + .run(new EdgeSourceDegrees () + .setParallelism(parallelism)); + + // t, d(t) + DataSet > vertexDegrees = input + .run(new VertexDegrees () + .setParallelism(parallelism)); + + // s, t, (d(s), d(t)) + return edgeSourceDegrees + .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) --- End diff -- I'm amenable to adding this configuration if we can show noticeably improved performance. Choosing a bad `JoinHint` will result in significantly degraded performance for most graphs in addition to adding to the user's cognitive load. 
> Degree annotation for directed graphs > - > > Key: FLINK-3945 > URL: https://issues.apache.org/jira/browse/FLINK-3945 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Fix For: 1.1.0 > > > There is a third degree count for vertices in directed graphs which is the > distinct count of out- and in-neighbors. This also adds edge annotation of > the vertex degrees for directed graphs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3945] [gelly] Degree annotation for dir...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2021#discussion_r64594890 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.degree.annotate.directed; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; + +/** + * Annotates edges of a directed graph with the degree, out-degree, and + * in-degree of both the source and target vertices. 
+ * + * @param ID type + * @param vertex value type + * @param edge value type + */ +public class EdgeDegreesPair+implements GraphAlgorithm setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + public DataSet >> run(Graph input) + throws Exception { + // s, t, d(s) + DataSet >> edgeSourceDegrees = input + .run(new EdgeSourceDegrees () + .setParallelism(parallelism)); + + // t, d(t) + DataSet > vertexDegrees = input + .run(new VertexDegrees () + .setParallelism(parallelism)); + + // s, t, (d(s), d(t)) + return edgeSourceDegrees + .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) --- End diff -- I'm amenable to adding this configuration if we can show noticeably improved performance. Choosing a bad `JoinHint` will result in significantly degraded performance for most graphs in addition to adding to the user's cognitive load. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-3972) Subclasses of ResourceID may not be serializable
Maximilian Michels created FLINK-3972: - Summary: Subclasses of ResourceID may not be serializable Key: FLINK-3972 URL: https://issues.apache.org/jira/browse/FLINK-3972 Project: Flink Issue Type: Bug Components: ResourceManager Affects Versions: 1.1.0 Reporter: Maximilian Michels Assignee: Maximilian Michels Priority: Minor Fix For: 1.1.0 WorkerTypes are currently subclasses of ResourceID. ResourceID has to be Serializable, but its subclasses don't have to be. This may lead to problems when these subclasses are used as ResourceIDs, i.e. serialization may fail with a NotSerializableException. Currently, subclasses are never sent over the wire, but they might be in the future. Instead of relying on subclasses of ResourceID for the WorkerTypes, we can let them implement an interface to retrieve the ResourceID of a WorkerType. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
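The direction proposed in the last sentence — worker types implementing an interface that yields a ResourceID rather than extending it — might look like the following (class and interface names here are illustrative assumptions, not Flink's actual code):

```java
import java.io.Serializable;

public class ResourceIdSketch {

    // ResourceID itself stays Serializable so it can safely go over the wire.
    static final class ResourceID implements Serializable {
        private final String id;
        ResourceID(String id) { this.id = id; }
        String getId() { return id; }
    }

    // Worker types expose their ResourceID through an interface instead of
    // subclassing it, so they never need to be serializable themselves.
    interface ResourceIDRetrievable {
        ResourceID getResourceID();
    }

    // Hypothetical worker type: holds a ResourceID without being one.
    static final class WorkerNode implements ResourceIDRetrievable {
        private final ResourceID resourceId;
        WorkerNode(ResourceID resourceId) { this.resourceId = resourceId; }
        @Override
        public ResourceID getResourceID() { return resourceId; }
    }

    public static void main(String[] args) {
        WorkerNode worker = new WorkerNode(new ResourceID("worker-1"));
        System.out.println(worker.getResourceID().getId()); // worker-1
    }
}
```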
[jira] [Updated] (FLINK-3806) Revert use of DataSet.count() in Gelly
[ https://issues.apache.org/jira/browse/FLINK-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3806: -- Fix Version/s: 1.1.0 > Revert use of DataSet.count() in Gelly > -- > > Key: FLINK-3806 > URL: https://issues.apache.org/jira/browse/FLINK-3806 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Critical > Fix For: 1.1.0 > > > FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The > former returns a {{DataSet}} while the latter executes a job to return a Java > value. > {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and > {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and > {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the > user does not pass the number of vertices as a parameter. > As noted in FLINK-1632, this does make the code simpler but if my > understanding is correct will materialize the Graph twice. The Graph will > need to be reread from input, regenerated, or recomputed by preceding > algorithms. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...
Github user gallenvara commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-221606502 @vasia Wait a few minutes and I will take a look. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3945] [gelly] Degree annotation for dir...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/2021#discussion_r64589442 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.degree.annotate.directed; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; + +/** + * Annotates edges of a directed graph with the degree, out-degree, and + * in-degree of both the source and target vertices. 
+ * + * @param ID type + * @param vertex value type + * @param edge value type + */ +public class EdgeDegreesPair+implements GraphAlgorithm setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + public DataSet >> run(Graph input) + throws Exception { + // s, t, d(s) + DataSet >> edgeSourceDegrees = input + .run(new EdgeSourceDegrees () + .setParallelism(parallelism)); + + // t, d(t) + DataSet > vertexDegrees = input + .run(new VertexDegrees () + .setParallelism(parallelism)); + + // s, t, (d(s), d(t)) + return edgeSourceDegrees + .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) --- End diff -- I agree that we should automatically set the parameters when possible. So, I would keep this as the default setting. However, I think we should allow users that _do_ have knowledge about the input graph to either let the optimizer decide or override the default setting. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
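The compromise being discussed — ship a safe default join hint but let knowledgeable users override it or defer to the optimizer — reduces to a small builder-style setter (enum and class names here are illustrative, not the Gelly API):

```java
public class DegreeAnnotationConfig {

    enum JoinHint { OPTIMIZER_CHOOSES, REPARTITION_HASH_SECOND, BROADCAST_HASH_SECOND }

    // Safe default for unknown or skewed graphs, per the PR discussion;
    // users who know their input may override it.
    private JoinHint joinHint = JoinHint.REPARTITION_HASH_SECOND;

    DegreeAnnotationConfig setJoinHint(JoinHint joinHint) {
        this.joinHint = joinHint;
        return this;
    }

    JoinHint getJoinHint() {
        return joinHint;
    }

    public static void main(String[] args) {
        DegreeAnnotationConfig config = new DegreeAnnotationConfig()
                .setJoinHint(JoinHint.OPTIMIZER_CHOOSES); // defer to the optimizer
        System.out.println(config.getJoinHint()); // OPTIMIZER_CHOOSES
    }
}
```

Keeping a hard-coded safe default while allowing an explicit override addresses both concerns in the thread: no extra cognitive load for most users, and an escape hatch for those who do know their graph's size distribution.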
[jira] [Commented] (FLINK-3945) Degree annotation for directed graphs
[ https://issues.apache.org/jira/browse/FLINK-3945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300183#comment-15300183 ] ASF GitHub Bot commented on FLINK-3945: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/2021#discussion_r64589442 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.asm.degree.annotate.directed; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; + +/** + * Annotates edges of a directed graph with the degree, out-degree, and + * in-degree of both the source and target vertices. + * + * @param ID type + * @param vertex value type + * @param edge value type + */ +public class EdgeDegreesPair+implements GraphAlgorithm setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + public DataSet >> run(Graph input) + throws Exception { + // s, t, d(s) + DataSet >> edgeSourceDegrees = input + .run(new EdgeSourceDegrees () + .setParallelism(parallelism)); + + // t, d(t) + DataSet > vertexDegrees = input + .run(new VertexDegrees () + .setParallelism(parallelism)); + + // s, t, (d(s), d(t)) + return edgeSourceDegrees + .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) --- End diff -- I agree that we should automatically set the parameters when possible. So, I would keep this as the default setting. However, I think we should allow users that _do_ have knowledge about the input graph to either let the optimizer decide or override the default setting. 
> Degree annotation for directed graphs > - > > Key: FLINK-3945 > URL: https://issues.apache.org/jira/browse/FLINK-3945 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Fix For: 1.1.0 > > > There is a third degree count for vertices in directed graphs which is the > distinct count of out- and in-neighbors. This also adds edge annotation of > the vertex degrees for directed graphs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300178#comment-15300178 ] ASF GitHub Bot commented on FLINK-2044: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-221603967 The failures were in the `HITSAlgorithmITCase`. > Implementation of Gelly HITS Algorithm > -- > > Key: FLINK-2044 > URL: https://issues.apache.org/jira/browse/FLINK-2044 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Ahamd Javid >Assignee: GaoLun >Priority: Minor > > Implementation of Hits Algorithm in Gelly API using Java. the feature branch > can be found here: (https://github.com/JavidMayar/flink/commits/HITS)
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300177#comment-15300177 ] ASF GitHub Bot commented on FLINK-2044: --- Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-221603165 @vasia in which module were the failures? master has been quite unstable recently now that tests are properly failing. > Implementation of Gelly HITS Algorithm > -- > > Key: FLINK-2044 > URL: https://issues.apache.org/jira/browse/FLINK-2044 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Ahamd Javid >Assignee: GaoLun >Priority: Minor > > Implementation of Hits Algorithm in Gelly API using Java. the feature branch > can be found here: (https://github.com/JavidMayar/flink/commits/HITS)
[jira] [Commented] (FLINK-3723) Aggregate Functions and scalar expressions shouldn't be mixed in select
[ https://issues.apache.org/jira/browse/FLINK-3723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300157#comment-15300157 ] Fabian Hueske commented on FLINK-3723: -- We should resolve this issue (either way) before the Flink 1.1.0 release. I would like to avoid breaking the API after the release. > Aggregate Functions and scalar expressions shouldn't be mixed in select > --- > > Key: FLINK-3723 > URL: https://issues.apache.org/jira/browse/FLINK-3723 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.1 >Reporter: Yijie Shen >Priority: Critical > > When we type {code}select deptno, name, max(age) from dept group by > deptno;{code} in calcite or Oracle, it will complain {code}Expression 'NAME' > is not being grouped{code} or {code}Column 'dept.name' is invalid in the > select list because it is not contained in either an aggregate function or > the GROUP BY clause.{code} because of the nondeterministic result. > Therefore, I suggest to separate the current functionality of `select` into > two api, the new `select` only handle scalar expressions, and an `agg` accept > Aggregates. > {code} > def select(exprs: Expression*) > def agg(aggs: Aggregation*) > > tbl.groupBy('deptno) >.agg('age.max, 'age.min) > {code}
[jira] [Commented] (FLINK-3945) Degree annotation for directed graphs
[ https://issues.apache.org/jira/browse/FLINK-3945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300173#comment-15300173 ] ASF GitHub Bot commented on FLINK-3945: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2021#discussion_r64587577 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of both the source and target vertices.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeDegreesPair<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> {
+
+	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+
+	public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// s, t, d(s)
+		DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input
+			.run(new EdgeSourceDegrees<K, VV, EV>()
+				.setParallelism(parallelism));
+
+		// t, d(t)
+		DataSet<Vertex<K, Degrees>> vertexDegrees = input
+			.run(new VertexDegrees<K, VV, EV>()
+				.setParallelism(parallelism));
+
+		// s, t, (d(s), d(t))
+		return edgeSourceDegrees
+			.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
--- End diff --

Algorithms should not expect the user to have knowledge of the input graph or divine the best configuration. Worst case for an undirected graph is twice as many vertices as edges.
> Degree annotation for directed graphs > - > > Key: FLINK-3945 > URL: https://issues.apache.org/jira/browse/FLINK-3945 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Fix For: 1.1.0 > > > There is a third degree count for vertices in directed graphs which is the > distinct count of out- and in-neighbors. This also adds edge annotation of > the vertex degrees for directed graphs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
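The compromise the reviewers converge on in this thread (keep `REPARTITION_HASH_SECOND` as the default, but let users who know their graph override it) is the usual optional-configuration pattern. A minimal sketch with hypothetical names, not the actual Gelly code:

```java
public class JoinStrategyExample {
    // Hypothetical enum for this sketch; Flink's real JoinHint lives in
    // org.apache.flink.api.common.operators.base.JoinOperatorBase.
    enum Hint { OPTIMIZER_CHOOSES, REPARTITION_HASH_SECOND }

    static class DegreeAnnotation {
        // Default mirrors the PR's choice: assume the degree DataSet
        // (the second join input) is the side to repartition and hash.
        private Hint joinHint = Hint.REPARTITION_HASH_SECOND;

        // Users with knowledge of the input graph can override the default,
        // e.g. fall back to letting the optimizer choose.
        DegreeAnnotation setJoinHint(Hint joinHint) {
            this.joinHint = joinHint;
            return this;
        }

        Hint getJoinHint() {
            return joinHint;
        }
    }

    public static void main(String[] args) {
        DegreeAnnotation algo = new DegreeAnnotation();
        System.out.println(algo.getJoinHint()); // REPARTITION_HASH_SECOND
        algo.setJoinHint(Hint.OPTIMIZER_CHOOSES);
        System.out.println(algo.getJoinHint()); // OPTIMIZER_CHOOSES
    }
}
```

The setter returns `this` so the override can be chained with other optional configuration, matching the fluent style used elsewhere in Gelly.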
[jira] [Updated] (FLINK-2971) Add outer joins to the Table API
[ https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-2971: - Priority: Critical (was: Major) > Add outer joins to the Table API > > > Key: FLINK-2971 > URL: https://issues.apache.org/jira/browse/FLINK-2971 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Dawid Wysakowicz >Priority: Critical > > Since Flink now supports outer joins, the Table API can also support left, > right and full outer joins. > Given that null values are properly supported by RowSerializer etc.
[jira] [Created] (FLINK-3971) Aggregates handle null values incorrectly.
Fabian Hueske created FLINK-3971: Summary: Aggregates handle null values incorrectly. Key: FLINK-3971 URL: https://issues.apache.org/jira/browse/FLINK-3971 Project: Flink Issue Type: Bug Components: Table API Affects Versions: 1.1.0 Reporter: Fabian Hueske Priority: Critical Fix For: 1.1.0 Table API and SQL aggregates are supposed to ignore null values, e.g., {{sum(1,2,null,4)}} is supposed to return {{7}}. The current implementation is correct if at least one valid value is present; however, it is incorrect if only null values are aggregated: {{sum(null, null, null)}} should return {{null}} instead of {{0}}. Currently only the Count aggregate handles the null-values-only case correctly.
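The null-ignoring behavior the issue asks for can be illustrated outside Flink. This is a hypothetical sketch of the intended semantics, not the Table API's actual aggregate code:

```java
import java.util.Arrays;
import java.util.List;

public class NullIgnoringSum {
    /**
     * Sums the non-null values of a column. Returns null (not 0) when
     * every input value is null, matching SQL aggregate semantics.
     */
    public static Integer sum(List<Integer> values) {
        Integer acc = null;
        for (Integer v : values) {
            if (v == null) {
                continue; // null values are ignored, not treated as 0
            }
            acc = (acc == null) ? v : acc + v;
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, null, 4)));           // 7
        System.out.println(sum(Arrays.<Integer>asList(null, null, null))); // null
    }
}
```

The accumulator starts as null rather than 0, which is exactly the distinction the bug report makes: only once a valid value arrives does the result become non-null.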
[jira] [Updated] (FLINK-3936) Add MIN / MAX aggregations function for BOOLEAN types
[ https://issues.apache.org/jira/browse/FLINK-3936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-3936: - Priority: Critical (was: Major) > Add MIN / MAX aggregations function for BOOLEAN types > - > > Key: FLINK-3936 > URL: https://issues.apache.org/jira/browse/FLINK-3936 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.1.0 > > > When executing TPC-H Q4, I observed that Calcite generates a MIN aggregate on > Boolean literals to translate a decorrelate subquery in an {{EXIST}} > predicate. > MIN and MAX aggregates on Boolean data types are currently not supported and > should be added. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3589) Allow setting Operator parallelism to default value
[ https://issues.apache.org/jira/browse/FLINK-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300154#comment-15300154 ] ASF GitHub Bot commented on FLINK-3589: --- Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1778#issuecomment-221597445 `PARALLELISM_UNKNOWN` is a no-op which leaves the parallelism unchanged. This is a useful default for batch algorithms such as `JaccardIndex` for which parallelism can be optionally configured. If the user does not specify a parallelism we do not want to override the parallelism of the `ExecutionEnvironment` as would happen if we used `PARALLELISM_DEFAULT`. > Allow setting Operator parallelism to default value > --- > > Key: FLINK-3589 > URL: https://issues.apache.org/jira/browse/FLINK-3589 > Project: Flink > Issue Type: Improvement > Components: Java API >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Fix For: 1.1.0 > > > Users can override the parallelism for a single operator by calling > {{Operator.setParallelism}}, which accepts a positive value. {{Operator}} > uses {{-1}} to indicate default parallelism. It would be nice to name and > accept this default value. > This would enable user algorithms to allow configurable parallelism while > still chaining operator methods. > For example, currently: > {code} > private int parallelism; > ... > public void setParallelism(int parallelism) { > this.parallelism = parallelism; > } > ... > MapOperator<Edge<K, EV>, Edge<K, EV>> newEdges = > edges > .map(new MyMapFunction()) > .name("My map function"); > if (parallelism > 0) { > newEdges.setParallelism(parallelism); > } > {code} > Could be simplified to: > {code} > private int parallelism = Operator.DEFAULT_PARALLELISM; > ... > public void setParallelism(int parallelism) { > this.parallelism = parallelism; > } > ...
> DataSet<Edge<K, EV>> newEdges = edges > .map(new MyMapFunction()) > .setParallelism(parallelism) > .name("My map function"); > {code}
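The sentinel idea greghogan describes (a value that leaves parallelism untouched so operator chaining stays fluent) can be sketched without Flink. Class and constant names here are illustrative, not Flink's actual API:

```java
public class ParallelismExample {
    // Sentinel meaning "leave this operator's parallelism unchanged",
    // analogous in spirit to the PARALLELISM_UNKNOWN discussed above.
    static final int PARALLELISM_UNCHANGED = -2;
    // Sentinel meaning "use the environment's configured parallelism".
    static final int PARALLELISM_ENV_DEFAULT = -1;

    static class Operator {
        private int parallelism = PARALLELISM_ENV_DEFAULT;

        Operator setParallelism(int parallelism) {
            // A no-op sentinel lets callers chain setParallelism
            // unconditionally instead of wrapping the call in
            // "if (parallelism > 0) { ... }".
            if (parallelism != PARALLELISM_UNCHANGED) {
                this.parallelism = parallelism;
            }
            return this;
        }

        int getParallelism() {
            return parallelism;
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator().setParallelism(PARALLELISM_UNCHANGED);
        System.out.println(op.getParallelism()); // -1: environment default kept
        op.setParallelism(8);
        System.out.println(op.getParallelism()); // 8
    }
}
```

This is exactly the simplification the issue's second code block aims for: the chained call is always legal, and the sentinel decides whether it has any effect.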
[jira] [Updated] (FLINK-3859) Add BigDecimal/BigInteger support to Table API
[ https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-3859: - Priority: Critical (was: Major) > Add BigDecimal/BigInteger support to Table API > -- > > Key: FLINK-3859 > URL: https://issues.apache.org/jira/browse/FLINK-3859 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Critical > > Since FLINK-3786 has been solved, we can now start integrating > BigDecimal/BigInteger into the Table API.
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300162#comment-15300162 ] ASF GitHub Bot commented on FLINK-2044: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-221600182 Hey @gallenvara, I was about to merge this, but I see test failures after rebasing on top of master. Can you please (1) rebase on top of the latest master and squash your commits and (2) investigate what's wrong with the tests? Thank you! > Implementation of Gelly HITS Algorithm > -- > > Key: FLINK-2044 > URL: https://issues.apache.org/jira/browse/FLINK-2044 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Ahamd Javid >Assignee: GaoLun >Priority: Minor > > Implementation of Hits Algorithm in Gelly API using Java. the feature branch > can be found here: (https://github.com/JavidMayar/flink/commits/HITS)
[jira] [Updated] (FLINK-3723) Aggregate Functions and scalar expressions shouldn't be mixed in select
[ https://issues.apache.org/jira/browse/FLINK-3723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-3723: - Priority: Critical (was: Major) > Aggregate Functions and scalar expressions shouldn't be mixed in select > --- > > Key: FLINK-3723 > URL: https://issues.apache.org/jira/browse/FLINK-3723 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.1 >Reporter: Yijie Shen >Priority: Critical > > When we type {code}select deptno, name, max(age) from dept group by > deptno;{code} in calcite or Oracle, it will complain {code}Expression 'NAME' > is not being grouped{code} or {code}Column 'dept.name' is invalid in the > select list because it is not contained in either an aggregate function or > the GROUP BY clause.{code} because of the nondeterministic result. > Therefore, I suggest to separate the current functionality of `select` into > two api, the new `select` only handle scalar expressions, and an `agg` accept > Aggregates. > {code} > def select(exprs: Expression*) > def agg(aggs: Aggregation*) > > tbl.groupBy('deptno) >.agg('age.max, 'age.min) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3152) Support all comparisons for Date type
[ https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-3152: - Priority: Critical (was: Major) > Support all comparisons for Date type > - > > Key: FLINK-3152 > URL: https://issues.apache.org/jira/browse/FLINK-3152 > Project: Flink > Issue Type: Improvement > Components: Table API >Reporter: Timo Walther >Priority: Critical > > Currently, the Table API does not support comparisons like "DATE < DATE", > "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.
[jira] [Commented] (FLINK-3936) Add MIN / MAX aggregations function for BOOLEAN types
[ https://issues.apache.org/jira/browse/FLINK-3936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300131#comment-15300131 ] ASF GitHub Bot commented on FLINK-3936: --- GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/2035 [FLINK-3936] [tableAPI] Add MIN/MAX aggregation for Boolean. Adds support for min/max aggregations on boolean columns. Unit tests for new aggregation included. - [X] General - [X] Documentation - [X] Tests & Build You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink tableBoolMinMax Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2035.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2035 commit 1e0430bcce0633d48a5ef568624d3934e8723913 Author: Fabian Hueske Date: 2016-05-22T13:46:06Z [FLINK-3936] [tableAPI] Add MIN/MAX aggregation for Boolean. > Add MIN / MAX aggregations function for BOOLEAN types > - > > Key: FLINK-3936 > URL: https://issues.apache.org/jira/browse/FLINK-3936 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > Fix For: 1.1.0 > > > When executing TPC-H Q4, I observed that Calcite generates a MIN aggregate on > Boolean literals to translate a decorrelate subquery in an {{EXIST}} > predicate. > MIN and MAX aggregates on Boolean data types are currently not supported and > should be added.
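MIN/MAX over booleans has a simple reading: with the ordering false < true, MIN over a column is a conjunction and MAX a disjunction, which is why a MIN on boolean literals can stand in for an EXISTS-style predicate. A sketch of these semantics, independent of the actual Calcite/Flink code:

```java
import java.util.Arrays;
import java.util.List;

public class BooleanMinMax {
    // With false < true, MIN over a boolean column is equivalent to AND.
    public static boolean min(List<Boolean> values) {
        boolean acc = true;
        for (boolean v : values) {
            acc = acc && v;
        }
        return acc;
    }

    // With false < true, MAX over a boolean column is equivalent to OR.
    public static boolean max(List<Boolean> values) {
        boolean acc = false;
        for (boolean v : values) {
            acc = acc || v;
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Boolean> col = Arrays.asList(true, false, true);
        System.out.println(min(col)); // false
        System.out.println(max(col)); // true
    }
}
```

Note this sketch ignores SQL's null handling (empty or all-null input would return null in SQL rather than the vacuous true/false used here).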