[jira] [Created] (FLINK-3855) Upgrade Jackson version
Tatu Saloranta created FLINK-3855: - Summary: Upgrade Jackson version Key: FLINK-3855 URL: https://issues.apache.org/jira/browse/FLINK-3855 Project: Flink Issue Type: Improvement Components: Streaming Connectors Affects Versions: 1.0.3 Reporter: Tatu Saloranta Support a rolling sink writer in Avro key-value format, preferably without additional classpath dependencies, and preferably in the same format as M/R jobs for backward compatibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3855) Upgrade Jackson version
[ https://issues.apache.org/jira/browse/FLINK-3855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tatu Saloranta updated FLINK-3855: -- Component/s: (was: Streaming Connectors) Core > Upgrade Jackson version > --- > > Key: FLINK-3855 > URL: https://issues.apache.org/jira/browse/FLINK-3855 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.3 >Reporter: Tatu Saloranta >Priority: Minor > > Support a rolling sink writer in Avro key-value format, > preferably without additional classpath dependencies, > and preferably in the same format as M/R jobs for backward compatibility
[jira] [Commented] (FLINK-3855) Upgrade Jackson version
[ https://issues.apache.org/jira/browse/FLINK-3855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15265115#comment-15265115 ] ASF GitHub Bot commented on FLINK-3855: --- GitHub user cowtowncoder opened a pull request: https://github.com/apache/flink/pull/1952 Jackson version upgrade: default from 2.4.2 to 2.5.5, ES client to latest 2.7 Fix for [FLINK-3855] -- upgrade default Jackson version from 2.4.2 to 2.5.5 (last 2.5 patch); remove unnecessary explicit version ref, and upgrade ES client to the latest patch for the minor version it refers to. You can merge this pull request into a Git repository by running: $ git pull https://github.com/cowtowncoder/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1952.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1952 commit 243f49a95c971c3caac831ef9131d73d4d562587 Author: Tatu Saloranta Date: 2016-04-30T03:26:27Z Jackson version upgrade: default from 2.4.2 to 2.5.5 (last patch of next minor version); elastic client to latest patch > Upgrade Jackson version > --- > > Key: FLINK-3855 > URL: https://issues.apache.org/jira/browse/FLINK-3855 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.3 >Reporter: Tatu Saloranta >Priority: Minor > > The Jackson version in use (2.4.2) is rather old (and not even the latest patch > of its minor version), so it'd make sense to upgrade to something a bit newer. The latest > would be 2.7.4, but at first I propose going to 2.5.5. > All tests pass, but if there are issues I'd be happy to help; I'm the author of > the Jackson project.
[GitHub] flink pull request: Jackson version upgrade: default from 2.4.2 to...
GitHub user cowtowncoder opened a pull request: https://github.com/apache/flink/pull/1952 Jackson version upgrade: default from 2.4.2 to 2.5.5, ES client to latest 2.7 Fix for [FLINK-3855] -- upgrade default Jackson version from 2.4.2 to 2.5.5 (last 2.5 patch); remove unnecessary explicit version ref, and upgrade ES client to latest patch for minor version it refers. You can merge this pull request into a Git repository by running: $ git pull https://github.com/cowtowncoder/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1952.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1952 commit 243f49a95c971c3caac831ef9131d73d4d562587 Author: Tatu Saloranta Date: 2016-04-30T03:26:27Z Jackson version upgrade: default from 2.4.2 to 2.5.5 (last patch of next minor version); elastic client to latest patch --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-3855) Upgrade Jackson version
[ https://issues.apache.org/jira/browse/FLINK-3855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tatu Saloranta updated FLINK-3855: -- Description: The Jackson version in use (2.4.2) is rather old (and not even the latest patch of its minor version), so it'd make sense to upgrade to something a bit newer. The latest would be 2.7.4, but at first I propose going to 2.5.5. All tests pass, but if there are issues I'd be happy to help; I'm the author of the Jackson project. was: Support a rolling sink writer in Avro key-value format, preferably without additional classpath dependencies, and preferably in the same format as M/R jobs for backward compatibility > Upgrade Jackson version > --- > > Key: FLINK-3855 > URL: https://issues.apache.org/jira/browse/FLINK-3855 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.3 >Reporter: Tatu Saloranta >Priority: Minor > > The Jackson version in use (2.4.2) is rather old (and not even the latest patch > of its minor version), so it'd make sense to upgrade to something a bit newer. The latest > would be 2.7.4, but at first I propose going to 2.5.5. > All tests pass, but if there are issues I'd be happy to help; I'm the author of > the Jackson project.
[jira] [Updated] (FLINK-3855) Upgrade Jackson version
[ https://issues.apache.org/jira/browse/FLINK-3855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tatu Saloranta updated FLINK-3855: -- Priority: Minor (was: Major) > Upgrade Jackson version > --- > > Key: FLINK-3855 > URL: https://issues.apache.org/jira/browse/FLINK-3855 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.3 >Reporter: Tatu Saloranta >Priority: Minor > > Support a rolling sink writer in Avro key-value format, > preferably without additional classpath dependencies, > and preferably in the same format as M/R jobs for backward compatibility
[jira] [Updated] (FLINK-3854) Support Avro key-value rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-3854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Berman updated FLINK-3854: --- Summary: Support Avro key-value rolling sink writer (was: Support Avro key-value rolling sink) > Support Avro key-value rolling sink writer > -- > > Key: FLINK-3854 > URL: https://issues.apache.org/jira/browse/FLINK-3854 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.0.3 >Reporter: Igor Berman > > Support a rolling sink writer in Avro key-value format, > preferably without additional classpath dependencies, > and preferably in the same format as M/R jobs for backward compatibility
[jira] [Created] (FLINK-3854) Support Avro key-value rolling sink
Igor Berman created FLINK-3854: -- Summary: Support Avro key-value rolling sink Key: FLINK-3854 URL: https://issues.apache.org/jira/browse/FLINK-3854 Project: Flink Issue Type: Improvement Components: Streaming Connectors Affects Versions: 1.0.3 Reporter: Igor Berman Support a rolling sink writer in Avro key-value format, preferably without additional classpath dependencies, and preferably in the same format as M/R jobs for backward compatibility.
[GitHub] flink pull request: [Flink-3691] extend avroinputformat to support...
Github user gna-phetsarath commented on the pull request: https://github.com/apache/flink/pull/1920#issuecomment-215887300 Many contributions to Flink are a great problem to have. :)
[GitHub] flink pull request: [FLINK-3821] Reduce Guava usage in flink-java
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1938#issuecomment-215830882 +1 with two comments.
[jira] [Commented] (FLINK-3821) Reduce Guava usage in flink-java
[ https://issues.apache.org/jira/browse/FLINK-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1526#comment-1526 ] ASF GitHub Bot commented on FLINK-3821: --- Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1938#issuecomment-215830882 +1 with two comments. > Reduce Guava usage in flink-java > > > Key: FLINK-3821 > URL: https://issues.apache.org/jira/browse/FLINK-3821 > Project: Flink > Issue Type: Improvement > Components: Java API >Affects Versions: 1.0.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.1.0 > >
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264410#comment-15264410 ] ASF GitHub Bot commented on FLINK-2259: --- Github user rawkintrevo commented on the pull request: https://github.com/apache/flink/pull/1898#issuecomment-215826478 Hey @tillrohrmann, thanks for the review. I've addressed your comments in the code. I am in the middle of the documentation and had to commit. Should finish up the docs this afternoon. > Support training Estimators using a (train, validation, test) split of the > available data > - > > Key: FLINK-2259 > URL: https://issues.apache.org/jira/browse/FLINK-2259 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Theodore Vasiloudis >Assignee: Trevor Grant >Priority: Minor > Labels: ML > > When there is an abundance of data available, a good way to train models is > to split the available data into 3 parts: Train, Validation and Test. > We use the Train data to train the model, the Validation part is used to > estimate the test error and select hyperparameters, and the Test is used to > evaluate the performance of the model and assess its generalization [1]. > This is a common approach when training Artificial Neural Networks, and a > good strategy to choose in data-rich environments. Therefore we should have > some support of this data-analysis process in our Estimators. > [1] Friedman, Jerome, Trevor Hastie, and Robert Tibshirani. The elements of > statistical learning. Vol. 1. Springer, Berlin: Springer series in > statistics, 2001.
[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters
Github user rawkintrevo commented on the pull request: https://github.com/apache/flink/pull/1898#issuecomment-215826478 Hey @tillrohrmann, thanks for the review. I've addressed your comments in the code. I am in the middle of the documentation and had to commit. Should finish up the docs this afternoon.
[GitHub] flink pull request: [FLINK-3821] Reduce Guava usage in flink-java
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1938#discussion_r61614176 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java --- @@ -104,7 +104,10 @@ public void open(Configuration parameters) throws Exception { @Override public List> initializeBroadcastVariable(Iterable> data) { // sort the list by task id to calculate the correct offset - List> sortedData = Lists.newArrayList(data); + List> sortedData = new ArrayList(); --- End diff -- Add the diamond operator to ArrayList?
[jira] [Commented] (FLINK-3821) Reduce Guava usage in flink-java
[ https://issues.apache.org/jira/browse/FLINK-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264363#comment-15264363 ] ASF GitHub Bot commented on FLINK-3821: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1938#discussion_r61614176 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java --- @@ -104,7 +104,10 @@ public void open(Configuration parameters) throws Exception { @Override public List> initializeBroadcastVariable(Iterable> data) { // sort the list by task id to calculate the correct offset - List> sortedData = Lists.newArrayList(data); + List> sortedData = new ArrayList(); --- End diff -- Add the diamond operator to ArrayList? > Reduce Guava usage in flink-java > > > Key: FLINK-3821 > URL: https://issues.apache.org/jira/browse/FLINK-3821 > Project: Flink > Issue Type: Improvement > Components: Java API >Affects Versions: 1.0.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.1.0 > >
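[Editor's note: the reviewer's suggestion above is the Java 7 "diamond operator". As an illustrative sketch outside Flink (the class and variable names here are hypothetical, not from the PR), the difference is only on the right-hand side of the assignment:]

```java
import java.util.ArrayList;
import java.util.List;

public class DiamondExample {
    public static void main(String[] args) {
        // Pre-diamond style: the type argument is spelled out on both sides.
        List<String> explicit = new ArrayList<String>();
        // Java 7+ diamond operator: the compiler infers <String> from the target type,
        // avoiding both repetition and an unchecked raw-type warning.
        List<String> inferred = new ArrayList<>();
        explicit.add("task-0");
        inferred.add("task-0");
        System.out.println(explicit.equals(inferred)); // prints true
    }
}
```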
[GitHub] flink pull request: [FLINK-3821] Reduce Guava usage in flink-java
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1938#discussion_r61613247 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java --- @@ -133,13 +132,17 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws protected static boolean[] toBooleanMask(int[] sourceFieldIndices) { Preconditions.checkNotNull(sourceFieldIndices); + int max = 0; for (int i : sourceFieldIndices) { if (i < 0) { throw new IllegalArgumentException("Field indices must not be smaller than zero."); } + if (i > max) { --- End diff -- Use the intrinsic `max = Math.max(max, i);`?
[jira] [Commented] (FLINK-3821) Reduce Guava usage in flink-java
[ https://issues.apache.org/jira/browse/FLINK-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264353#comment-15264353 ] ASF GitHub Bot commented on FLINK-3821: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1938#discussion_r61613247 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java --- @@ -133,13 +132,17 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws protected static boolean[] toBooleanMask(int[] sourceFieldIndices) { Preconditions.checkNotNull(sourceFieldIndices); + int max = 0; for (int i : sourceFieldIndices) { if (i < 0) { throw new IllegalArgumentException("Field indices must not be smaller than zero."); } + if (i > max) { --- End diff -- Use the intrinsic `max = Math.max(max, i);`? > Reduce Guava usage in flink-java > > > Key: FLINK-3821 > URL: https://issues.apache.org/jira/browse/FLINK-3821 > Project: Flink > Issue Type: Improvement > Components: Java API >Affects Versions: 1.0.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.1.0 > >
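[Editor's note: the review comment above suggests `Math.max` in the index scan. A standalone sketch of the loop under review, with the suggested change applied (the wrapper class and method name are hypothetical, only the loop body mirrors the diff):]

```java
public class MaxFieldIndex {
    // Mirrors the reviewed toBooleanMask prologue: reject negative indices,
    // then track the largest index seen, using the suggested Math.max form.
    static int maxIndex(int[] sourceFieldIndices) {
        int max = 0;
        for (int i : sourceFieldIndices) {
            if (i < 0) {
                throw new IllegalArgumentException("Field indices must not be smaller than zero.");
            }
            max = Math.max(max, i);
        }
        return max;
    }

    public static void main(String[] args) {
        System.out.println(maxIndex(new int[]{2, 7, 3})); // prints 7
    }
}
```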
[jira] [Updated] (FLINK-3753) KillerWatchDog should not use kill on toKill thread
[ https://issues.apache.org/jira/browse/FLINK-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-3753: -- Description: {code} // this is harsh, but this watchdog is a last resort if (toKill.isAlive()) { toKill.stop(); } {code} stop() is deprecated. See: https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads was: {code} // this is harsh, but this watchdog is a last resort if (toKill.isAlive()) { toKill.stop(); } {code} stop() is deprecated. See: https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads > KillerWatchDog should not use kill on toKill thread > --- > > Key: FLINK-3753 > URL: https://issues.apache.org/jira/browse/FLINK-3753 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > // this is harsh, but this watchdog is a last resort > if (toKill.isAlive()) { > toKill.stop(); > } > {code} > stop() is deprecated. > See: > https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads
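[Editor's note: the CERT rule THI05-J cited above recommends cooperative, interrupt-based cancellation instead of the deprecated `Thread.stop()`. A minimal sketch of that pattern (plain Java, not Flink's actual KillerWatchDog code):]

```java
public class CooperativeShutdown {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            // The worker polls its own interrupt status and exits voluntarily;
            // blocking calls would instead surface the interrupt as an
            // InterruptedException to be handled the same way.
            while (!Thread.currentThread().isInterrupted()) {
                // do one bounded unit of work per iteration
            }
        });
        worker.start();
        worker.interrupt(); // request shutdown; does not forcibly kill the thread
        worker.join(5000);  // wait for the worker to observe the flag and exit
        System.out.println(worker.isAlive()); // prints false
    }
}
```

Unlike `stop()`, this never releases monitors mid-update, so shared state cannot be observed in a corrupted form.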
[jira] [Updated] (FLINK-3801) Upgrade Joda-Time library to 2.9.3
[ https://issues.apache.org/jira/browse/FLINK-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-3801: -- Description: Currently Joda-Time 2.5 is used, which is very old. We should upgrade to 2.9.3 was: Currently Joda-Time 2.5 is used, which is very old. We should upgrade to 2.9.3 > Upgrade Joda-Time library to 2.9.3 > -- > > Key: FLINK-3801 > URL: https://issues.apache.org/jira/browse/FLINK-3801 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Priority: Minor > > Currently Joda-Time 2.5 is used, which is very old. > We should upgrade to 2.9.3
[jira] [Commented] (FLINK-3239) Support for Kerberos enabled Kafka 0.9.0.0
[ https://issues.apache.org/jira/browse/FLINK-3239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264256#comment-15264256 ] Vijay Srinivasaraghavan commented on FLINK-3239: My bad... I misread the krb file as a keytab. I still believe the krb configuration is likely read from its default install path (e.g., /etc/krb5.conf), so we don't need to pass it. I agree that the keytab is required in many places, and a common approach (if we plan to accept the keytab from the user) is to copy it to a safe/secure location (Spark handles this by copying it to the corresponding job directory in HDFS) from where various components can make use of it. > Support for Kerberos enabled Kafka 0.9.0.0 > -- > > Key: FLINK-3239 > URL: https://issues.apache.org/jira/browse/FLINK-3239 > Project: Flink > Issue Type: New Feature >Reporter: Niels Basjes >Assignee: Stefano Baghino > Attachments: flink3239-prototype.patch > > > In Kafka 0.9.0.0 support for Kerberos has been created ( KAFKA-1686 ). > Request: Allow Flink to forward/manage the Kerberos tickets for Kafka > correctly so that we can use Kafka in a secured environment. > I expect the needed changes to be similar to FLINK-2977, which implements the > same support for HBase.
[jira] [Commented] (FLINK-3853) Reduce object creation in Gelly utility mappers
[ https://issues.apache.org/jira/browse/FLINK-3853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264217#comment-15264217 ] ASF GitHub Bot commented on FLINK-3853: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/1951 [FLINK-3853] [gelly] Reduce object creation in Gelly utility mappers Gelly contains a set of MapFunction between Vertex and Tuple2 and between Edge and Tuple3. A Vertex is a Tuple2 and an Edge is a Tuple3, and conversion in the opposite direction can be performed with a single object per MapFunction. This only applies to the Gelly Java API. Scala tuples are not related to Vertex or Edge. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 3853_reduce_object_creation_in_gelly_utility_mappers Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1951.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1951 commit 854a093c8256a6a3665eba4ee5820ca4c9e72b12 Author: Greg Hogan Date: 2016-04-29T13:42:08Z [FLINK-3853] [gelly] Reduce object creation in Gelly utility mappers Gelly contains a set of MapFunction between Vertex and Tuple2 and between Edge and Tuple3. A Vertex is a Tuple2 and an Edge is a Tuple3, and conversion in the opposite direction can be performed with a single object per MapFunction. This only applies to the Gelly Java API. Scala tuples are not related to Vertex or Edge. > Reduce object creation in Gelly utility mappers > --- > > Key: FLINK-3853 > URL: https://issues.apache.org/jira/browse/FLINK-3853 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Gelly contains a set of {{MapFunction}} between {{Vertex}} and {{Tuple2}} and > between {{Edge}} and {{Tuple3}}. 
A {{Vertex}} is a {{Tuple2}} and an {{Edge}} > is a {{Tuple3}}, and conversion in the opposite direction can be performed > with a single object per {{MapFunction}}. > This only applies to the Gelly Java API. Scala tuples are not related to > {{Vertex}} or {{Edge}}.
[GitHub] flink pull request: [FLINK-3853] [gelly] Reduce object creation in...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/1951 [FLINK-3853] [gelly] Reduce object creation in Gelly utility mappers Gelly contains a set of MapFunction between Vertex and Tuple2 and between Edge and Tuple3. A Vertex is a Tuple2 and an Edge is a Tuple3, and conversion in the opposite direction can be performed with a single object per MapFunction. This only applies to the Gelly Java API. Scala tuples are not related to Vertex or Edge. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 3853_reduce_object_creation_in_gelly_utility_mappers Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1951.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1951 commit 854a093c8256a6a3665eba4ee5820ca4c9e72b12 Author: Greg Hogan Date: 2016-04-29T13:42:08Z [FLINK-3853] [gelly] Reduce object creation in Gelly utility mappers Gelly contains a set of MapFunction between Vertex and Tuple2 and between Edge and Tuple3. A Vertex is a Tuple2 and an Edge is a Tuple3, and conversion in the opposite direction can be performed with a single object per MapFunction. This only applies to the Gelly Java API. Scala tuples are not related to Vertex or Edge.
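[Editor's note: the "single object per MapFunction" idea above is the standard object-reuse pattern. A plain-Java sketch, with no Flink dependency and hypothetical class names, of what such a mapper looks like: the mapper keeps one output instance and mutates it per record instead of allocating a new one.]

```java
public class ReuseSketch {
    // Stand-in for Gelly's Vertex (which is a Tuple2 of id and value).
    static final class Vertex { long id; String value; }

    // Stand-in for a Tuple2-to-Vertex utility mapper with object reuse.
    static final class TupleToVertex {
        private final Vertex reused = new Vertex(); // one object for the mapper's lifetime

        Vertex map(long id, String value) {
            reused.id = id;
            reused.value = value;
            return reused; // callers must not hold onto the returned reference
        }
    }

    public static void main(String[] args) {
        TupleToVertex mapper = new TupleToVertex();
        Vertex a = mapper.map(1L, "x");
        Vertex b = mapper.map(2L, "y");
        System.out.println(a == b); // prints true: same instance, mutated in place
    }
}
```

The trade-off, as in Flink generally, is that downstream code must copy the record if it needs to retain it across invocations.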
[jira] [Commented] (FLINK-3404) Extend Kafka consumers with interface StoppableFunction
[ https://issues.apache.org/jira/browse/FLINK-3404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264146#comment-15264146 ] Matthias J. Sax commented on FLINK-3404: I think so. [~rmetzger], can you comment on this, as you are the most familiar with KafkaSource? > Extend Kafka consumers with interface StoppableFunction > --- > > Key: FLINK-3404 > URL: https://issues.apache.org/jira/browse/FLINK-3404 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Matthias J. Sax > > Kafka consumers are not stoppable right now. To make them stoppable, they > must implement {{StoppableFunction}}. Implementing method {{stop()}} must > ensure that the consumer stops pulling new messages from Kafka and issues a > final checkpoint with the last offset. Afterwards, {{run()}} must return. > When implementing this, keep in mind that the gathered checkpoint might > later be used as a savepoint.
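[Editor's note: the stop() contract described in the issue (stop pulling, record the last offset, let run() return) boils down to a volatile-flag shutdown. A plain-Java sketch of that shape; the class and field names are illustrative and this is not Flink's actual StoppableFunction API:]

```java
public class StoppableConsumerSketch {
    private volatile boolean running = true; // volatile: stop() is called from another thread
    private long lastOffset = -1L;

    void run(long[] offsets) {
        for (long offset : offsets) {
            if (!running) {
                break; // stop pulling new messages
            }
            lastOffset = offset; // "process" the record
        }
        // a real source would issue a final checkpoint carrying lastOffset here,
        // so the stopped position can later serve as a savepoint
    }

    void stop() {
        running = false;
    }

    public static void main(String[] args) {
        StoppableConsumerSketch source = new StoppableConsumerSketch();
        source.run(new long[]{0L, 1L, 2L});
        source.stop();
        System.out.println(source.lastOffset); // prints 2
    }
}
```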
[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask
[ https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264132#comment-15264132 ] Konstantin Knauf commented on FLINK-3669: - Hmpf, of course... > WindowOperator registers a lot of timers at StreamTask > -- > > Key: FLINK-3669 > URL: https://issues.apache.org/jira/browse/FLINK-3669 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.1 >Reporter: Aljoscha Krettek >Assignee: Konstantin Knauf >Priority: Blocker > > Right now, the WindowOperator registers a timer at the StreamTask for every > processing-time timer that a Trigger registers. We should combine several > registered trigger timers to only register one low-level timer (timer > coalescing). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
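[Editor's note: timer coalescing as described in the issue, i.e. many trigger timers backed by one low-level timer, can be sketched with a sorted set: a new low-level timer is needed only when a registration becomes the new earliest deadline. This is an illustrative sketch, not the WindowOperator's actual implementation:]

```java
import java.util.TreeSet;

public class TimerCoalescing {
    // All trigger timestamps, ordered; only the earliest needs a low-level timer.
    private final TreeSet<Long> timers = new TreeSet<>();

    /** Returns true iff the caller must (re)register the single low-level timer. */
    boolean register(long timestamp) {
        boolean isNewEarliest = timers.isEmpty() || timestamp < timers.first();
        timers.add(timestamp);
        return isNewEarliest;
    }

    public static void main(String[] args) {
        TimerCoalescing tc = new TimerCoalescing();
        System.out.println(tc.register(100L)); // prints true: first timer
        System.out.println(tc.register(200L)); // prints false: coalesced behind 100
        System.out.println(tc.register(50L));  // prints true: new earliest deadline
    }
}
```

When the low-level timer fires, the real operator would fire all entries up to the current time and re-register for the next remaining timestamp, if any.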
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264127#comment-15264127 ] ASF GitHub Bot commented on FLINK-3771: --- Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215738750 I pushed the most recent changes. In order of things I care more about to things I care less about: The indentation is as intended. I find method chaining on a single line quite difficult to follow. I find that algorithms are much easier to read if separate operators are easily identifiable. Cramming `implements`, `extends`, and `throws` onto a single line is also difficult to parse, particularly for complicated nested parameters which may themselves extend interfaces. There are empty lines throughout the code. The remaining two look fine to me. > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215738750 I pushed the most recent changes. In order of things I care more about to things I care less about: The indentation is as intended. I find method chaining on a single line quite difficult to follow. I find that algorithms are much easier to read if separate operators are easily identifiable. Cramming `implements`, `extends`, and `throws` onto a single line is also difficult to parse, particularly for complicated nested parameters which may themselves extend interfaces. There are empty lines throughout the code. The remaining two look fine to me.
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264098#comment-15264098 ] ASF GitHub Bot commented on FLINK-3771: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61583101 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.LongValue; + +/** + * Translate {@link LongValue} by adding a constant offset value. + */ +public class LongValueAddOffset +implements MapFunction { + + private final long offset; + + private LongValue output = new LongValue(); + + /** +* Translate {@link LongValue} by adding a constant offset value. 
+ * + * @param offset value to be added to each element + */ + public LongValueAddOffset(long offset) { + this.offset = offset; + } + + @Override + public LongValue map(LongValue value) + throws Exception { + output.setValue(offset + value.getValue()); + --- End diff -- Fixed in next push. > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths
[jira] [Commented] (FLINK-2259) Support training Estimators using a (train, validation, test) split of the available data
[ https://issues.apache.org/jira/browse/FLINK-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264091#comment-15264091 ] ASF GitHub Bot commented on FLINK-2259: --- Github user rawkintrevo commented on a diff in the pull request: https://github.com/apache/flink/pull/1898#discussion_r61582494 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo} +import org.apache.flink.api.java.Utils +import org.apache.flink.api.scala.DataSet +import org.apache.flink.api.scala.utils._ + +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import _root_.scala.reflect.ClassTag + +object Splitter { + + case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T]) + + case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T], + testing: DataSet[T], + holdout: DataSet[T]) + // + // randomSplit + // + /** + * Split a DataSet by the probability fraction of each element. + * + * @param input DataSet to be split + * @param fraction Probability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. When fraction is larger + *than 1, elements are expected to be selected multiple times into + *the sample on average. This fraction refers to the first element in the + *resulting array. + * @param precise Sampling by default is random and can result in slightly lop-sided + *sample sets. When precise is true, equal sample set sizes are forced, + *however this is somewhat less efficient. + * @param seed Random number generator seed. + * @return An array of two datasets + */ + + def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T], + fraction: Double, + precise: Boolean = false, + seed: Long = Utils.RNG.nextLong()) + : Array[DataSet[T]] = { +import org.apache.flink.api.scala._ + +val indexedInput: DataSet[(Long, T)] = input.zipWithIndex + +val leftSplit: DataSet[(Long, T)] = precise match { + case false => indexedInput.sample(false, fraction, seed) --- End diff -- I think bootstrapping would be a cool feature, but it would require a different approach than the joins on the leftSplit/rightSplit. If you oversample the leftSplit, there's not going to be anything left to put in the right split (the whole point was to keep the training and testing cases separate). I'm going to add a bootstrap method that will allow for oversampling in the testing and training cases. 
Re: the next comment, I will test it separately. > Support training Estimators using a (train, validation, test) split of the > available data > - > > Key: FLINK-2259 > URL: https://issues.apache.org/jira/browse/FLINK-2259 > Project: Flink
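For readers outside Flink, the non-precise branch discussed here amounts to an independent Bernoulli draw per element, which is why the split sizes are only approximately `fraction * n` unless `precise` forces exact sizes. A plain-Java sketch of that semantics (class name invented, no Flink dependency):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of the non-precise randomSplit semantics:
 *  each element independently lands in the first split with
 *  probability `fraction`, so split sizes are approximate. */
public class SimpleSplitter {

    public static <T> List<List<T>> randomSplit(List<T> input, double fraction, long seed) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1] without replacement");
        }
        Random rng = new Random(seed);
        List<T> left = new ArrayList<>();
        List<T> right = new ArrayList<>();
        for (T element : input) {
            // Independent draw per element: nothing forces |left| == fraction * n.
            (rng.nextDouble() < fraction ? left : right).add(element);
        }
        List<List<T>> result = new ArrayList<>();
        result.add(left);
        result.add(right);
        return result;
    }
}
```

The two lists are disjoint by construction, which is the property the comment defends: oversampling (bootstrap) would have to be a separate method, since sampling with replacement can no longer partition the input.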
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264086#comment-15264086 ] ASF GitHub Bot commented on FLINK-3771: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61582214 --- Diff: flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala --- @@ -407,6 +407,36 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { } /** + * Relabels vertices and edges using the given MapFunction. + * + * @param translator implements conversion from K to NEW + * @return relabeled graph + */ + def translateGraphLabels[NEW: TypeInformation : ClassTag](translator: MapFunction[K, NEW]): Graph[NEW,VV,EV] = { --- End diff -- Fixed in next push. > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3090) Create a Parcel Distribution for Cloudera Manager
[ https://issues.apache.org/jira/browse/FLINK-3090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264085#comment-15264085 ] Márton Balassi commented on FLINK-3090: --- Thanks, [~cos]. Fortunately [~mxm] and [~rmetzger] have already done the grunt work via the corresponding Bigtop PR [1]; I am giving it a try to continue from there. [1] https://github.com/apache/bigtop/pull/101 > Create a Parcel Distribution for Cloudera Manager > - > > Key: FLINK-3090 > URL: https://issues.apache.org/jira/browse/FLINK-3090 > Project: Flink > Issue Type: Improvement > Components: release >Affects Versions: 0.10.1 >Reporter: Johannes >Assignee: Márton Balassi >Priority: Minor > > For ease of deployment it would be nice to provide a parcel distribution of > Flink which can be easily managed using Cloudera Manager. > This would set up all necessary dependencies on all machines and start Flink > jobs using YARN. > A good description of how to get started can be found in the [Cloudera > Manager Extensibility Tools and Documentation | > https://github.com/cloudera/cm_ext] > What needs to be done > * Create a service description > * Create a parcel containing the release files > * Create scripts that can write the appropriate configuration files, taking > values from the Cloudera Manager web config > For reference, [a collection on how this is configured for other > services|https://github.com/cloudera/cm_csds], such as Spark. > Some [starter code | https://github.com/jkirsch/cmflink] can be found here. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-3090) Create a Parcel Distribution for Cloudera Manager
[ https://issues.apache.org/jira/browse/FLINK-3090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi reassigned FLINK-3090: - Assignee: Márton Balassi > Create a Parcel Distribution for Cloudera Manager > - > > Key: FLINK-3090 > URL: https://issues.apache.org/jira/browse/FLINK-3090 > Project: Flink > Issue Type: Improvement > Components: release >Affects Versions: 0.10.1 >Reporter: Johannes >Assignee: Márton Balassi >Priority: Minor > > For ease of deployment it would be nice to provide a parcel distribution of > Flink which can be easily managed using Cloudera Manager. > This would set up all necessary dependencies on all machines and start Flink > jobs using YARN. > A good description of how to get started can be found in the [Cloudera > Manager Extensibility Tools and Documentation | > https://github.com/cloudera/cm_ext] > What needs to be done > * Create a service description > * Create a parcel containing the release files > * Create scripts that can write the appropriate configuration files, taking > values from the Cloudera Manager web config > For reference, [a collection on how this is configured for other > services|https://github.com/cloudera/cm_csds], such as Spark. > Some [starter code | https://github.com/jkirsch/cmflink] can be found here. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [Flink-3691] extend avroinputformat to support...
Github user gna-phetsarath commented on the pull request: https://github.com/apache/flink/pull/1920#issuecomment-215716418 @greghogan - What are next steps? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264062#comment-15264062 ] ASF GitHub Bot commented on FLINK-3771: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61579289 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param <OLD> old vertex label type +* @param <NEW> new vertex label type +* @param <VV> vertex value type +* @return translated vertices +*/ + public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexLabels(DataSet<Vertex<OLD, VV>> vertices, MapFunction<OLD, NEW> translator) { + return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. 
+* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param parallelism operator parallelism +* @param <OLD> old vertex label type +* @param <NEW> new vertex label type +* @param <VV> vertex value type +* @return translated vertices +*/ + @SuppressWarnings("unchecked") + public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexLabels(DataSet<Vertex<OLD, VV>> vertices, MapFunction<OLD, NEW> translator, int parallelism) { + Preconditions.checkNotNull(vertices); + Preconditions.checkNotNull(translator); + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>)(Class) Vertex.class; + TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null); + TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(1); + + TupleTypeInfo<Vertex<NEW, VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); + + return vertices + .map(new TranslateVertexLabel<>(translator)) + .returns(returnType) + .setParallelism(parallelism) + .name("Translate vertex labels"); + } + + /** +* Translate {@link Vertex} labels using
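Stripped of the TypeInformation plumbing, `translateVertexLabels` is a map over (label, value) pairs that touches only the label and forwards the value untouched (the role of the `@ForwardedFields` annotation). The sketch below mimics that contract in plain Java, with `Map.Entry` standing in for `Vertex`; the names are illustrative, not the Gelly API:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java sketch of the translateVertexLabels contract:
 *  only the label (key) goes through the translator, the vertex
 *  value is forwarded unchanged. Hypothetical names, not Gelly. */
public class TranslateSketch {

    public static <OLD, NEW, VV> List<Map.Entry<NEW, VV>> translateVertexLabels(
            List<Map.Entry<OLD, VV>> vertices, Function<OLD, NEW> translator) {
        List<Map.Entry<NEW, VV>> out = new ArrayList<>();
        for (Map.Entry<OLD, VV> vertex : vertices) {
            // Translate the label, forward the value as-is.
            out.add(new SimpleEntry<>(translator.apply(vertex.getKey()), vertex.getValue()));
        }
        return out;
    }
}
```

One use case from the ticket, shifting generated graph ids before a union, is then just `translateVertexLabels(vertices, id -> id + offset)`.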
[GitHub] flink pull request: [Flink-3691] extend avroinputformat to support...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1920#issuecomment-215717429 Sorry for the delay. The Flink community is receiving many contributions, which sometimes causes long review times. I think we can merge the change once @greghogan has reviewed your responses.
[jira] [Updated] (FLINK-3853) Reduce object creation in Gelly utility mappers
[ https://issues.apache.org/jira/browse/FLINK-3853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3853: -- Description: Gelly contains a set of {{MapFunction}} between {{Vertex}} and {{Tuple2}} and between {{Edge}} and {{Tuple3}}. A {{Vertex}} is a {{Tuple2}} and an {{Edge}} is a {{Tuple3}}, and conversion in the opposite direction can be performed with a single object per {{MapFunction}}. This only applies to the Gelly Java API. Scala tuples are not related to {{Vertex}} or {{Edge}}. was:Gelly contains a set of {{MapFunction}} between {{Vertex}} and {{Tuple2}} and between {{Edge}} and {{Tuple3}}. A {{Vertex}} is a {{Tuple2}} and an {{Edge}} is a {{Tuple3}}, and conversion in the opposite direction can be performed with a single object per {{MapFunction}}. > Reduce object creation in Gelly utility mappers > --- > > Key: FLINK-3853 > URL: https://issues.apache.org/jira/browse/FLINK-3853 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Gelly contains a set of {{MapFunction}} between {{Vertex}} and {{Tuple2}} and > between {{Edge}} and {{Tuple3}}. A {{Vertex}} is a {{Tuple2}} and an {{Edge}} > is a {{Tuple3}}, and conversion in the opposite direction can be performed > with a single object per {{MapFunction}}. > This only applies to the Gelly Java API. Scala tuples are not related to > {{Vertex}} or {{Edge}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3853) Reduce object creation in Gelly utility mappers
Greg Hogan created FLINK-3853: - Summary: Reduce object creation in Gelly utility mappers Key: FLINK-3853 URL: https://issues.apache.org/jira/browse/FLINK-3853 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Fix For: 1.1.0 Gelly contains a set of {{MapFunction}} between {{Vertex}} and {{Tuple2}} and between {{Edge}} and {{Tuple3}}. A {{Vertex}} is a {{Tuple2}} and an {{Edge}} is a {{Tuple3}}, and conversion in the opposite direction can be performed with a single object per {{MapFunction}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3772) Graph algorithms for vertex and edge degree
[ https://issues.apache.org/jira/browse/FLINK-3772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263993#comment-15263993 ] Greg Hogan commented on FLINK-3772: --- There is an impedance mismatch between the algorithms and `Graph` methods. The algorithms return datasets of `Vertex` and `Edge` whereas the `Graph` methods return `Tuple2` and `Tuple3`. Also, the algorithms use mutable value types whereas the `Graph` methods use immutable boxed types. It's trivial to convert the existing `Graph` methods using this PR, the four utility mapping functions, and a translator. I think adding to the current API is another discussion. > Graph algorithms for vertex and edge degree > --- > > Key: FLINK-3772 > URL: https://issues.apache.org/jira/browse/FLINK-3772 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Many graph algorithms require vertices or edges to be marked with the degree. > This ticket provides algorithms for annotating > * vertex degree for undirected graphs > * vertex out-, in-, and out- and in-degree for directed graphs > * edge source, target, and source and target degree for undirected graphs -- This message was sent by Atlassian JIRA (v6.3.4#6332)
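Setting aside the DataSet machinery, the degree-annotation algorithms in this ticket boil down to a grouped count over the edge list. A non-distributed sketch with invented names (not the Gelly API):

```java
import java.util.HashMap;
import java.util.Map;

/** Non-distributed sketch of vertex-degree annotation for an
 *  undirected graph given as an edge list: a vertex's degree is
 *  the number of times it appears as an endpoint. Hypothetical
 *  class and method names, not Gelly code. */
public class DegreeSketch {

    public static Map<Long, Long> vertexDegree(long[][] edges) {
        Map<Long, Long> degree = new HashMap<>();
        for (long[] edge : edges) {
            // Count both endpoints of each undirected edge.
            degree.merge(edge[0], 1L, Long::sum);
            degree.merge(edge[1], 1L, Long::sum);
        }
        return degree;
    }
}
```

The directed variants (out-, in-, out-and-in-degree) differ only in which endpoint is counted; the edge-degree variants then join this map back onto the edge list.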
[jira] [Commented] (FLINK-2926) Add a Strongly Connected Components Library Method
[ https://issues.apache.org/jira/browse/FLINK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263992#comment-15263992 ] Martin Liesenberg commented on FLINK-2926: -- Cool, I'll update the design doc then and start coding. > Add a Strongly Connected Components Library Method > -- > > Key: FLINK-2926 > URL: https://issues.apache.org/jira/browse/FLINK-2926 > Project: Flink > Issue Type: Task > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Martin Liesenberg >Priority: Minor > Labels: requires-design-doc > > This algorithm operates in four main steps: > 1). Form the transposed graph (each vertex sends its id to its out neighbors > which form a transposedNeighbors set) > 2). Trimming: every vertex which has only incoming or outgoing edges sets > colorID to its own value and becomes inactive. > 3). Forward traversal: >Start phase: propagate id to out neighbors >Rest phase: update the colorID with the minimum value seen > until convergence > 4). Backward traversal: > Start: if the vertex id is equal to its color id > propagate the value to transposedNeighbors > Rest: each vertex that receives a message equal to its > colorId will propagate its colorId to the transposed graph and becomes > inactive. > More info in section 3.1 of this paper: > http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf > or in section 6 of this paper: http://www.vldb.org/pvldb/vol7/p1821-yan.pdf -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2926) Add a Strongly Connected Components Library Method
[ https://issues.apache.org/jira/browse/FLINK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri updated FLINK-2926: - Assignee: Martin Liesenberg > Add a Strongly Connected Components Library Method > -- > > Key: FLINK-2926 > URL: https://issues.apache.org/jira/browse/FLINK-2926 > Project: Flink > Issue Type: Task > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Martin Liesenberg >Priority: Minor > Labels: requires-design-doc > > This algorithm operates in four main steps: > 1). Form the transposed graph (each vertex sends its id to its out neighbors > which form a transposedNeighbors set) > 2). Trimming: every vertex which has only incoming or outgoing edges sets > colorID to its own value and becomes inactive. > 3). Forward traversal: >Start phase: propagate id to out neighbors >Rest phase: update the colorID with the minimum value seen > until convergence > 4). Backward traversal: > Start: if the vertex id is equal to its color id > propagate the value to transposedNeighbors > Rest: each vertex that receives a message equal to its > colorId will propagate its colorId to the transposed graph and becomes > inactive. > More info in section 3.1 of this paper: > http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf > or in section 6 of this paper: http://www.vldb.org/pvldb/vol7/p1821-yan.pdf -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2926) Add a Strongly Connected Components Library Method
[ https://issues.apache.org/jira/browse/FLINK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263983#comment-15263983 ] Vasia Kalavri commented on FLINK-2926: -- Hey [~mliesenberg], thanks for pinging! I'll assign the issue to you then ;) > Add a Strongly Connected Components Library Method > -- > > Key: FLINK-2926 > URL: https://issues.apache.org/jira/browse/FLINK-2926 > Project: Flink > Issue Type: Task > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Priority: Minor > Labels: requires-design-doc > > This algorithm operates in four main steps: > 1). Form the transposed graph (each vertex sends its id to its out neighbors > which form a transposedNeighbors set) > 2). Trimming: every vertex which has only incoming or outgoing edges sets > colorID to its own value and becomes inactive. > 3). Forward traversal: >Start phase: propagate id to out neighbors >Rest phase: update the colorID with the minimum value seen > until convergence > 4). Backward traversal: > Start: if the vertex id is equal to its color id > propagate the value to transposedNeighbors > Rest: each vertex that receives a message equal to its > colorId will propagate its colorId to the transposed graph and becomes > inactive. > More info in section 3.1 of this paper: > http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf > or in section 6 of this paper: http://www.vldb.org/pvldb/vol7/p1821-yan.pdf -- This message was sent by Atlassian JIRA (v6.3.4#6332)
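The trimming step (step 2) of the four-step algorithm quoted above is easy to isolate: a vertex with no incoming edges or no outgoing edges cannot be part of a larger strongly connected component, so it takes its own id as colorId and deactivates. The sketch below is a plain-Java, non-Pregel illustration with hypothetical names (`SccTrimming`, `trim`), not Gelly code:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch of the trimming step of the coloring-based SCC algorithm:
 *  a vertex missing either incoming or outgoing edges is its own
 *  singleton SCC, so it is colored with its own id and deactivated. */
public class SccTrimming {

    /** Returns the colorId assignments for all trimmed vertices. */
    public static Map<Long, Long> trim(long[][] edges, Set<Long> vertices) {
        Set<Long> hasIn = new HashSet<>();
        Set<Long> hasOut = new HashSet<>();
        for (long[] edge : edges) {
            hasOut.add(edge[0]);
            hasIn.add(edge[1]);
        }
        Map<Long, Long> colorId = new HashMap<>();
        for (long v : vertices) {
            if (!hasIn.contains(v) || !hasOut.contains(v)) {
                colorId.put(v, v); // trimmed: singleton component, now inactive
            }
        }
        return colorId;
    }
}
```

In the actual vertex-centric setting this check is local (compare in- and out-neighbor sets), and trimming is typically iterated, since removing a trimmed vertex can expose new trimmable ones.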
[jira] [Commented] (FLINK-3404) Extend Kafka consumers with interface StoppableFunction
[ https://issues.apache.org/jira/browse/FLINK-3404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263977#comment-15263977 ] Martin Liesenberg commented on FLINK-3404: -- I have been looking at this task recently and just want to make sure I understand it correctly: there are already {{close}} and {{cancel}} methods. The difference of the {{stop}} method would be the issuing of the final checkpoint, right? > Extend Kafka consumers with interface StoppableFunction > --- > > Key: FLINK-3404 > URL: https://issues.apache.org/jira/browse/FLINK-3404 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Matthias J. Sax > > Kafka consumers are not stoppable right now. To make them stoppable, they > must implement {{StoppableFunction}}. Implementing method {{stop()}} must > ensure that the consumer stops pulling new messages from Kafka and issues a > final checkpoint with the last offset. Afterwards, {{run()}} must return. > When implementing this, keep in mind that the gathered checkpoint might > later be used as a savepoint. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
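The distinction being asked about can be sketched in a few self-contained lines, assuming invented names (this is not Flink's connector code): {{cancel()}} aborts immediately with no guarantees, while {{stop()}} stops pulling new records and then takes one final checkpoint before {{run()}} returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of the stop() vs cancel() contract discussed above.
class StoppableConsumerSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private volatile boolean stopped;        // graceful stop requested
    final List<String> emitted = new ArrayList<>();
    boolean finalCheckpointTaken;

    void run(Iterable<String> source) {
        for (String record : source) {
            if (!running.get()) {
                return;                      // cancel(): bail out immediately
            }
            if (stopped) {
                break;                       // stop(): stop pulling new records...
            }
            emitted.add(record);
        }
        if (stopped) {
            finalCheckpointTaken = true;     // ...then take one final checkpoint
        }                                    // before run() returns.
    }

    void cancel() { running.set(false); }    // hard stop, no final checkpoint
    void stop()   { stopped = true; }        // graceful stop with final checkpoint
}
```

So yes: close/cancel already exist, and the new piece would be the final checkpoint taken on the graceful path only.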
[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask
[ https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263974#comment-15263974 ] Aljoscha Krettek commented on FLINK-3669: - Ah, in the custom trigger the {{oldTimestamp}} field is problematic. One {{Trigger}} instance is used for all the different keys, which means the field will change between {{onElement()}} calls for the same key if {{onElement()}} is called in the meantime for a different key. For the timestamp, a {{ValueState}} could be used, since those states are kept separately for each key. As an example, {{CountTrigger}} is good; it's also the only {{Trigger}} we currently have in Flink that uses trigger state. > WindowOperator registers a lot of timers at StreamTask > -- > > Key: FLINK-3669 > URL: https://issues.apache.org/jira/browse/FLINK-3669 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.1 >Reporter: Aljoscha Krettek >Assignee: Konstantin Knauf >Priority: Blocker > > Right now, the WindowOperator registers a timer at the StreamTask for every > processing-time timer that a Trigger registers. We should combine several > registered trigger timers to only register one low-level timer (timer > coalescing). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
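The pitfall described above can be shown with plain Java (invented classes, not the actual {{Trigger}} API): one shared instance means a plain field is clobbered whenever another key's element arrives, whereas per-key state, which is how {{ValueState}} is scoped, keeps the values apart.

```java
import java.util.HashMap;
import java.util.Map;

// Why a plain field on a Trigger breaks: one instance serves every key, so
// the field is overwritten between onElement() calls for the same key.
// Per-key state (the way ValueState is scoped) does not have this problem.
class SharedTriggerSketch {
    private long lastTimestamp = -1;                      // shared across keys: the bug
    private final Map<String, Long> perKeyTimestamp = new HashMap<>(); // per-key: correct

    long fieldGap(String key, long ts) {                  // key is ignored: that's the bug
        long gap = lastTimestamp < 0 ? 0 : ts - lastTimestamp;
        lastTimestamp = ts;
        return gap;
    }

    long stateGap(String key, long ts) {                  // value looked up per key
        Long last = perKeyTimestamp.put(key, ts);
        return last == null ? 0 : ts - last;
    }
}
```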
[GitHub] flink pull request: Timer coalescing across keys and cleanup of un...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1944#discussion_r61567670 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -542,14 +561,24 @@ public void registerEventTimeTimer(long time) { // immediately schedule a trigger, so that we don't wait for the next // watermark update to fire the watermark trigger getRuntimeContext().registerTimer(System.currentTimeMillis(), WindowOperator.this); + //No need to put it in processingTimeTimerFutures as this timer is never removed } } @Override public void deleteProcessingTimeTimer(long time) { Timer timer = new Timer<>(time, key, window); - if (processingTimeTimers.remove(timer)) { - processingTimeTimersQueue.remove(timer); + + processingTimeTimers.remove(timer); + processingTimeTimersQueue.remove(timer); + + //If there are no timers left for this timestamp, remove it from queue and cancel TriggerTask + if (processingTimeTimerTimestamps.remove(time,1) == 1) { + ScheduledFuture triggerTaskFuture = processingTimeTimerFutures.get(timer); --- End diff -- This should be replaced by `processingTimeTimerFutures.remove(timer.time)`. The key in `processingTimeTimerFutures` is a `long` timestamp. This way, we also don't need to call remove inside the if and only have one lookup. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
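The coalescing scheme under review can be sketched in self-contained Java, with futures keyed by the {{long}} timestamp as the reviewer suggests. All names are invented and {{ScheduledExecutorService}} stands in for the runtime's timer service; this is not the {{WindowOperator}} code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Coalescing sketch: many logical timers per timestamp, one scheduled task.
class TimerCoalescerSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final Map<Long, Integer> countPerTimestamp = new HashMap<>();
    private final Map<Long, ScheduledFuture<?>> futurePerTimestamp = new HashMap<>();

    synchronized void register(long time, Runnable callback) {
        int count = countPerTimestamp.merge(time, 1, Integer::sum);
        if (count == 1) {   // first timer for this timestamp: schedule one task
            long delay = Math.max(0, time - System.currentTimeMillis());
            futurePerTimestamp.put(time,
                    executor.schedule(callback, delay, TimeUnit.MILLISECONDS));
        }
    }

    synchronized void delete(long time) {
        Integer count = countPerTimestamp.get(time);
        if (count == null) return;
        if (count == 1) {   // last logical timer gone: cancel the single task
            countPerTimestamp.remove(time);
            ScheduledFuture<?> f = futurePerTimestamp.remove(time);
            if (f != null) f.cancel(false);
        } else {
            countPerTimestamp.put(time, count - 1);
        }
    }

    synchronized boolean isScheduled(long time) {
        ScheduledFuture<?> f = futurePerTimestamp.get(time);
        return f != null && !f.isCancelled();
    }

    void shutdown() { executor.shutdownNow(); }
}
```

Keying the future by the timestamp rather than by a full timer object is what allows a single map lookup in {{delete}}, matching the review comment.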
[jira] [Commented] (FLINK-3444) env.fromElements relies on the first input element for determining the DataSet/DataStream type
[ https://issues.apache.org/jira/browse/FLINK-3444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263952#comment-15263952 ] Till Rohrmann commented on FLINK-3444: -- The common nearest super type would be the intersection type {{A & B}}, to be precise. But I see your point that it would not be possible to decide which of the interfaces should be selected as the common super type if it weren't for the class variant of {{fromElements}}. > env.fromElements relies on the first input element for determining the > DataSet/DataStream type > -- > > Key: FLINK-3444 > URL: https://issues.apache.org/jira/browse/FLINK-3444 > Project: Flink > Issue Type: Bug > Components: DataSet API, DataStream API >Affects Versions: 0.10.0, 1.0.0 >Reporter: Vasia Kalavri > Fix For: 1.1.0 > > > The {{fromElements}} method of the {{ExecutionEnvironment}} and > {{StreamExecutionEnvironment}} determines the DataSet/DataStream type by > extracting the type of the first input element. > This is problematic if the first element is a subtype of another element in > the collection. > For example, the following > {code} > DataStream<Event> input = env.fromElements(new Event(1, "a"), new SubEvent(2, > "b")); > {code} > succeeds, while the following > {code} > DataStream<Event> input = env.fromElements(new SubEvent(1, "a"), new Event(2, > "b")); > {code} > fails with "java.lang.IllegalArgumentException: The elements in the > collection are not all subclasses of SubEvent". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
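The failure mode is easy to reproduce outside Flink. In the sketch below, `fromFirst` mimics the reported behavior (take the first element's class and require everything to be an instance of it), while `commonSuperclass` shows the order-insensitive alternative of climbing the superclass chain. Interfaces are deliberately ignored, which is exactly the {{A & B}} ambiguity mentioned above. All names are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the fromElements() problem: inferring the type from the first
// element rejects mixed lists whenever a subtype happens to come first.
class TypeInferenceSketch {
    static Class<?> fromFirst(List<?> elements) {        // current, fragile behavior
        Class<?> clazz = elements.get(0).getClass();
        for (Object e : elements) {
            if (!clazz.isInstance(e)) {
                throw new IllegalArgumentException(
                    "The elements in the collection are not all subclasses of "
                        + clazz.getSimpleName());
            }
        }
        return clazz;
    }

    static Class<?> commonSuperclass(List<?> elements) { // order-insensitive variant
        Class<?> clazz = elements.get(0).getClass();
        for (Object e : elements) {
            while (!clazz.isInstance(e)) {
                clazz = clazz.getSuperclass();           // climb until it fits all
            }
        }
        return clazz;
    }
}

class Event { }
class SubEvent extends Event { }
```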
[jira] [Commented] (FLINK-3847) Reorganize package structure of flink-table
[ https://issues.apache.org/jira/browse/FLINK-3847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263947#comment-15263947 ] ASF GitHub Bot commented on FLINK-3847: --- GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/1950 [FLINK-3847] Restructure flink-table test packages. This PR refactors the tests of the flink-table module. The new package structure is as follows: ``` src/test/java/o/a/f/api/java/batch/sql /table /stream/sql /table /utils src/test/scala/o/a/f/api/scala/batch/sql /table /utils /expression /stream/sql /table /utils /typeutils ``` Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. 
- [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink tableTests Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1950.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1950 commit 110d335780d15787f535950abdf299a41f06b405 Author: Fabian Hueske Date: 2016-04-28T20:51:49Z [FLINK-3847] Restructure flink-table test packages. > Reorganize package structure of flink-table > --- > > Key: FLINK-3847 > URL: https://issues.apache.org/jira/browse/FLINK-3847 > Project: Flink > Issue Type: Task > Components: Table API >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Minor > > The flink-table module has tests for the matrix of > - Java and Scala > - Batch and Streaming > - Table API and SQL > Right now, there is no consistent package structure for the tests. I propose > to structure the test packages as follows: > {code} > test/java/o/a/f/api/table/batch/table > test/java/o/a/f/api/table/batch/sql > test/java/o/a/f/api/table/stream/table > test/java/o/a/f/api/table/stream/sql > test/scala/o/a/f/api/table/batch/table > test/scala/o/a/f/api/table/batch/sql > test/scala/o/a/f/api/table/stream/table > test/scala/o/a/f/api/table/stream/sql > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3844) Checkpoint failures should not always lead to job failures
[ https://issues.apache.org/jira/browse/FLINK-3844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263946#comment-15263946 ] Aljoscha Krettek commented on FLINK-3844: - +1, we could have something similar to {{RestartStrategy}} but for checkpoints that determines when failing checkpoints should crash a job. > Checkpoint failures should not always lead to job failures > -- > > Key: FLINK-3844 > URL: https://issues.apache.org/jira/browse/FLINK-3844 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Gyula Fora > > Currently when a checkpoint fails the job crashes immediately. This is not > the desired behaviour in many cases. It would probably be better to log the > failed checkpoint attempt and only fail the job after so many subsequent > failed attempts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
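A hypothetical policy object in the spirit of {{RestartStrategy}}, tolerating a number of consecutive checkpoint failures before escalating to a job failure. Nothing here is Flink API; every name is invented to illustrate the proposal.

```java
// Log-and-continue until too many checkpoints fail in a row; a successful
// checkpoint resets the counter, matching the behavior sketched in the issue.
class CheckpointFailurePolicySketch {
    private final int maxConsecutiveFailures;
    private int consecutiveFailures;

    CheckpointFailurePolicySketch(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** Returns true if the job should be failed after this checkpoint failure. */
    boolean onCheckpointFailure() {
        return ++consecutiveFailures > maxConsecutiveFailures;
    }

    void onCheckpointSuccess() {
        consecutiveFailures = 0;            // a success resets the counter
    }
}
```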
[jira] [Commented] (FLINK-3841) RocksDB statebackend creates empty dbs for stateless operators
[ https://issues.apache.org/jira/browse/FLINK-3841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263942#comment-15263942 ] Aljoscha Krettek commented on FLINK-3841: - I would do it next week... ;-) > RocksDB statebackend creates empty dbs for stateless operators > -- > > Key: FLINK-3841 > URL: https://issues.apache.org/jira/browse/FLINK-3841 > Project: Flink > Issue Type: Bug > Components: state backends >Affects Versions: 1.1.0 >Reporter: Gyula Fora >Priority: Minor > > Even though they are not checkpointed there is always an open RocksDB > database for all operators if the Rocks backend is set. I wonder if it would > make sense to lazy initialize the dbs instead of doing it in the > initializeForJob method. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263866#comment-15263866 ] Fabian Hueske commented on FLINK-2168: -- Hi [~ram_krish], this issue depends on FLINK-3848 and FLINK-3849. I'll open PR for those in the next days. Then you should be ready to go :-) Cheers, Fabian > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Wilmer DAZA >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-3848: Assignee: Fabian Hueske > Add ProjectableTableSource interface and translation rule > - > > Key: FLINK-3848 > URL: https://issues.apache.org/jira/browse/FLINK-3848 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > Add a {{ProjectableTableSource}} interface for {{TableSource}} implementation > that support projection push-down. > The interface could look as follows > {code} > def trait ProjectableTableSource { > def setProjection(fields: Array[String]): Unit > } > {code} > In addition we need Calcite rules to push a projection into a TableScan that > refers to a {{ProjectableTableSource}}. We might need to tweak the cost model > as well to push the optimizer in the right direction. > Moreover, the {{CsvTableSource}} could be extended to implement > {{ProjectableTableSource}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
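A minimal Java mirror of the proposed trait (the JIRA sketches it in Scala; the row model and all names here are invented): the planner pushes the projected field names down via {{setProjection}}, and the source then emits only those columns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration of projection push-down: without it, the source reads every
// column and a downstream operator drops the unused ones.
interface ProjectableSourceSketch {
    void setProjection(String[] fields);
}

class CsvLikeSourceSketch implements ProjectableSourceSketch {
    private final List<Map<String, Object>> rows;
    private String[] projection;

    CsvLikeSourceSketch(List<Map<String, Object>> rows) { this.rows = rows; }

    @Override
    public void setProjection(String[] fields) { this.projection = fields; }

    List<List<Object>> scan() {             // reads only the projected columns
        List<List<Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            String[] fields = projection != null
                    ? projection
                    : row.keySet().toArray(new String[0]);
            List<Object> projected = new ArrayList<>();
            for (String f : fields) projected.add(row.get(f));
            out.add(projected);
        }
        return out;
    }
}
```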
[jira] [Assigned] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-3849: Assignee: Fabian Hueske > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
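The interesting part of the proposed contract is the return value: {{setPredicate}} hands back the portion of the predicate the source cannot evaluate, so the planner keeps a residual filter for it. A minimal invented Java sketch of that partial push-down (the JIRA proposes an Expression-based Scala trait; the boolean "supported" flag below is purely illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Partial filter push-down: keep the conjuncts the source can evaluate during
// its scan, return the unsupported remainder to the planner.
class FilterableSourceSketch {
    static class Conjunct {
        final String description;
        final boolean supportedBySource;    // e.g. a simple comparison on an indexed field
        Conjunct(String description, boolean supportedBySource) {
            this.description = description;
            this.supportedBySource = supportedBySource;
        }
    }

    final List<Conjunct> pushed = new ArrayList<>();

    /** Accepts a conjunction and returns the conjuncts this source cannot evaluate. */
    List<Conjunct> setPredicate(List<Conjunct> conjuncts) {
        List<Conjunct> remainder = new ArrayList<>();
        for (Conjunct c : conjuncts) {
            if (c.supportedBySource) {
                pushed.add(c);              // evaluated inside the source's scan
            } else {
                remainder.add(c);           // e.g. a UDF call: stays in the plan
            }
        }
        return remainder;
    }
}
```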
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263841#comment-15263841 ] ASF GitHub Bot commented on FLINK-1502: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1947#discussion_r61556020 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -152,6 +154,9 @@ class TaskManager( /** Registry of metrics periodically transmitted to the JobManager */ private val metricRegistry = TaskManager.createMetricsRegistry() --- End diff -- when we can display the new metrics in the dashboard. > Expose metrics to graphite, ganglia and JMX. > > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Chesnay Schepler >Priority: Minor > Fix For: pre-apache > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263834#comment-15263834 ] ASF GitHub Bot commented on FLINK-1502: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1947#discussion_r61555221 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -924,6 +936,15 @@ class TaskManager( else { libraryCacheManager = Some(new FallbackLibraryCacheManager) } + +metricsRegistry = new FlinkMetricRegistry(GlobalConfiguration.getConfiguration); --- End diff -- I think it's better to use `config.configuration`. At some point in time we might get around to removing the `GlobalConfiguration`. > Expose metrics to graphite, ganglia and JMX. > > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Chesnay Schepler >Priority: Minor > Fix For: pre-apache > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263831#comment-15263831 ] ASF GitHub Bot commented on FLINK-1502: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1947#discussion_r61555065 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -152,6 +154,9 @@ class TaskManager( /** Registry of metrics periodically transmitted to the JobManager */ private val metricRegistry = TaskManager.createMetricsRegistry() --- End diff -- What are the plans for removing/subsuming this? > Expose metrics to graphite, ganglia and JMX. > > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Chesnay Schepler >Priority: Minor > Fix For: pre-apache > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3852) Use a StreamExecutionEnvironment in the quickstart job skeleton
[ https://issues.apache.org/jira/browse/FLINK-3852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263819#comment-15263819 ] Robert Metzger commented on FLINK-3852: --- Yes, I think it's a good idea to have a file for DataSet and DataStream API programs. > Use a StreamExecutionEnvironment in the quickstart job skeleton > --- > > Key: FLINK-3852 > URL: https://issues.apache.org/jira/browse/FLINK-3852 > Project: Flink > Issue Type: Task > Components: Quickstarts >Reporter: Robert Metzger > Labels: starter > > The Job skeleton created by the maven archetype "quickstart" is still setting > up an ExecutionEnvironment, not a StreamExecutionEnvironment. > These days, most users are using Flink for streaming, so we should reflect > that in the quickstart as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans
[ https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263812#comment-15263812 ] Ufuk Celebi commented on FLINK-3800: We had to revert this in 0708dd0 for release-1.0 after a discussion with Till. The problem is that a JobGraph is lost when the job reaches a final state, after which it is removed from ZooKeeper. If ExecutionGraphs stay orphans, though, this can lead to races where the orphan and the re-deployment after a leadership change compete for the same resources (as reported by a user). > ExecutionGraphs can become orphans > -- > > Key: FLINK-3800 > URL: https://issues.apache.org/jira/browse/FLINK-3800 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.0.0, 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{JobManager.cancelAndClearEverything}} method fails all currently > executed jobs on the {{JobManager}} and then clears the list of > {{currentJobs}} kept in the JobManager. This can become problematic if the > user has set a restart strategy for a job, because the {{RestartStrategy}} > will try to restart the job. This can lead to unwanted re-deployments of the > job which consumes resources and thus will trouble the execution of other > jobs. If the restart strategy never stops, then this prevents that the > {{ExecutionGraph}} from ever being properly terminated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263806#comment-15263806 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552700 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; +import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues; + +/** + * Translate {@link Edge} values using the given {@link MapFunction}. 
+ * + * @param vertex label type + * @param vertex value type + * @param old edge value type + * @param new edge value type + */ +public class TranslateEdgeValues +implements GraphAlgorithm> { + + // Required configuration + private MapFunction translator; + + // Optional configuration + private int parallelism = PARALLELISM_UNKNOWN; + + /** +* Translate {@link Edge} values using the given {@link MapFunction}. +* +* @param translator implements conversion from {@code OLD} to {@code NEW} +*/ + public TranslateEdgeValues(MapFunction translator) { + Preconditions.checkNotNull(translator); + + this.translator = translator; + } + + /** +* Override the operator parallelism. +* +* @param parallelism operator parallelism +* @return this +*/ + public TranslateEdgeValues setParallelism(int parallelism) { + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + this.parallelism = parallelism; + --- End diff -- empty line > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Reopened] (FLINK-3800) ExecutionGraphs can become orphans
[ https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi reopened FLINK-3800: > ExecutionGraphs can become orphans > -- > > Key: FLINK-3800 > URL: https://issues.apache.org/jira/browse/FLINK-3800 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.0.0, 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{JobManager.cancelAndClearEverything}} method fails all currently > executed jobs on the {{JobManager}} and then clears the list of > {{currentJobs}} kept in the JobManager. This can become problematic if the > user has set a restart strategy for a job, because the {{RestartStrategy}} > will try to restart the job. This can lead to unwanted re-deployments of the > job which consumes resources and thus will trouble the execution of other > jobs. If the restart strategy never stops, then this prevents that the > {{ExecutionGraph}} from ever being properly terminated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263805#comment-15263805 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552674 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; +import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues; + +/** + * Translate {@link Edge} values using the given {@link MapFunction}. 
+ * + * @param <K> vertex label type + * @param <VV> vertex value type + * @param <OLD> old edge value type + * @param <NEW> new edge value type + */ +public class TranslateEdgeValues<K, VV, OLD, NEW> +implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> { + + // Required configuration + private MapFunction<OLD, NEW> translator; + + // Optional configuration + private int parallelism = PARALLELISM_UNKNOWN; + + /** +* Translate {@link Edge} values using the given {@link MapFunction}. +* +* @param translator implements conversion from {@code OLD} to {@code NEW} +*/ + public TranslateEdgeValues(MapFunction<OLD, NEW> translator) { + Preconditions.checkNotNull(translator); + --- End diff -- empty line can be removed > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
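The `TranslateEdgeValues` class quoted above pairs a required translator (a `MapFunction<OLD, NEW>`) with optional parallelism, keeping vertex keys untouched while converting edge values. As a rough standalone illustration of that translator pattern (this is not Flink's actual Gelly API; the `Edge` record and `translateEdgeValues` helper below are simplified, hypothetical stand-ins):

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Minimal standalone sketch of the "translate edge values" idea from the
 * FLINK-3771 diff: a user-supplied translator converts each edge value from
 * an OLD type to a NEW type while the edge endpoints are left untouched.
 * The Edge record and helper here are simplified stand-ins, not Flink's
 * actual Gelly classes.
 */
public class TranslateSketch {

    // Simplified edge: source id, target id, value.
    record Edge<K, V>(K source, K target, V value) {}

    // Apply the translator to every edge value, keeping endpoints as-is.
    static <K, OLD, NEW> List<Edge<K, NEW>> translateEdgeValues(
            List<Edge<K, OLD>> edges, Function<OLD, NEW> translator) {
        return edges.stream()
                .map(e -> new Edge<>(e.source(), e.target(), translator.apply(e.value())))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Edge<Long, Integer>> edges = List.of(new Edge<>(1L, 2L, 42));
        // Change the edge value type, mirroring the issue's use case of
        // translating value types for testing alternative code paths.
        List<Edge<Long, String>> out = translateEdgeValues(edges, String::valueOf);
        System.out.println(out.get(0).value()); // prints "42"
    }
}
```

This mirrors the use cases in the issue description (shifting labels, downsizing values) in miniature: only the value type changes, so the graph's structure is preserved.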
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263804#comment-15263804 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552592 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; +import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues; + +/** + * Translate {@link Edge} values using the given {@link MapFunction}. 
+ * + * @param <K> vertex label type + * @param <VV> vertex value type + * @param <OLD> old edge value type + * @param <NEW> new edge value type + */ +public class TranslateEdgeValues<K, VV, OLD, NEW> +implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> { --- End diff -- can be moved to the previous line > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552700 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; +import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues; + +/** + * Translate {@link Edge} values using the given {@link MapFunction}. 
+ * + * @param <K> vertex label type + * @param <VV> vertex value type + * @param <OLD> old edge value type + * @param <NEW> new edge value type + */ +public class TranslateEdgeValues<K, VV, OLD, NEW> +implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> { + + // Required configuration + private MapFunction<OLD, NEW> translator; + + // Optional configuration + private int parallelism = PARALLELISM_UNKNOWN; + + /** +* Translate {@link Edge} values using the given {@link MapFunction}. +* +* @param translator implements conversion from {@code OLD} to {@code NEW} +*/ + public TranslateEdgeValues(MapFunction<OLD, NEW> translator) { + Preconditions.checkNotNull(translator); + + this.translator = translator; + } + + /** +* Override the operator parallelism. +* +* @param parallelism operator parallelism +* @return this +*/ + public TranslateEdgeValues<K, VV, OLD, NEW> setParallelism(int parallelism) { + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + this.parallelism = parallelism; + --- End diff -- empty line --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263795#comment-15263795 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552416 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param <OLD> old vertex label type +* @param <NEW> new vertex label type +* @param <VV> vertex value type +* @return translated vertices +*/ + public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexLabels(DataSet<Vertex<OLD, VV>> vertices, MapFunction<OLD, NEW> translator) { + return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}.
+* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param parallelism operator parallelism +* @param <OLD> old vertex label type +* @param <NEW> new vertex label type +* @param <VV> vertex value type +* @return translated vertices +*/ + @SuppressWarnings("unchecked") + public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexLabels(DataSet<Vertex<OLD, VV>> vertices, MapFunction<OLD, NEW> translator, int parallelism) { + Preconditions.checkNotNull(vertices); + Preconditions.checkNotNull(translator); + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>)(Class<?>) Vertex.class; + TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null); + TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(1); + + TupleTypeInfo<Vertex<NEW, VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); + + return vertices + .map(new TranslateVertexLabel<OLD, NEW, VV>(translator)) + .returns(returnType) + .setParallelism(parallelism) + .name("Translate vertex labels"); + } + + /** +* Translate {@link Vertex} labels using the
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263798#comment-15263798 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552498 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param <OLD> old vertex label type +* @param <NEW> new vertex label type +* @param <VV> vertex value type +* @return translated vertices +*/ + public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexLabels(DataSet<Vertex<OLD, VV>> vertices, MapFunction<OLD, NEW> translator) { + return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}.
+* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param parallelism operator parallelism +* @param <OLD> old vertex label type +* @param <NEW> new vertex label type +* @param <VV> vertex value type +* @return translated vertices +*/ + @SuppressWarnings("unchecked") + public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexLabels(DataSet<Vertex<OLD, VV>> vertices, MapFunction<OLD, NEW> translator, int parallelism) { + Preconditions.checkNotNull(vertices); + Preconditions.checkNotNull(translator); + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>)(Class<?>) Vertex.class; + TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null); + TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(1); + + TupleTypeInfo<Vertex<NEW, VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); + + return vertices + .map(new TranslateVertexLabel<OLD, NEW, VV>(translator)) + .returns(returnType) + .setParallelism(parallelism) + .name("Translate vertex labels"); + } + + /** +* Translate {@link Vertex} labels using the
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263796#comment-15263796 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552437 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param <OLD> old vertex label type +* @param <NEW> new vertex label type +* @param <VV> vertex value type +* @return translated vertices +*/ + public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexLabels(DataSet<Vertex<OLD, VV>> vertices, MapFunction<OLD, NEW> translator) { + return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}.
+* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param parallelism operator parallelism +* @param <OLD> old vertex label type +* @param <NEW> new vertex label type +* @param <VV> vertex value type +* @return translated vertices +*/ + @SuppressWarnings("unchecked") + public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexLabels(DataSet<Vertex<OLD, VV>> vertices, MapFunction<OLD, NEW> translator, int parallelism) { + Preconditions.checkNotNull(vertices); + Preconditions.checkNotNull(translator); + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>)(Class<?>) Vertex.class; + TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null); + TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(1); + + TupleTypeInfo<Vertex<NEW, VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); + + return vertices + .map(new TranslateVertexLabel<OLD, NEW, VV>(translator)) + .returns(returnType) + .setParallelism(parallelism) + .name("Translate vertex labels"); + } + + /** +* Translate {@link Vertex} labels using the
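The `Preconditions.checkArgument` call repeated throughout the quoted `Translate.java` diff accepts a parallelism that is either positive or one of two sentinel constants. The sketch below isolates that validation pattern; the sentinel values mirror Flink's `ExecutionConfig` (`PARALLELISM_DEFAULT` is -1, `PARALLELISM_UNKNOWN` is -2), but they are declared locally here so the example stands alone:

```java
/**
 * Standalone sketch of the parallelism check used throughout the
 * Translate.java diff: a parallelism value is valid if it is positive or
 * one of two negative sentinel constants. The constant values mirror
 * Flink's ExecutionConfig but are declared locally for self-containment.
 */
public class ParallelismCheck {
    static final int PARALLELISM_DEFAULT = -1;
    static final int PARALLELISM_UNKNOWN = -2;

    // Returns the value unchanged when valid; throws otherwise, matching
    // the Guava Preconditions.checkArgument style used in the diff.
    static int checkParallelism(int parallelism) {
        if (!(parallelism > 0
                || parallelism == PARALLELISM_DEFAULT
                || parallelism == PARALLELISM_UNKNOWN)) {
            throw new IllegalArgumentException(
                "The parallelism must be greater than zero.");
        }
        return parallelism;
    }
}
```

Using negative sentinels keeps the method signature a plain `int` while still distinguishing "use the default" and "not yet configured" from an explicit, user-chosen degree of parallelism.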
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61552416

    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    +import org.apache.flink.api.java.operators.translation.WrappingFunction;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +
    +/**
    + * Methods for translation of the type or modification of the data of graph
    + * labels, vertex values, and edge values.
    + */
    +public class Translate {
    +
    +	//
    +	// Translate vertex labels
    +	//
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator) {
    +		return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param parallelism operator parallelism
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator, int parallelism) {
    +		Preconditions.checkNotNull(vertices);
    +		Preconditions.checkNotNull(translator);
    +		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
    +			"The parallelism must be greater than zero.");
    +
    +		Class<Vertex<NEW,VV>> vertexClass = (Class<Vertex<NEW,VV>>)(Class) Vertex.class;
    +		TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null);
    +		TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD,VV>>) vertices.getType()).getTypeAt(1);
    +
    +		TupleTypeInfo<Vertex<NEW,VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);
    +
    +		return vertices
    +			.map(new TranslateVertexLabel<OLD,NEW,VV>(translator))
    +			.returns(returnType)
    +			.setParallelism(parallelism)
    +			.name("Translate vertex labels");
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 */
    +	@ForwardedFields("1")
    +	private static class TranslateVertexLabel<OLD,NEW,VV>
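The `translateVertexLabels` method quoted in the diff above maps each vertex label from an `OLD` to a `NEW` type while leaving the vertex value untouched. A minimal plain-Java sketch of the same idea — this is not Gelly code; `DataSet`, `Vertex`, and Flink's type-extraction machinery are replaced here with `java.util.function.Function` and simple map entries for illustration:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TranslateSketch {

    // Translate vertex labels from OLD to NEW, keeping the vertex value VV.
    // Mirrors the shape of Translate.translateVertexLabels(vertices, translator).
    static <OLD, NEW, VV> List<Map.Entry<NEW, VV>> translateVertexLabels(
            List<Map.Entry<OLD, VV>> vertices, Function<OLD, NEW> translator) {
        return vertices.stream()
                .<Map.Entry<NEW, VV>>map(v ->
                        new SimpleEntry<>(translator.apply(v.getKey()), v.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Translate long labels to strings; the vertex values (doubles) pass through.
        List<Map.Entry<Long, Double>> vertices = List.of(
                new SimpleEntry<>(0L, 1.5), new SimpleEntry<>(1L, 2.5));
        List<Map.Entry<String, Double>> translated =
                translateVertexLabels(vertices, id -> "v" + id);
        System.out.println(translated); // [v0=1.5, v1=2.5]
    }
}
```

In the real Gelly method the translator's output type cannot be inferred from erased generics alone, which is why the diff goes through `TypeExtractor` and an explicit `TupleTypeInfo` before calling `.returns(returnType)`.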
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61552437

    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---
[jira] [Commented] (FLINK-2926) Add a Strongly Connected Components Library Method
[ https://issues.apache.org/jira/browse/FLINK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263788#comment-15263788 ]

Martin Liesenberg commented on FLINK-2926:

If there's still interest in having this algorithm as part of Gelly's library, I'd start working on it again.

> Add a Strongly Connected Components Library Method
> --------------------------------------------------
>
>                 Key: FLINK-2926
>                 URL: https://issues.apache.org/jira/browse/FLINK-2926
>             Project: Flink
>          Issue Type: Task
>          Components: Gelly
>    Affects Versions: 0.10.0
>            Reporter: Andra Lungu
>            Priority: Minor
>              Labels: requires-design-doc
>
> This algorithm operates in four main steps:
> 1) Form the transposed graph: each vertex sends its id to its out-neighbors, which form a transposedNeighbors set.
> 2) Trimming: every vertex which has only incoming or only outgoing edges sets colorID to its own value and becomes inactive.
> 3) Forward traversal:
>        Start phase: propagate id to out-neighbors.
>        Rest phase: update the colorID with the minimum value seen, until convergence.
> 4) Backward traversal:
>        Start: if the vertex id is equal to its colorID, propagate the value to transposedNeighbors.
>        Rest: each vertex that receives a message equal to its colorID propagates its colorID to the transposed graph and becomes inactive.
> More info in section 3.1 of this paper: http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf
> or in section 6 of this paper: http://www.vldb.org/pvldb/vol7/p1821-yan.pdf

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
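The traversal steps described in the issue can be sketched sequentially in plain Java. This is a hedged sketch of the coloring SCC algorithm from the Salihoglu paper, not a Gelly/vertex-centric implementation: vertex ids double as color ids, the trimming optimization (step 2) is omitted for brevity, and the "propagation until convergence" is a simple fixed-point loop rather than message passing:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class ColoringScc {

    // Coloring SCC, sequential sketch. Repeat until no vertex is active:
    //   (a) forward traversal: color[v] = smallest active vertex id that reaches v;
    //   (b) backward traversal: from each root r with color[r] == r, collect the
    //       vertices of color r that reach r over transposed edges — they form
    //       the SCC of r and become inactive.
    static int[] scc(int[][] out) {
        int n = out.length;
        // step 1: form the transposed graph
        int[] inDeg = new int[n];
        for (int[] es : out) for (int v : es) inDeg[v]++;
        int[][] in = new int[n][];
        for (int v = 0; v < n; v++) in[v] = new int[inDeg[v]];
        int[] pos = new int[n];
        for (int u = 0; u < n; u++) for (int v : out[u]) in[v][pos[v]++] = u;

        int[] comp = new int[n];
        Arrays.fill(comp, -1);
        boolean[] active = new boolean[n];
        Arrays.fill(active, true);
        int remaining = n;

        while (remaining > 0) {
            // forward traversal: propagate the minimum id until convergence
            int[] color = new int[n];
            for (int v = 0; v < n; v++) color[v] = v;
            boolean changed = true;
            while (changed) {
                changed = false;
                for (int u = 0; u < n; u++) {
                    if (!active[u]) continue;
                    for (int v : out[u]) {
                        if (active[v] && color[u] < color[v]) {
                            color[v] = color[u];
                            changed = true;
                        }
                    }
                }
            }
            // backward traversal over transposed edges, restricted to the root's color
            for (int r = 0; r < n; r++) {
                if (!active[r] || color[r] != r) continue;
                Deque<Integer> stack = new ArrayDeque<>();
                stack.push(r);
                comp[r] = r; active[r] = false; remaining--;
                while (!stack.isEmpty()) {
                    int v = stack.pop();
                    for (int u : in[v]) {
                        if (active[u] && color[u] == r) {
                            comp[u] = r; active[u] = false; remaining--;
                            stack.push(u);
                        }
                    }
                }
            }
        }
        return comp;
    }

    public static void main(String[] args) {
        // 0->1->2->0 is one SCC; 3<->4 (fed by 2->3) is another; 5 is isolated
        int[][] g = { {1}, {2}, {0, 3}, {4}, {3}, {} };
        System.out.println(Arrays.toString(scc(g))); // [0, 0, 0, 3, 3, 5]
    }
}
```

Each outer iteration is guaranteed to remove at least one SCC (the one containing the smallest active id), so the loop terminates; restricting the backward search to `color[u] == r` is sound because every vertex on a path between two members of r's SCC also carries color r.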
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551910

    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263787#comment-15263787 ]

ASF GitHub Bot commented on FLINK-3771:

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551910

    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263783#comment-15263783 ]

ASF GitHub Bot commented on FLINK-3771:

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551647

    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---

    +			.setParallelism(parallelism)

    --- End diff --

    indention is off

> Methods for translating Graphs
> ------------------------------
>
>                 Key: FLINK-3771
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263780#comment-15263780 ]

ASF GitHub Bot commented on FLINK-3771:

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551561

    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---

    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator, int parallelism) {

    --- End diff --

    spaces

> Methods for translating Graphs
> ------------------------------
>
>                 Key: FLINK-3771
>                 URL: https://issues.apache.org/jira/browse/FLINK-3771
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>    Affects Versions: 1.1.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>             Fix For: 1.1.0
>
> Provide methods for translation of the type or value of graph labels, vertex values, and edge values.
> Sample use cases:
> * shifting graph labels in order to union generated graphs or graphs read from multiple sources
> * downsizing labels or values since algorithms prefer to generate wide types which may be expensive for further computation
> * changing label type for testing or benchmarking alternative code paths

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
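The first use case listed in the issue — shifting graph labels so that generated graphs with overlapping id ranges can be unioned — amounts to a translator that adds a fixed offset to every label. A plain-Java sketch of that idea (not the Gelly API; edges are modeled as bare `long[]` pairs purely for illustration):

```java
import java.util.List;
import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ShiftLabels {

    // Shift every vertex label in an edge list by a fixed offset, so that two
    // graphs whose id ranges collide can be unioned into one disjoint graph.
    static List<long[]> shiftEdges(List<long[]> edges, long offset) {
        LongUnaryOperator shift = id -> id + offset;
        return edges.stream()
                .map(e -> new long[] { shift.applyAsLong(e[0]), shift.applyAsLong(e[1]) })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<long[]> g1 = List.of(new long[] {0, 1}, new long[] {1, 2}); // ids 0..2
        List<long[]> g2 = List.of(new long[] {0, 1});                    // ids 0..1 collide with g1
        List<long[]> g2Shifted = shiftEdges(g2, 3);                      // ids 3..4, now disjoint
        List<long[]> union = Stream.concat(g1.stream(), g2Shifted.stream())
                .collect(Collectors.toList());
        System.out.println(union.size()); // 3
    }
}
```

In Gelly terms, the shift lambda plays the role of the `MapFunction<OLD, NEW>` translator, applied to both endpoints of each edge as well as to the vertex labels.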
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551647

    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---

    +			.setParallelism(parallelism)

    --- End diff --

    indention is off

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551561

    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---

    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator, int parallelism) {

    --- End diff --

    spaces
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263777#comment-15263777 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551440 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator) { --- End diff -- can you leave one space after commas? i.e. `` instead of ``? > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. 
> Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
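[Editor's note] For readers skimming the thread: the method under review maps vertex labels through a user-supplied translator while leaving vertex values untouched. A minimal, Flink-free sketch of the same idea — the `Vertex` pair class and this `translateVertexLabels` are stand-ins for illustration, not Flink's actual API:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TranslateSketch {

    // Minimal stand-in for Flink's Vertex<K, VV>: a (label, value) pair.
    public static class Vertex<K, VV> {
        public final K label;
        public final VV value;

        public Vertex(K label, VV value) {
            this.label = label;
            this.value = value;
        }
    }

    // Mirrors the shape of translateVertexLabels: apply the translator to
    // each label and keep the vertex value as-is.
    public static <OLD, NEW, VV> List<Vertex<NEW, VV>> translateVertexLabels(
            List<Vertex<OLD, VV>> vertices, Function<OLD, NEW> translator) {
        return vertices.stream()
                .map(v -> new Vertex<>(translator.apply(v.label), v.value))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Use case from the issue: shift labels by a constant offset so two
        // generated graphs can be unioned without label collisions.
        List<Vertex<Long, String>> shifted = translateVertexLabels(
                List.of(new Vertex<>(0L, "a"), new Vertex<>(1L, "b")),
                label -> label + 100L);
        System.out.println(shifted.get(0).label); // 100
    }
}
```

In the real Gelly implementation the translation runs as a distributed `DataSet` map rather than an in-memory stream, but the label/value split is the same.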
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263770#comment-15263770 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551238 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; + +/** + * Translate {@link LongValue} to {@link IntValue}. + * + * Throws {@link RuntimeException} for integer overflow. 
+ */ +public class LongValueToIntValue +implements MapFunction<LongValue, IntValue> { + + private IntValue output = new IntValue(); + + @Override + public IntValue map(LongValue value) + throws Exception { --- End diff -- can be moved to the line above > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
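[Editor's note] As the javadoc under review states, the long-to-int translator must fail fast on overflow rather than silently truncate. A self-contained sketch of that narrowing check, using plain `long`/`int` instead of Flink's `LongValue`/`IntValue` wrappers (`ArithmeticException` is a `RuntimeException`, consistent with the javadoc's contract):

```java
public class LongToIntSketch {

    // Narrow a long to int, throwing on overflow instead of silently
    // dropping the high bits, as LongValueToIntValue's javadoc promises.
    public static int toIntExact(long value) {
        if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
            throw new ArithmeticException("integer overflow: " + value);
        }
        return (int) value;
    }

    public static void main(String[] args) {
        System.out.println(toIntExact(42L)); // 42
        // toIntExact(1L << 40) would throw ArithmeticException here.
    }
}
```

Since Java 8 the same range check is also available in the standard library as `Math.toIntExact`.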
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263772#comment-15263772 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551292 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.LongValue; + +/** + * Translate {@link LongValue} by adding a constant offset value. + */ +public class LongValueAddOffset +implements MapFunction<LongValue, LongValue> { + + private final long offset; + + private LongValue output = new LongValue(); + + /** +* Translate {@link LongValue} by adding a constant offset value. 
+* +* @param offset value to be added to each element +*/ + public LongValueAddOffset(long offset) { + this.offset = offset; + } + + @Override + public LongValue map(LongValue value) + throws Exception { + output.setValue(offset + value.getValue()); + --- End diff -- empty line can be removed > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
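[Editor's note] Beyond the formatting nits, the class under review shows a pattern worth noting: it reuses one mutable `output` object across `map()` calls instead of allocating a new value per record. A Flink-free sketch of that object-reuse pattern, where `MutableLong` is a hypothetical stand-in for Flink's `LongValue`:

```java
public class AddOffsetSketch {

    // Stand-in for Flink's mutable LongValue wrapper.
    public static class MutableLong {
        private long v;
        public long getValue() { return v; }
        public void setValue(long v) { this.v = v; }
    }

    private final long offset;

    // Reused for every map() call, mirroring LongValueAddOffset's 'output'
    // field: each call overwrites this one object rather than allocating.
    private final MutableLong output = new MutableLong();

    public AddOffsetSketch(long offset) {
        this.offset = offset;
    }

    public MutableLong map(MutableLong value) {
        output.setValue(offset + value.getValue());
        return output;
    }
}
```

Reuse like this is only safe when the caller consumes or copies each result before the next `map()` call, which is how Flink's runtime treats emitted records; holding a reference across calls would observe the overwrite.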
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263769#comment-15263769 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551225 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; + +/** + * Translate {@link LongValue} to {@link IntValue}. + * + * Throws {@link RuntimeException} for integer overflow. 
+ */ +public class LongValueToIntValue +implements MapFunction<LongValue, IntValue> { --- End diff -- can be moved to the line above > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263771#comment-15263771 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551265 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToStringValue.java --- @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.StringValue; + +/** + * Translate {@link LongValue} to {@link StringValue}. 
+ */ +public class LongValueToStringValue +implements MapFunction<LongValue, StringValue> { --- End diff -- same :) > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263766#comment-15263766 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551105 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.LongValue; + +/** + * Translate {@link LongValue} by adding a constant offset value. + */ +public class LongValueAddOffset +implements MapFunction<LongValue, LongValue> { --- End diff -- can go in the line above > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. 
> Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263768#comment-15263768 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551172 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.LongValue; + +/** + * Translate {@link LongValue} by adding a constant offset value. + */ +public class LongValueAddOffset +implements MapFunction<LongValue, LongValue> { + + private final long offset; + + private LongValue output = new LongValue(); + + /** +* Translate {@link LongValue} by adding a constant offset value. 
+* +* @param offset value to be added to each element +*/ + public LongValueAddOffset(long offset) { + this.offset = offset; + } + + @Override + public LongValue map(LongValue value) + throws Exception { --- End diff -- can be moved to the line above > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3852) Use a StreamExecutionEnvironment in the quickstart job skeleton
[ https://issues.apache.org/jira/browse/FLINK-3852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263763#comment-15263763 ] Stefano Baghino commented on FLINK-3852: I agree, but would it make sense to have a quickstart for both the DataSet and DataStream APIs instead of replacing it completely? > Use a StreamExecutionEnvironment in the quickstart job skeleton > --- > > Key: FLINK-3852 > URL: https://issues.apache.org/jira/browse/FLINK-3852 > Project: Flink > Issue Type: Task > Components: Quickstarts >Reporter: Robert Metzger > Labels: starter > > The Job skeleton created by the maven archetype "quickstart" is still setting > up an ExecutionEnvironment, not a StreamExecutionEnvironment. > These days, most users are using Flink for streaming, so we should reflect > that in the quickstart as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263758#comment-15263758 ] ASF GitHub Bot commented on FLINK-3771: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61550681 --- Diff: flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala --- @@ -407,6 +407,36 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { } /** + * Relabels vertices and edges using the given MapFunction. + * + * @param translator implements conversion from K to NEW + * @return relabeled graph + */ + def translateGraphLabels[NEW: TypeInformation : ClassTag](translator: MapFunction[K, NEW]): Graph[NEW,VV,EV] = { --- End diff -- In the Gelly code and docs, we refer to the vertex and edge keys as "Ids", not labels. I think we should rename the methods and javadocs to be consistent. > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
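[Editor's note] The naming point aside, translating a whole graph's ids (as `translateGraphLabels` does) means running the vertex labels and both endpoints of every edge through the same translator; otherwise edges would reference ids that no longer exist. A Flink-free sketch of the edge half — the `Edge` class and `translateEdgeIds` here are illustrative stand-ins, not Gelly's API:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TranslateEdgeSketch {

    // Minimal stand-in for Flink's Edge<K, EV>, keeping only the endpoints.
    public static class Edge<K> {
        public final K source;
        public final K target;

        public Edge(K source, K target) {
            this.source = source;
            this.target = target;
        }
    }

    // Both endpoints go through the same translator that was applied to the
    // vertex set, so every translated edge still references a live vertex id.
    public static <OLD, NEW> List<Edge<NEW>> translateEdgeIds(
            List<Edge<OLD>> edges, Function<OLD, NEW> translator) {
        return edges.stream()
                .map(e -> new Edge<>(translator.apply(e.source), translator.apply(e.target)))
                .collect(Collectors.toList());
    }
}
```

This is also why changing the id type (e.g. Long to String, one of the sample use cases) is a whole-graph operation rather than a vertex-only map.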