[jira] [Commented] (FLINK-1916) EOFException when running delta-iteration job
[ https://issues.apache.org/jira/browse/FLINK-1916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505338#comment-14505338 ]

Stephan Ewen commented on FLINK-1916:
-------------------------------------

Confirmed, this is a bug in the {{CompactingHashTable}} class. [~knub] - do you have a minimal example that is able to reproduce this bug? Then I'll try and fix it.

EOFException when running delta-iteration job
---------------------------------------------

                Key: FLINK-1916
                URL: https://issues.apache.org/jira/browse/FLINK-1916
            Project: Flink
         Issue Type: Bug
        Environment: 0.9-milestone-1
                     Exception on the cluster, local execution works
           Reporter: Stefan Bunk

The delta-iteration program in [1] ends with a java.io.EOFException:

{code}
java.io.EOFException
	at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeLong(AbstractPagedOutputView.java:291)
	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeDouble(AbstractPagedOutputView.java:307)
	at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:62)
	at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:26)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
	at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
	at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
	at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
	at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
	at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
	at java.lang.Thread.run(Thread.java:745)
{code}

For logs and the accompanying mailing list discussion see below. When running with a slightly different memory configuration, as hinted on the mailing list, I sometimes also get this exception:

{code}
19.Apr. 13:39:29 INFO  Task - IterationHead(WorksetIteration (Resolved-Redirects)) (10/10) switched to FAILED :
java.lang.IndexOutOfBoundsException: Index: 161, Size: 161
	at java.util.ArrayList.rangeCheck(ArrayList.java:635)
	at java.util.ArrayList.get(ArrayList.java:411)
	at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.resetTo(InMemoryPartition.java:352)
	at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.access$100(InMemoryPartition.java:301)
	at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:226)
	at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
	at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
	at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
	at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
	at java.lang.Thread.run(Thread.java:745)
{code}

[1] Flink job: https://gist.github.com/knub/0bfec859a563009c1d57
[2] Job manager logs: https://gist.github.com/knub/01e3a4b0edb8cde66ff4
[3] One task manager's logs: https://gist.github.com/knub/8f2f953da95c8d7adefc
[4] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-when-running-Flink-job-td1092.html

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-1828) Impossible to output data to an HBase table
[ https://issues.apache.org/jira/browse/FLINK-1828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505406#comment-14505406 ]

ASF GitHub Bot commented on FLINK-1828:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/571#discussion_r28805740

    --- Diff: flink-staging/flink-hbase/pom.xml ---
    @@ -112,6 +112,12 @@ under the License.
     			</exclusion>
     		</exclusions>
     	</dependency>
    +	<dependency>
    --- End diff --

    Is the HBase server dependency really required for any client that wants to write into HBase? This seems like a pretty bad design on the HBase side. Can you tell us what fails when you omit this dependency?

Impossible to output data to an HBase table
-------------------------------------------

                Key: FLINK-1828
                URL: https://issues.apache.org/jira/browse/FLINK-1828
            Project: Flink
         Issue Type: Bug
         Components: Hadoop Compatibility
   Affects Versions: 0.9
           Reporter: Flavio Pompermaier
             Labels: hadoop, hbase
            Fix For: 0.9

Right now it is not possible to use the HBase TableOutputFormat as an output format, because Configurable.setConf is not called in the configure() method of the HadoopOutputFormatBase.
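The fix under discussion is to have configure() forward the job configuration to any output format that implements Hadoop's Configurable interface. The sketch below illustrates that idea with minimal stand-in types (the classes and the method are hypothetical simplifications, not the actual Flink or Hadoop code), so it compiles without either dependency:

```java
import java.util.HashMap;
import java.util.Map;

public class SetConfSketch {

    // Stand-in for org.apache.hadoop.conf.Configuration.
    static class Configuration {
        final Map<String, String> entries = new HashMap<>();
    }

    // Stand-in for org.apache.hadoop.conf.Configurable.
    interface Configurable {
        void setConf(Configuration conf);
        Configuration getConf();
    }

    // Stand-in for an output format that needs its Configuration injected
    // before use, like HBase's TableOutputFormat.
    static class TableOutputFormatLike implements Configurable {
        private Configuration conf;
        public void setConf(Configuration conf) { this.conf = conf; }
        public Configuration getConf() { return conf; }
    }

    // Mirrors the kind of check the issue asks for in configure():
    // without it, Configurable formats never receive their Configuration.
    static void configure(Object outputFormat, Configuration jobConf) {
        if (outputFormat instanceof Configurable) {
            ((Configurable) outputFormat).setConf(jobConf);
        }
    }

    static boolean injects() {
        TableOutputFormatLike format = new TableOutputFormatLike();
        Configuration conf = new Configuration();
        configure(format, conf);
        return format.getConf() == conf;
    }

    public static void main(String[] args) {
        System.out.println(injects()); // the conf was injected
    }
}
```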
[GitHub] flink pull request: Fixed Configurable HadoopOutputFormat (FLINK-1...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/571#discussion_r28805740

    --- Diff: flink-staging/flink-hbase/pom.xml ---
    @@ -112,6 +112,12 @@ under the License.
     			</exclusion>
     		</exclusions>
     	</dependency>
    +	<dependency>
    --- End diff --

    Is the HBase server dependency really required for any client that wants to write into HBase? This seems like a pretty bad design on the HBase side. Can you tell us what fails when you omit this dependency?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Commented] (FLINK-1523) Vertex-centric iteration extensions
[ https://issues.apache.org/jira/browse/FLINK-1523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505565#comment-14505565 ]

ASF GitHub Bot commented on FLINK-1523:
---------------------------------------

Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r28812911

    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java ---
    @@ -228,48 +213,92 @@ public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData)
     	{
     		return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations);
     	}
    -
    +
    +	/**
    +	 * Configures this vertex-centric iteration with the provided parameters.
    +	 *
    +	 * @param parameters the configuration parameters
    +	 */
    +	public void configure(IterationConfiguration parameters) {
    +		this.configuration = parameters;
    +	}
    +
    +	/**
    +	 * @return the configuration parameters of this vertex-centric iteration
    +	 */
    +	public IterationConfiguration getIterationConfiguration() {
    +		return this.configuration;
    +	}
    +
     	// --------------------------------------------------------------------
     	//  Wrapping UDFs
     	// --------------------------------------------------------------------
    -	private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey> & Serializable,
    -		VertexValue extends Serializable, Message>
    -		extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
    -		implements ResultTypeQueryable<Vertex<VertexKey, VertexValue>>
    +	private static abstract class VertexUpdateUdf<VertexKey extends Comparable<VertexKey> & Serializable,
    +		VV extends Serializable, VertexValue extends Serializable, Message>
    --- End diff --

    Perhaps I chose a name that is a bit misleading. Here VV is replaced by the Tuple3<VertexValue, InDegree, OutDegree> when you have the degrees option set, or simply VertexValue if the degrees are not set. I still need the VertexValue generic type. I can rename VertexValue to VV, EdgeValue to EV, etc. and VV to something else? Something like VertexValueMessagingFunction. I can't think of a better name :) Tell me if my intention is clearer now and how to proceed!

Vertex-centric iteration extensions
-----------------------------------

                Key: FLINK-1523
                URL: https://issues.apache.org/jira/browse/FLINK-1523
            Project: Flink
         Issue Type: Improvement
         Components: Gelly
           Reporter: Vasia Kalavri
           Assignee: Andra Lungu

We would like to make the following extensions to the vertex-centric iterations of Gelly:
- allow vertices to access their in/out degrees and the total number of vertices of the graph, inside the iteration.
- allow choosing the neighborhood type (in/out/all) over which to run the vertex-centric iteration. Now, the model uses the updates of the in-neighbors to calculate state and send messages to out-neighbors. We could add a parameter with value in/out/all to the {{VertexUpdateFunction}} and {{MessagingFunction}}, that would indicate the type of neighborhood.
[jira] [Commented] (FLINK-1867) TaskManagerFailureRecoveryITCase causes stalled travis builds
[ https://issues.apache.org/jira/browse/FLINK-1867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505596#comment-14505596 ]

ASF GitHub Bot commented on FLINK-1867:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/612#discussion_r28814574

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java ---
    @@ -112,9 +112,9 @@ public void testTaskManagerProcessFailure() {
     		Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
     
     		Configuration jmConfig = new Configuration();
    -		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 s");
    -		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4 s");
    -		jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
    +		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 ms");
    +		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");
    +		jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20);
    --- End diff --

    How long do the tests take now? With a pause of 20s and a threshold of 20, how long does the JobManager take to realize that the TaskManager is down?

TaskManagerFailureRecoveryITCase causes stalled travis builds
-------------------------------------------------------------

                Key: FLINK-1867
                URL: https://issues.apache.org/jira/browse/FLINK-1867
            Project: Flink
         Issue Type: Bug
         Components: TaskManager, Tests
   Affects Versions: 0.9
           Reporter: Robert Metzger
           Assignee: Aljoscha Krettek

There are currently tests on travis failing: https://travis-ci.org/apache/flink/jobs/57943063
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/612#discussion_r28814574

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java ---
    @@ -112,9 +112,9 @@ public void testTaskManagerProcessFailure() {
     		Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
     
     		Configuration jmConfig = new Configuration();
    -		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 s");
    -		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4 s");
    -		jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
    +		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 ms");
    +		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");
    +		jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20);
    --- End diff --

    How long do the tests take now? With a pause of 20s and a threshold of 20, how long does the JobManager take to realize that the TaskManager is down?
[jira] [Assigned] (FLINK-1735) Add FeatureHasher to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Alexandrov reassigned FLINK-1735:
-------------------------------------------

    Assignee: Alexander Alexandrov

Add FeatureHasher to machine learning library
---------------------------------------------

                Key: FLINK-1735
                URL: https://issues.apache.org/jira/browse/FLINK-1735
            Project: Flink
         Issue Type: New Feature
         Components: Machine Learning Library
           Reporter: Till Rohrmann
           Assignee: Alexander Alexandrov
             Labels: ML

Using the hashing trick [1,2] is a common way to vectorize arbitrary feature values. The hash of the feature value is used to calculate its index for a vector entry. In order to mitigate possible collisions, a second hashing function is used to calculate the sign for the update value which is added to the vector entry. This way, it is likely that collisions will simply cancel out. A feature hasher would also be helpful for NLP problems where it could be used to vectorize bag-of-words or n-gram feature vectors.

Resources:
[1] [https://en.wikipedia.org/wiki/Feature_hashing]
[2] [http://scikit-learn.org/stable/modules/feature_extraction.html#feature-extraction]
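The two-hash scheme described above can be sketched in a few lines of plain Java (class and method names are hypothetical, and String.hashCode() stands in for a proper hash function): one hash picks the vector index, a second independent hash picks the sign, so colliding features add with random signs and tend to cancel.

```java
import java.util.Arrays;

public class FeatureHasherSketch {

    // First hash: map the feature value to an index in [0, dim).
    static int index(String feature, int dim) {
        return Math.floorMod(feature.hashCode(), dim);
    }

    // Second, independent hash (crudely "seeded" with a prefix): map the
    // feature value to a sign of +1 or -1.
    static int sign(String feature) {
        return Math.floorMod(("sign:" + feature).hashCode(), 2) == 0 ? 1 : -1;
    }

    // Add each feature's signed contribution at its hashed index.
    static double[] vectorize(Iterable<String> features, int dim) {
        double[] v = new double[dim];
        for (String f : features) {
            v[index(f, dim)] += sign(f);
        }
        return v;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(vectorize(Arrays.asList("flink", "ml", "flink"), 8)));
    }
}
```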
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r28811435

    --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.test.example;
    +
    +import com.google.common.base.Charsets;
    +import com.google.common.io.Files;
    +import org.apache.flink.graph.example.IncrementalSSSPExample;
    +import org.apache.flink.graph.example.utils.IncrementalSSSPData;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.io.File;
    +
    +@RunWith(Parameterized.class)
    +public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
    +
    +	private String verticesPath;
    +
    +	private String edgesPath;
    +
    +	private String edgesInSSSPPath;
    +
    +	private String resultPath;
    +
    +	private String expected;
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	public IncrementalSSSPITCase(TestExecutionMode mode) {
    +		super(mode);
    +	}
    +
    +	@Before
    +	public void before() throws Exception {
    +		resultPath = tempFolder.newFile().toURI().toString();
    +		File verticesFile = tempFolder.newFile();
    +		Files.write(IncrementalSSSPData.VERTICES, verticesFile, Charsets.UTF_8);
    +
    +		File edgesFile = tempFolder.newFile();
    +		Files.write(IncrementalSSSPData.EDGES, edgesFile, Charsets.UTF_8);
    +
    +		File edgesInSSSPFile = tempFolder.newFile();
    +		Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, Charsets.UTF_8);
    +
    +		verticesPath = verticesFile.toURI().toString();
    +		edgesPath = edgesFile.toURI().toString();
    +		edgesInSSSPPath = edgesInSSSPFile.toURI().toString();
    +	}
    +
    +	@Test
    --- End diff --

    The test for removing the non-SP-edge is doable. It's the bigger graph test case that concerns me. Of course, we can generate some randomised edges, but how do we know which of those edges are in SSSP? That can only be done if we have the actual algorithm implemented.
[GitHub] flink pull request: [FLINK-1670] Made DataStream iterable
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/581#issuecomment-94911176

    This looks much better. Not being densely integrated into the DataStream makes it easier to maintain.

    The `InetAddress.getLocalHost().getHostAddress()` problem still persists in my opinion. This call gives some host name, usually the one tied to the local device. Many machines (especially cloud servers) have various addresses and various hostnames, not all of them exposed. Take an EC2 instance, which has at least three network interfaces:
    - loopback: hostname is "localhost" or whatever you configure as the hostname (it is "herman" in my case). Not necessarily registered at the DNS.
    - a cloud-internal one, 10.x.x.x: hostname is something like ec2-europ1-z3-81-internal.amazonaws.com
    - an external one, such as 203.0.113.25: hostname is something like ec2-203-0-113-25.compute-1.amazonaws.com

    If your call returns "herman", this is probably not of much help, when what you want is actually ec2-europ1-z3-81-internal.amazonaws.com.
[jira] [Commented] (FLINK-1670) Collect method for streaming
[ https://issues.apache.org/jira/browse/FLINK-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1450#comment-1450 ]

ASF GitHub Bot commented on FLINK-1670:
---------------------------------------

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/581#issuecomment-94911176

    This looks much better. Not being densely integrated into the DataStream makes it easier to maintain.

    The `InetAddress.getLocalHost().getHostAddress()` problem still persists in my opinion. This call gives some host name, usually the one tied to the local device. Many machines (especially cloud servers) have various addresses and various hostnames, not all of them exposed. Take an EC2 instance, which has at least three network interfaces:
    - loopback: hostname is "localhost" or whatever you configure as the hostname (it is "herman" in my case). Not necessarily registered at the DNS.
    - a cloud-internal one, 10.x.x.x: hostname is something like ec2-europ1-z3-81-internal.amazonaws.com
    - an external one, such as 203.0.113.25: hostname is something like ec2-203-0-113-25.compute-1.amazonaws.com

    If your call returns "herman", this is probably not of much help, when what you want is actually ec2-europ1-z3-81-internal.amazonaws.com.

Collect method for streaming
----------------------------

                Key: FLINK-1670
                URL: https://issues.apache.org/jira/browse/FLINK-1670
            Project: Flink
         Issue Type: New Feature
         Components: Streaming
   Affects Versions: 0.9
           Reporter: Márton Balassi
           Assignee: Gabor Gevay
           Priority: Minor

A convenience method for streaming back the results of a job to the client. As the client itself is a bottleneck anyway, an easy solution would be to provide a socket sink with degree of parallelism 1, from which a client utility can read.
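The multi-interface situation Stephan describes can be made visible by enumerating all network interfaces rather than trusting the single answer from `InetAddress.getLocalHost()`. A small illustrative sketch (class and method names are hypothetical):

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class LocalAddressesSketch {

    // Collect every address of every interface. On a cloud machine this
    // list typically contains the loopback address, an internal address,
    // and an external address, while InetAddress.getLocalHost() picks a
    // single, possibly unhelpful, name.
    static List<String> allLocalAddresses() throws Exception {
        List<String> result = new ArrayList<>();
        for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
            for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                result.add(addr.getHostAddress());
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("getLocalHost(): " + InetAddress.getLocalHost().getHostAddress());
        System.out.println("all interfaces: " + allLocalAddresses());
    }
}
```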
[jira] [Assigned] (FLINK-1743) Add multinomial logistic regression to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Alexandrov reassigned FLINK-1743:
-------------------------------------------

    Assignee: Alexander Alexandrov

Add multinomial logistic regression to machine learning library
---------------------------------------------------------------

                Key: FLINK-1743
                URL: https://issues.apache.org/jira/browse/FLINK-1743
            Project: Flink
         Issue Type: New Feature
         Components: Machine Learning Library
           Reporter: Till Rohrmann
           Assignee: Alexander Alexandrov
             Labels: ML

Multinomial logistic regression [1] would be a good first classification algorithm, since it can classify multiple classes.

Resources:
[1] [http://en.wikipedia.org/wiki/Multinomial_logistic_regression]
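At the core of multinomial logistic regression is the softmax function, which turns one score per class into a probability distribution; the predicted class is the arg-max. A minimal, self-contained sketch of that step (this is generic textbook code, not the flink-ml implementation):

```java
import java.util.Arrays;

public class SoftmaxSketch {

    // Softmax: p_i = exp(s_i) / sum_j exp(s_j). Subtracting the maximum
    // score first is the standard trick to keep exp() from overflowing.
    static double[] softmax(double[] scores) {
        double max = Arrays.stream(scores).max().orElse(0.0);
        double[] p = new double[scores.length];
        double sum = 0.0;
        for (int i = 0; i < scores.length; i++) {
            p[i] = Math.exp(scores[i] - max);
            sum += p[i];
        }
        for (int i = 0; i < p.length; i++) {
            p[i] /= sum;
        }
        return p;
    }

    public static void main(String[] args) {
        // Probabilities over three classes for the scores 1, 2, 3.
        System.out.println(Arrays.toString(softmax(new double[]{1.0, 2.0, 3.0})));
    }
}
```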
[jira] [Commented] (FLINK-1670) Collect method for streaming
[ https://issues.apache.org/jira/browse/FLINK-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505517#comment-14505517 ]

ASF GitHub Bot commented on FLINK-1670:
---------------------------------------

Github user ggevay commented on the pull request:

    https://github.com/apache/flink/pull/581#issuecomment-94904085

    I updated the pull request as per the above points.

Collect method for streaming
----------------------------

                Key: FLINK-1670
                URL: https://issues.apache.org/jira/browse/FLINK-1670
            Project: Flink
         Issue Type: New Feature
         Components: Streaming
   Affects Versions: 0.9
           Reporter: Márton Balassi
           Assignee: Gabor Gevay
           Priority: Minor

A convenience method for streaming back the results of a job to the client. As the client itself is a bottleneck anyway, an easy solution would be to provide a socket sink with degree of parallelism 1, from which a client utility can read.
[GitHub] flink pull request: [FLINK-1670] Made DataStream iterable
Github user ggevay commented on the pull request:

    https://github.com/apache/flink/pull/581#issuecomment-94904085

    I updated the pull request as per the above points.
[jira] [Updated] (FLINK-1736) Add CountVectorizer to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Alexandrov updated FLINK-1736:
----------------------------------------

    Assignee: Alexander Alexandrov

Add CountVectorizer to machine learning library
-----------------------------------------------

                Key: FLINK-1736
                URL: https://issues.apache.org/jira/browse/FLINK-1736
            Project: Flink
         Issue Type: New Feature
         Components: Machine Learning Library
           Reporter: Till Rohrmann
           Assignee: Alexander Alexandrov
             Labels: ML

A {{CountVectorizer}} feature extractor [1] assigns each occurring word in a corpus a unique identifier. With this mapping it can vectorize models such as bag of words or n-grams in an efficient way. The unique identifier assigned to a word acts as the index of a vector. The number of word occurrences is represented as a vector value at a specific index. The advantage of the {{CountVectorizer}} compared to the FeatureHasher is that the mapping of words to indices can be obtained, which makes it easier to understand the resulting feature vectors. The {{CountVectorizer}} could be generalized to support arbitrary feature values. The {{CountVectorizer}} should be implemented as a {{Transformer}}.

Resources:
[1] [http://scikit-learn.org/stable/modules/feature_extraction.html#common-vectorizer-usage]
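The word-to-index mapping described above can be sketched in plain Java (class and method names are hypothetical; the real flink-ml version would be a distributed Transformer): each distinct word gets a stable index on first sight, and unlike the FeatureHasher the vocabulary map stays inspectable afterwards.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class CountVectorizerSketch {

    // Word -> index mapping; LinkedHashMap keeps first-seen order, so the
    // assigned index is simply the vocabulary size at insertion time.
    final Map<String, Integer> vocabulary = new LinkedHashMap<>();

    // Assign indices to unseen words, then count occurrences per index.
    int[] transform(Iterable<String> words) {
        for (String w : words) {
            vocabulary.putIfAbsent(w, vocabulary.size());
        }
        int[] counts = new int[vocabulary.size()];
        for (String w : words) {
            counts[vocabulary.get(w)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        CountVectorizerSketch cv = new CountVectorizerSketch();
        int[] counts = cv.transform(Arrays.asList("to", "be", "or", "not", "to", "be"));
        System.out.println(cv.vocabulary);           // {to=0, be=1, or=2, not=3}
        System.out.println(Arrays.toString(counts)); // [2, 2, 1, 1]
    }
}
```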
[jira] [Commented] (FLINK-1523) Vertex-centric iteration extensions
[ https://issues.apache.org/jira/browse/FLINK-1523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505534#comment-14505534 ]

ASF GitHub Bot commented on FLINK-1523:
---------------------------------------

Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r28811435

    --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.test.example;
    +
    +import com.google.common.base.Charsets;
    +import com.google.common.io.Files;
    +import org.apache.flink.graph.example.IncrementalSSSPExample;
    +import org.apache.flink.graph.example.utils.IncrementalSSSPData;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.io.File;
    +
    +@RunWith(Parameterized.class)
    +public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
    +
    +	private String verticesPath;
    +
    +	private String edgesPath;
    +
    +	private String edgesInSSSPPath;
    +
    +	private String resultPath;
    +
    +	private String expected;
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	public IncrementalSSSPITCase(TestExecutionMode mode) {
    +		super(mode);
    +	}
    +
    +	@Before
    +	public void before() throws Exception {
    +		resultPath = tempFolder.newFile().toURI().toString();
    +		File verticesFile = tempFolder.newFile();
    +		Files.write(IncrementalSSSPData.VERTICES, verticesFile, Charsets.UTF_8);
    +
    +		File edgesFile = tempFolder.newFile();
    +		Files.write(IncrementalSSSPData.EDGES, edgesFile, Charsets.UTF_8);
    +
    +		File edgesInSSSPFile = tempFolder.newFile();
    +		Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, Charsets.UTF_8);
    +
    +		verticesPath = verticesFile.toURI().toString();
    +		edgesPath = edgesFile.toURI().toString();
    +		edgesInSSSPPath = edgesInSSSPFile.toURI().toString();
    +	}
    +
    +	@Test
    --- End diff --

    The test for removing the non-SP-edge is doable. It's the bigger graph test case that concerns me. Of course, we can generate some randomised edges, but how do we know which of those edges are in SSSP? That can only be done if we have the actual algorithm implemented.

Vertex-centric iteration extensions
-----------------------------------

                Key: FLINK-1523
                URL: https://issues.apache.org/jira/browse/FLINK-1523
            Project: Flink
         Issue Type: Improvement
         Components: Gelly
           Reporter: Vasia Kalavri
           Assignee: Andra Lungu

We would like to make the following extensions to the vertex-centric iterations of Gelly:
- allow vertices to access their in/out degrees and the total number of vertices of the graph, inside the iteration.
- allow choosing the neighborhood type (in/out/all) over which to run the vertex-centric iteration. Now, the model uses the updates of the in-neighbors to calculate state and send messages to out-neighbors. We could add a parameter with value in/out/all to the {{VertexUpdateFunction}} and {{MessagingFunction}}, that would indicate the type of neighborhood.
[jira] [Commented] (FLINK-1867) TaskManagerFailureRecoveryITCase causes stalled travis builds
[ https://issues.apache.org/jira/browse/FLINK-1867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505587#comment-14505587 ]

ASF GitHub Bot commented on FLINK-1867:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/612#discussion_r28814178

    --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala ---
    @@ -231,9 +231,9 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
         config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
         config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
         config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
    -    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
    +    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s")
    --- End diff --

    What is the effect of this setting on the test? Does it take very long now, because the JobManager needs really long to determine that the TaskManager has failed?

TaskManagerFailureRecoveryITCase causes stalled travis builds
-------------------------------------------------------------

                Key: FLINK-1867
                URL: https://issues.apache.org/jira/browse/FLINK-1867
            Project: Flink
         Issue Type: Bug
         Components: TaskManager, Tests
   Affects Versions: 0.9
           Reporter: Robert Metzger
           Assignee: Aljoscha Krettek

There are currently tests on travis failing: https://travis-ci.org/apache/flink/jobs/57943063
[jira] [Commented] (FLINK-1908) JobManager startup delay isn't considered when using start-cluster.sh script
[ https://issues.apache.org/jira/browse/FLINK-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505313#comment-14505313 ]

Stephan Ewen commented on FLINK-1908:
-------------------------------------

I don't think that this issue will be fixed in 0.8.x. @DarkKnightCZ Can you verify whether 0.9 works for you?

JobManager startup delay isn't considered when using start-cluster.sh script
----------------------------------------------------------------------------

                Key: FLINK-1908
                URL: https://issues.apache.org/jira/browse/FLINK-1908
            Project: Flink
         Issue Type: Bug
         Components: Distributed Runtime
   Affects Versions: 0.9, 0.8.1
        Environment: Linux
           Reporter: Lukas Raska
           Priority: Minor
  Original Estimate: 5m
 Remaining Estimate: 5m

When starting a Flink cluster via the start-cluster.sh script, JobManager startup can be delayed (as it's started asynchronously), which can result in failed startup of several task managers. A solution is to wait a certain amount of time, periodically checking whether the RPC port is accessible, and then proceed with starting the task managers.
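The proposed wait-for-RPC-port check can be sketched as a simple TCP connect loop (class and method names are hypothetical; a real fix would live in the startup scripts, and 6123 is assumed here as Flink's default JobManager RPC port):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class WaitForPortSketch {

    // Poll until a TCP connect to host:port succeeds or the timeout runs
    // out; start-cluster.sh could run such a check before launching the
    // task managers, as the issue proposes.
    static boolean waitForPort(String host, int port, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 500);
                return true; // port is accepting connections
            } catch (IOException e) {
                Thread.sleep(200); // JobManager not up yet, retry
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waitForPort("localhost", 6123, 2000));
    }
}
```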
[jira] [Assigned] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Alexandrov reassigned FLINK-1731:
-------------------------------------------

    Assignee: Alexander Alexandrov

Add kMeans clustering algorithm to machine learning library
-----------------------------------------------------------

                Key: FLINK-1731
                URL: https://issues.apache.org/jira/browse/FLINK-1731
            Project: Flink
         Issue Type: New Feature
         Components: Machine Learning Library
           Reporter: Till Rohrmann
           Assignee: Alexander Alexandrov
             Labels: ML

The Flink repository already contains a kMeans implementation, but it is not yet ported to the machine learning library. I assume that only the used data types have to be adapted and then it can be more or less directly moved to flink-ml.
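For reference, the core of kMeans is one Lloyd iteration: assign each point to its nearest centroid, then move each centroid to the mean of its assigned points. A minimal 1-D sketch in plain Java (names hypothetical; the Flink version distributes exactly these two steps over a DataSet):

```java
import java.util.Arrays;

public class KMeansStepSketch {

    // One Lloyd iteration over 1-D points: nearest-centroid assignment,
    // then recompute each centroid as the mean of its assigned points.
    static double[] lloydStep(double[] points, double[] centroids) {
        double[] sums = new double[centroids.length];
        int[] counts = new int[centroids.length];
        for (double p : points) {
            int nearest = 0;
            for (int c = 1; c < centroids.length; c++) {
                if (Math.abs(p - centroids[c]) < Math.abs(p - centroids[nearest])) {
                    nearest = c;
                }
            }
            sums[nearest] += p;
            counts[nearest]++;
        }
        double[] updated = centroids.clone();
        for (int c = 0; c < centroids.length; c++) {
            if (counts[c] > 0) {
                updated[c] = sums[c] / counts[c];
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        double[] points = {0.0, 1.0, 9.0, 10.0};
        // Two clusters around 0.5 and 9.5.
        System.out.println(Arrays.toString(lloydStep(points, new double[]{2.0, 8.0}))); // [0.5, 9.5]
    }
}
```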
[jira] [Commented] (FLINK-1918) NullPointerException at org.apache.flink.client.program.Client's constructor while using ExecutionEnvironment.createRemoteEnvironment
[ https://issues.apache.org/jira/browse/FLINK-1918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505348#comment-14505348 ] Stephan Ewen commented on FLINK-1918: - Thank you for reporting this. From what I can see, this may happen in the case where a host lookup failed. This should definitely give a better error message, or fail earlier with "Unknown host". I'll prepare a patch for this... NullPointerException at org.apache.flink.client.program.Client's constructor while using ExecutionEnvironment.createRemoteEnvironment - Key: FLINK-1918 URL: https://issues.apache.org/jira/browse/FLINK-1918 Project: Flink Issue Type: Bug Components: YARN Client Reporter: Zoltán Zvara Labels: yarn, yarn-client Trace: {code} Exception in thread "main" java.lang.NullPointerException at org.apache.flink.client.program.Client.<init>(Client.java:104) at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:86) at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82) at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:70) at Wordcount.main(Wordcount.java:23) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) {code} The constructor is trying to set the configuration parameter {{jobmanager.rpc.address}} with {{jobManagerAddress.getAddress().getHostAddress()}}, but {{jobManagerAddress.holder.addr}} is {{null}}. {{jobManagerAddress.holder.hostname}} and {{port}} hold the valid information.
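For reference, the null return value described above can be reproduced with plain `java.net.InetSocketAddress`: an unresolved address reports `isUnresolved() == true` and `getAddress() == null`, which is exactly the combination that would trigger the NPE. A small standalone demonstration (the class name and host are made up):

```java
import java.net.InetSocketAddress;

public class HostResolutionCheck {
    public static void main(String[] args) {
        // createUnresolved guarantees no DNS lookup took place: getAddress() is null.
        InetSocketAddress addr = InetSocketAddress.createUnresolved("jobmanager-host", 6123);

        System.out.println(addr.isUnresolved());       // true
        System.out.println(addr.getAddress() == null); // true

        // Failing early with a clear message avoids the NPE deep inside the Client:
        if (addr.isUnresolved()) {
            System.out.println("Unknown host: " + addr.getHostName());
        }
    }
}
```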
[jira] [Commented] (FLINK-1398) A new DataSet function: extractElementFromTuple
[ https://issues.apache.org/jira/browse/FLINK-1398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505613#comment-14505613 ] ASF GitHub Bot commented on FLINK-1398: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/308#issuecomment-94918887 I think this is good in general, modulo two issues: - There are going to be more Utils, so I would like to give it a more speaking name, like TupleUtils, or TupleExtractors, or something along these lines. - I think we can omit the return type class. Similar as with the projections, this should not be needed. A new DataSet function: extractElementFromTuple --- Key: FLINK-1398 URL: https://issues.apache.org/jira/browse/FLINK-1398 Project: Flink Issue Type: Wish Reporter: Felix Neutatz Assignee: Felix Neutatz Priority: Minor This is the use case: {code:java} DataSet<Tuple2<Integer, Double>> data = env.fromElements(new Tuple2<Integer, Double>(1, 2.0)); data.map(new ElementFromTuple()); public static final class ElementFromTuple implements MapFunction<Tuple2<Integer, Double>, Double> { @Override public Double map(Tuple2<Integer, Double> value) { return value.f1; } } {code} It would be awesome if we had something like this: {code:java} data.extractElement(1); {code} This means that we implement a function for DataSet which extracts a certain element from a given Tuple.
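The requested `data.extractElement(1)` behaviour amounts to a typed map over one tuple field. A minimal standalone sketch without Flink on the classpath (the `Tuple2` stand-in and all helper names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ExtractFieldExample {

    /** Minimal stand-in for Flink's Tuple2. */
    public static class Tuple2<T0, T1> {
        public final T0 f0;
        public final T1 f1;
        public Tuple2(T0 f0, T1 f1) { this.f0 = f0; this.f1 = f1; }
    }

    /** Applies the extractor to every element, mimicking data.extractElement(1). */
    public static <IN, OUT> List<OUT> extract(List<IN> data, Function<IN, OUT> extractor) {
        List<OUT> result = new ArrayList<>();
        for (IN value : data) {
            result.add(extractor.apply(value));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Tuple2<Integer, Double>> data =
                List.of(new Tuple2<>(1, 2.0), new Tuple2<>(3, 4.5));
        // Equivalent of the proposed data.extractElement(1):
        List<Double> seconds = extract(data, t -> t.f1);
        System.out.println(seconds); // [2.0, 4.5]
    }
}
```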
[GitHub] flink pull request: Fixed Configurable HadoopOutputFormat (FLINK-1...
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/571#discussion_r28765996 --- Diff: flink-staging/flink-hbase/pom.xml --- @@ -112,6 +112,12 @@ under the License. </exclusion> </exclusions> </dependency> + <dependency> --- End diff -- It is needed if you want to use the HBase TableOutputFormat --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1807) Stochastic gradient descent optimizer for ML library
[ https://issues.apache.org/jira/browse/FLINK-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504664#comment-14504664 ] ASF GitHub Bot commented on FLINK-1807: --- GitHub user thvasilo opened a pull request: https://github.com/apache/flink/pull/613 [WIP] - [FLINK-1807/1889] - Optimization frame work and initial SGD implementation This is a WIP PR for the optimization framework of the Flink ML library. The design is a mix between how sklearn and Apache Spark implement their learning algorithm optimization frameworks. The idea is that a Learner can take a Solver, LossFunction and RegularizationType as parameters, similar to the design that sklearn uses and Spark seems to be headed to. This allows for flexibility on how users design their learning algorithms. A Solver uses the LossFunction and RegularizationType in order to optimize the weights according to the provided DataSet of LabeledVector (label, featuresVector). As you will see in the TODOs there are many questions regarding the design yet, and no real RegularizationType has been implemented yet so that interface could change depending on what we end up needing for the regularization calculation. A first implementation of Stochastic Gradient Descent is included. As you will see, the stochastic part is still missing as we are blocked on a sample operator for DataSet. Instead we have to map over the whole data. If you run the tests you will see that the third test where we try to perform just one step of the optimization does not work. I haven't been able to figure out why this happens yet, any help would be appreciated. I've also included a wrapper for BLAS functions that was copied directly from MLlib. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/thvasilo/flink optimization Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/613.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #613 commit 1ed6032b6505488549785ff38b5805586a0465cb Author: Theodore Vasiloudis t...@sics.se Date: 2015-04-21T08:59:34Z Interfaces for the optimization framework. BLAS.scala was directly copied from the Apache Spark project. commit 5a40f14790fd024fdd9a01069262627cda2126a4 Author: Theodore Vasiloudis t...@sics.se Date: 2015-04-21T09:01:50Z Added Stochastic Gradient Descent initial version and some tests. Stochastic gradient descent optimizer for ML library Key: FLINK-1807 URL: https://issues.apache.org/jira/browse/FLINK-1807 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Labels: ML Stochastic gradient descent (SGD) is a widely used optimization technique in different ML algorithms. Thus, it would be helpful to provide a generalized SGD implementation which can be instantiated with the respective gradient computation. Such a building block would make the development of future algorithms easier.
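For readers unfamiliar with what such an optimization framework computes, the core of a (batch) gradient descent step for least-squares linear regression can be sketched in plain Java as follows (an illustrative sketch only, not Flink ML code; all names are made up):

```java
public class GradientStepSketch {

    /**
     * One batch gradient descent step for least-squares linear regression:
     * w := w - lr * (1/n) * sum_i (w·x_i - y_i) * x_i
     */
    public static double[] step(double[] w, double[][] x, double[] y, double lr) {
        int n = x.length, d = w.length;
        double[] grad = new double[d];
        for (int i = 0; i < n; i++) {
            double prediction = 0;
            for (int j = 0; j < d; j++) prediction += w[j] * x[i][j];
            double error = prediction - y[i]; // residual for sample i
            for (int j = 0; j < d; j++) grad[j] += error * x[i][j] / n;
        }
        double[] next = new double[d];
        for (int j = 0; j < d; j++) next[j] = w[j] - lr * grad[j];
        return next;
    }

    public static void main(String[] args) {
        // Fit y = 2x: the single weight should converge toward 2.0.
        double[][] x = {{1}, {2}, {3}};
        double[] y = {2, 4, 6};
        double[] w = {0};
        for (int k = 0; k < 200; k++) w = step(w, x, y, 0.1);
        System.out.println(w[0]); // converges toward 2.0
    }
}
```

The "stochastic" variant discussed in the PR would apply this update to a sampled subset (ultimately a single element) per step instead of the full data set, which is exactly why the missing sample operator is a blocker.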
[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results
[ https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504939#comment-14504939 ] ASF GitHub Bot commented on FLINK-1297: --- Github user tammymendt commented on a diff in the pull request: https://github.com/apache/flink/pull/605#discussion_r28776656 --- Diff: flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.statistics; + +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; +import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import com.clearspring.analytics.stream.cardinality.ICardinality; +import com.clearspring.analytics.stream.cardinality.LinearCounting; +import org.apache.flink.statistics.heavyhitters.IHeavyHitter; +import org.apache.flink.statistics.heavyhitters.LossyCounting; +import org.apache.flink.statistics.heavyhitters.CountMinHeavyHitter; +import org.apache.flink.statistics.heavyhitters.HeavyHitterMergeException; + +import java.io.Serializable; +import java.util.Map; + +/** + * Data structure that encapsulates statistical information of data that has only been processed by one pass + * This statistical information is meant to help determine the distribution of the data that has been processed + * in an operator so that we can determine if it is necessary to repartition the data + * + * The statistics to be gathered are configurable and represented by a {@link OperatorStatisticsConfig} object. + * + * The information encapsulated in this class is min, max, a structure enabling estimation of count distinct and a + * structure holding the heavy hitters along with their frequency. 
+ * + */ +public class OperatorStatistics implements Serializable { + + OperatorStatisticsConfig config; + + Object min; + Object max; + ICardinality countDistinct; + IHeavyHitter heavyHitter; + long cardinality = 0; + + public OperatorStatistics(OperatorStatisticsConfig config) { + this.config = config; + if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)) { + countDistinct = new LinearCounting(OperatorStatisticsConfig.COUNTD_BITMAP_SIZE); + } + if(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)){ + countDistinct = new HyperLogLog(OperatorStatisticsConfig.COUNTD_LOG2M); + } + if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)){ + heavyHitter = + new LossyCounting(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, OperatorStatisticsConfig.HEAVY_HITTER_ERROR); + } + if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)){ + heavyHitter = + new CountMinHeavyHitter(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, + OperatorStatisticsConfig.HEAVY_HITTER_ERROR, + OperatorStatisticsConfig.HEAVY_HITTER_CONFIDENCE, + OperatorStatisticsConfig.HEAVY_HITTER_SEED); + } + } + + public void process(Object tupleObject){ --- End diff -- The problem with processing only every n-th element is that precision would be affected, and the algorithms are already estimating as it is. I am planning to measure performance overhead but haven't done it so far. I have removed unnecessary checks inside the process function. The point of them was to allow the user to configure which stats wished to be tracked, by means of the OperatorStatisticsConfig class. This way for example an OperatorStatisticsAccumulator could
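The `CountMinHeavyHitter` branch above builds on a count-min sketch. A minimal standalone version of that data structure (an illustrative sketch, not the clearspring implementation; the class name and parameters are made up) shows the one-sided error the review comment alludes to: estimates may overshoot because of hash collisions, but never undershoot the true frequency:

```java
import java.util.Random;

public class CountMinSketchSketch {

    private final long[][] table; // depth rows of width counters
    private final int[] seeds;    // one hash seed per row
    private final int width;

    public CountMinSketchSketch(int depth, int width) {
        this.table = new long[depth][width];
        this.width = width;
        this.seeds = new int[depth];
        Random random = new Random(42);
        for (int i = 0; i < depth; i++) {
            seeds[i] = random.nextInt();
        }
    }

    private int bucket(int row, Object item) {
        int h = item.hashCode() ^ seeds[row];
        h ^= (h >>> 16); // mix the bits a little
        return Math.floorMod(h, width);
    }

    public void add(Object item) {
        for (int row = 0; row < table.length; row++) {
            table[row][bucket(row, item)]++;
        }
    }

    /** Estimated frequency: may overestimate (collisions), never underestimates. */
    public long estimate(Object item) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < table.length; row++) {
            min = Math.min(min, table[row][bucket(row, item)]);
        }
        return min;
    }

    public static void main(String[] args) {
        CountMinSketchSketch cms = new CountMinSketchSketch(4, 256);
        for (int i = 0; i < 100; i++) cms.add("heavy");
        for (int i = 0; i < 1000; i++) cms.add("item-" + i);
        System.out.println(cms.estimate("heavy") >= 100); // true: never undercounts
    }
}
```

Because the structure already trades precision for constant memory, sampling every n-th element on top of it would compound the estimation error, which is the point made in the comment above.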
[jira] [Commented] (FLINK-1636) Misleading exception during concurrent partition release and remote request
[ https://issues.apache.org/jira/browse/FLINK-1636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504908#comment-14504908 ] Maximilian Michels commented on FLINK-1636: --- The error above is based on an older code base (e.g. 9d7acf3657cbd3fb0b238b20ba864b6a74774e40). Some work has been done on the RemoteInputChannel. The problem should still persist, though. On the master (e2a00183eb539889d7a5053c49b2a296de79add0) this currently looks similar: {code} if (expectedSequenceNumber == sequenceNumber) { receivedBuffers.add(buffer); expectedSequenceNumber++; notifyAvailableBuffer(); success = true; } else { onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); } {code} This error only gets thrown when the sequence number of the buffer doesn't match. However, your description states a different problem, i.e. the buffer has already been released on the remote side. That should be detected by other means than the sequence number (e.g. by an identifier). So do we need an additional check in the code or is the error cause you described not applicable here? Misleading exception during concurrent partition release and remote request --- Key: FLINK-1636 URL: https://issues.apache.org/jira/browse/FLINK-1636 Project: Flink Issue Type: Improvement Components: Distributed Runtime Reporter: Ufuk Celebi Priority: Minor When a result partition is released concurrently with a remote partition request, the request might come in late and result in an exception at the receiving task saying: {code} 16:04:22,499 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN Partition - Map (Map at testRestartMultipleTimes(SimpleRecoveryITCase.java:200)) (1/4) switched to FAILED : java.io.IOException: org.apache.flink.runtime.io.network.partition.queue.IllegalQueueIteratorRequestException at remote input channel: Intermediate result partition has already been released.].
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkIoError(RemoteInputChannel.java:223) at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:103) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:310) at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:75) at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59) at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:205) at java.lang.Thread.run(Thread.java:745) {code}
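The in-order check quoted from the master branch can be reduced to a few lines (a simplified standalone sketch, not the actual RemoteInputChannel; all names besides the quoted ones are made up):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SequenceCheckSketch {

    public static class BufferReorderingException extends RuntimeException {
        public BufferReorderingException(int expected, int actual) {
            super("Expected sequence number " + expected + ", but received " + actual);
        }
    }

    private int expectedSequenceNumber = 0;
    private final Deque<Object> receivedBuffers = new ArrayDeque<>();

    /** Accepts only the next in-order buffer; anything else counts as reordering. */
    public void onBuffer(Object buffer, int sequenceNumber) {
        if (expectedSequenceNumber == sequenceNumber) {
            receivedBuffers.add(buffer);
            expectedSequenceNumber++;
        } else {
            throw new BufferReorderingException(expectedSequenceNumber, sequenceNumber);
        }
    }
}
```

As the comment notes, this check only detects ordering violations; a partition that was already released on the remote side would have to be detected by an explicit identifier, not by sequence numbers.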
[jira] [Commented] (FLINK-1398) A new DataSet function: extractElementFromTuple
[ https://issues.apache.org/jira/browse/FLINK-1398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504932#comment-14504932 ] ASF GitHub Bot commented on FLINK-1398: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/308#discussion_r28776433 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/lib/DataSetUtil.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.lib; + + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.MapOperator; +import org.apache.flink.api.java.operators.SingleInputUdfOperator; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + + +public class DataSetUtil { + + + // + // Extraction of a single field + // + + /** +* Applies a single field extraction on a {@link Tuple} {@link DataSet}.<br/> +* <b>Note: Can only be applied on Tuple DataSets using the corresponding field index.</b><br/> +* The transformation extracts a given field of each Tuple of the DataSet.<br/> +* +* +* @param ds The input DataSet. +* @param fieldIndex The field index of the input tuple which is extracted. +* @param outputType Class of the extracted field. +* @return A SingleInputUdfOperator that represents the extracted field.
+* +* @see Tuple +* @see DataSet +* @see org.apache.flink.api.java.operators.SingleInputUdfOperator +*/ + public static <IN extends Tuple, OUT> SingleInputUdfOperator<IN, OUT, MapOperator<IN, OUT>> extractSingleField(DataSet<IN> ds, int fieldIndex, Class<OUT> outputType) { + + if(!ds.getType().isTupleType()) { + throw new IllegalArgumentException("The DataSet has to contain a Tuple, not " + ds.getType().getTypeClass().getName()); + } + + TupleTypeInfo<IN> tupleInfo = (TupleTypeInfo) ds.getType(); + if(fieldIndex >= tupleInfo.getArity() || fieldIndex < 0) { + throw new IndexOutOfBoundsException("The field index has to be between 0 and " + (tupleInfo.getArity()-1)); + } + + if(!tupleInfo.getTypeAt(fieldIndex).equals(TypeExtractor.createTypeInfo(outputType))) { + throw new IllegalArgumentException("The output class type has to be: " + tupleInfo.getTypeAt(fieldIndex).toString()); + } + + return ds.map(new ExtractElement(fieldIndex)).returns(tupleInfo.getTypeAt(fieldIndex)); --- End diff -- Add `.name("Extract Field " + fieldIndex)` to specify the name of the Map operator. A new DataSet function: extractElementFromTuple --- Key: FLINK-1398 URL: https://issues.apache.org/jira/browse/FLINK-1398 Project: Flink Issue Type: Wish Reporter: Felix Neutatz Assignee: Felix Neutatz Priority: Minor This is the use case: {code:java} DataSet<Tuple2<Integer, Double>> data = env.fromElements(new Tuple2<Integer, Double>(1, 2.0)); data.map(new ElementFromTuple()); public static final class ElementFromTuple implements MapFunction<Tuple2<Integer, Double>, Double> { @Override public Double map(Tuple2<Integer, Double> value) { return value.f1; } } {code} It would be awesome if we had something like this: {code:java}
[jira] [Commented] (FLINK-1398) A new DataSet function: extractElementFromTuple
[ https://issues.apache.org/jira/browse/FLINK-1398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504934#comment-14504934 ] ASF GitHub Bot commented on FLINK-1398: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/308#issuecomment-94792842 How do we proceed with this PR? I think it looks good and would be OK with adding this feature to a `DataSetUtils` class. Other opinions? A new DataSet function: extractElementFromTuple --- Key: FLINK-1398 URL: https://issues.apache.org/jira/browse/FLINK-1398 Project: Flink Issue Type: Wish Reporter: Felix Neutatz Assignee: Felix Neutatz Priority: Minor This is the use case: {code:java} DataSet<Tuple2<Integer, Double>> data = env.fromElements(new Tuple2<Integer, Double>(1, 2.0)); data.map(new ElementFromTuple()); public static final class ElementFromTuple implements MapFunction<Tuple2<Integer, Double>, Double> { @Override public Double map(Tuple2<Integer, Double> value) { return value.f1; } } {code} It would be awesome if we had something like this: {code:java} data.extractElement(1); {code} This means that we implement a function for DataSet which extracts a certain element from a given Tuple.
[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...
Github user tammymendt commented on a diff in the pull request: https://github.com/apache/flink/pull/605#discussion_r28776656 --- Diff: flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.statistics; + +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; +import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import com.clearspring.analytics.stream.cardinality.ICardinality; +import com.clearspring.analytics.stream.cardinality.LinearCounting; +import org.apache.flink.statistics.heavyhitters.IHeavyHitter; +import org.apache.flink.statistics.heavyhitters.LossyCounting; +import org.apache.flink.statistics.heavyhitters.CountMinHeavyHitter; +import org.apache.flink.statistics.heavyhitters.HeavyHitterMergeException; + +import java.io.Serializable; +import java.util.Map; + +/** + * Data structure that encapsulates statistical information of data that has only been processed by one pass + * This statistical information is meant to help determine the distribution of the data that has been processed + * in an operator so that we can determine if it is necessary to repartition the data + * + * The statistics to be gathered are configurable and represented by a {@link OperatorStatisticsConfig} object. + * + * The information encapsulated in this class is min, max, a structure enabling estimation of count distinct and a + * structure holding the heavy hitters along with their frequency. 
+ * + */ +public class OperatorStatistics implements Serializable { + + OperatorStatisticsConfig config; + + Object min; + Object max; + ICardinality countDistinct; + IHeavyHitter heavyHitter; + long cardinality = 0; + + public OperatorStatistics(OperatorStatisticsConfig config) { + this.config = config; + if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)) { + countDistinct = new LinearCounting(OperatorStatisticsConfig.COUNTD_BITMAP_SIZE); + } + if(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)){ + countDistinct = new HyperLogLog(OperatorStatisticsConfig.COUNTD_LOG2M); + } + if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)){ + heavyHitter = + new LossyCounting(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, OperatorStatisticsConfig.HEAVY_HITTER_ERROR); + } + if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)){ + heavyHitter = + new CountMinHeavyHitter(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, + OperatorStatisticsConfig.HEAVY_HITTER_ERROR, + OperatorStatisticsConfig.HEAVY_HITTER_CONFIDENCE, + OperatorStatisticsConfig.HEAVY_HITTER_SEED); + } + } + + public void process(Object tupleObject){ --- End diff -- The problem with processing only every n-th element is that precision would be affected, and the algorithms are already estimating as it is. I am planning to measure performance overhead but haven't done it so far. I have removed unnecessary checks inside the process function. The point of them was to allow the user to configure which stats wished to be tracked, by means of the OperatorStatisticsConfig class. This way for example an OperatorStatisticsAccumulator could
[jira] [Commented] (FLINK-1807) Stochastic gradient descent optimizer for ML library
[ https://issues.apache.org/jira/browse/FLINK-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504696#comment-14504696 ] ASF GitHub Bot commented on FLINK-1807: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/613#discussion_r28763749 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/WeightVector.scala --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common + +import org.apache.flink.ml.math.Vector + +// TODO(tvas): This provides an abstraction for the weights +// but at the same time it leads to the creation of many objects as we have to pack and unpack +// the weights and the intercept often during SGD.
+ +/** This class represents a weight vector with an intercept, as it is required for many supervised + * learning tasks + * @param weights The vector of weights + * @param intercept The intercept (bias) weight + */ +case class WeightVector(weights: Vector, intercept: Double) extends Serializable { + + override def equals(obj: Any): Boolean = { --- End diff -- This method should be automatically generated by Scala since we're using case classes. Stochastic gradient descent optimizer for ML library Key: FLINK-1807 URL: https://issues.apache.org/jira/browse/FLINK-1807 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Labels: ML Stochastic gradient descent (SGD) is a widely used optimization technique in different ML algorithms. Thus, it would be helpful to provide a generalized SGD implementation which can be instantiated with the respective gradient computation. Such a building block would make the development of future algorithms easier. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [WIP] - [FLINK-1807/1889] - Optimization frame...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/613#discussion_r28765413
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/WeightVector.scala ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common
+
+import org.apache.flink.ml.math.Vector
+
+// TODO(tvas): This provides an abstraction for the weights,
+// but at the same time it leads to the creation of many objects, as we have to pack and unpack
+// the weights and the intercept often during SGD.
+
+/** This class represents a weight vector with an intercept, as it is required for many supervised
+ * learning tasks.
+ * @param weights The vector of weights
+ * @param intercept The intercept (bias) weight
+ */
+case class WeightVector(weights: Vector, intercept: Double) extends Serializable {
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case weightVector: WeightVector =>
+        weights.equals(weightVector.weights) && intercept.equals(weightVector.intercept)
+      case _ => false
+    }
+  }
+
+  override def toString: String = {
--- End diff --
I copied the LabeledVector code here, since these are essentially the same class. Is only the toString method generated? Should it be removed from LabeledVector as well?
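The review point above — that Scala auto-generates a structural `equals`/`toString` for case classes — has a direct Java analogue in records. A minimal illustrative sketch (not Flink code; `WeightVector` here is just a hypothetical stand-in for the class under review):

```java
import java.util.List;

public class RecordEq {
    // Like a Scala case class, a Java record gets compiler-generated
    // equals/hashCode/toString, so hand-written versions are redundant.
    record WeightVector(List<Double> weights, double intercept) {}

    public static void main(String[] args) {
        WeightVector a = new WeightVector(List.of(1.0, 2.0), 0.5);
        WeightVector b = new WeightVector(List.of(1.0, 2.0), 0.5);
        System.out.println(a.equals(b)); // structural equality of components
    }
}
```

Note the record components use `List<Double>` rather than `double[]`: the generated `equals` compares components with `Object.equals`, and arrays only compare by reference.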
[GitHub] flink pull request: [FLINK-1906] [docs] Add tip to work around pla...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/611
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28764335
--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java ---
@@ -40,7 +42,37 @@
 	private static final long serialVersionUID = 1L;
-
+
+	// ----------------------------------------------------------------------
+	// Attributes that allow vertices to access their in/out degrees and the total number of vertices
+	// inside an iteration.
+	// ----------------------------------------------------------------------
+
+	private long numberOfVertices = -1L;
+
+	public long getNumberOfVertices() throws Exception {
+		if (numberOfVertices == -1) {
+			throw new InaccessibleMethodException("The number of vertices option is not set");
+		}
+		return numberOfVertices;
+	}
+
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	// ----------------------------------------------------------------------
+
+	private boolean optDegrees;
+
+	public boolean isOptDegrees() {
--- End diff --
Are you talking about the getters? To my knowledge, these are public. Otherwise, they would be totally unusable. If you're talking about the default setters, I needed to see them in the entire package. But they are not public.
[GitHub] flink pull request: [WIP] - [FLINK-1807/1889] - Optimization frame...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/613#discussion_r28764408
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala ---
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math
+
+import com.github.fommil.netlib.{BLAS => NetlibBLAS, F2jBLAS}
+import com.github.fommil.netlib.BLAS.{getInstance => NativeBLAS}
+
+
+/**
+ * BLAS routines for MLlib's vectors and matrices.
+ */
+object BLAS extends Serializable {
+
+  @transient private var _f2jBLAS: NetlibBLAS = _
+  @transient private var _nativeBLAS: NetlibBLAS = _
+
+  // For level-1 routines, we use Java implementation.
+  private def f2jBLAS: NetlibBLAS = {
+    if (_f2jBLAS == null) {
+      _f2jBLAS = new F2jBLAS
+    }
+    _f2jBLAS
+  }
+
+  /**
+   * y += a * x
+   */
+  def axpy(a: Double, x: Vector, y: Vector): Unit = {
+    require(x.size == y.size)
+    y match {
+      case dy: DenseVector =>
+        x match {
+          case sx: SparseVector =>
+            axpy(a, sx, dy)
+          case dx: DenseVector =>
+            axpy(a, dx, dy)
+          case _ =>
+            throw new UnsupportedOperationException(
+              s"axpy doesn't support x type ${x.getClass}.")
+        }
+      case _ =>
+        throw new IllegalArgumentException(
+          s"axpy only supports adding to a dense vector but got type ${y.getClass}.")
+    }
+  }
+
+  /**
+   * y += a * x
+   */
+  private def axpy(a: Double, x: DenseVector, y: DenseVector): Unit = {
+    val n = x.size
+    f2jBLAS.daxpy(n, a, x.data, 1, y.data, 1)
+  }
+
+  /**
+   * y += a * x
+   */
+  private def axpy(a: Double, x: SparseVector, y: DenseVector): Unit = {
+    val xValues = x.data
+    val xIndices = x.indices
+    val yValues = y.data
+    val nnz = xIndices.size
+
+    if (a == 1.0) {
+      var k = 0
+      while (k < nnz) {
+        yValues(xIndices(k)) += xValues(k)
+        k += 1
+      }
+    } else {
+      var k = 0
+      while (k < nnz) {
+        yValues(xIndices(k)) += a * xValues(k)
+        k += 1
+      }
+    }
+  }
+
+  /**
+   * dot(x, y)
+   */
+  def dot(x: Vector, y: Vector): Double = {
+    require(x.size == y.size,
+      "BLAS.dot(x: Vector, y: Vector) was given Vectors with non-matching sizes:" +
+      " x.size = " + x.size + ", y.size = " + y.size)
+    (x, y) match {
+      case (dx: DenseVector, dy: DenseVector) =>
+        dot(dx, dy)
+      case (sx: SparseVector, dy: DenseVector) =>
+        dot(sx, dy)
+      case (dx: DenseVector, sy: SparseVector) =>
+        dot(sy, dx)
+      case (sx: SparseVector, sy: SparseVector) =>
+        dot(sx, sy)
+      case _ =>
+        throw new IllegalArgumentException(s"dot doesn't support (${x.getClass}, ${y.getClass}).")
+    }
+  }
+
+  /**
+   * dot(x, y)
+   */
+  private def dot(x: DenseVector, y: DenseVector): Double = {
+    val n = x.size
+    f2jBLAS.ddot(n, x.data, 1, y.data, 1)
+  }
+
+  /**
+   * dot(x, y)
+   */
+  private def dot(x: SparseVector, y: DenseVector): Double = {
+    val xValues = x.data
+    val xIndices = x.indices
+    val yValues = y.data
+    val nnz = xIndices.size
+
+    var sum = 0.0
+    var k = 0
+    while (k < nnz) {
+      sum += xValues(k) * yValues(xIndices(k))
+      k += 1
+    }
+    sum
+  }
+
+  /**
+   * dot(x, y)
+   */
+  private def dot(x: SparseVector, y: SparseVector): Double = {
+    val xValues = x.data
+    val xIndices = x.indices
+    val yValues = y.data
+    val yIndices = y.indices
+    val nnzx = xIndices.size
+    val nnzy = yIndices.size
+
+    var kx = 0
+    var
[GitHub] flink pull request: [FLINK-1486] add print method for prefixing a ...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/372#issuecomment-94703910 I've added documentation for the new print method. Will merge later on.
[GitHub] flink pull request: [WIP] - [FLINK-1807/1889] - Optimization frame...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/613#discussion_r28763991
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala ---
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math
+
+import com.github.fommil.netlib.{BLAS => NetlibBLAS, F2jBLAS}
+import com.github.fommil.netlib.BLAS.{getInstance => NativeBLAS}
+
+
+/**
+ * BLAS routines for MLlib's vectors and matrices.
+ */
--- End diff --
Add a more explicit note indicating the origin of the code. Maybe a link to the corresponding file.
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/579#issuecomment-94698596 Great work Faye. Will merge.
[jira] [Commented] (FLINK-1906) Add tip to work around plain Tuple return type of project operator
[ https://issues.apache.org/jira/browse/FLINK-1906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504563#comment-14504563 ] ASF GitHub Bot commented on FLINK-1906: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/611#issuecomment-94697505 Thanks for the PR!

Add tip to work around plain Tuple return type of project operator
Key: FLINK-1906 URL: https://issues.apache.org/jira/browse/FLINK-1906 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 0.9 Reporter: Fabian Hueske Assignee: Chiwan Park Priority: Minor Labels: starter
The Java compiler is not able to infer the return type of the {{project}} operator and defaults to {{Tuple}}. This can cause problems if another operator is immediately called on the result of a {{project}} operator, such as:
{code}
DataSet<Tuple5<String, String, String, String, String>> ds = ...;
DataSet<Tuple1<String>> ds2 = ds.project(0).distinct(0);
{code}
This problem can be overcome by hinting the return type of {{project}} like this:
{code}
DataSet<Tuple1<String>> ds2 = ds.<Tuple1<String>>project(0).distinct(0);
{code}
We should add this description to the documentation of the project operator.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
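The need for the explicit type hint is a general property of Java's inference for chained generic method calls, not something Flink-specific. A minimal self-contained illustration (the `first` helper is hypothetical, not the Flink API):

```java
import java.util.List;

public class TypeWitnessDemo {
    // The return type depends only on the type parameter, as with project():
    // in a chained call the compiler has nothing to infer T from
    // and falls back to Object.
    @SuppressWarnings("unchecked")
    static <T> T first(List<Object> xs) {
        return (T) xs.get(0);
    }

    public static void main(String[] args) {
        List<Object> xs = List.of("hello");
        // first(xs).length() would not compile: T is inferred as Object.
        // An explicit type witness fixes the chain,
        // analogous to ds.<Tuple1<String>>project(0):
        int n = TypeWitnessDemo.<String>first(xs).length();
        System.out.println(n);
    }
}
```

The same `receiver.<TypeArgument>method(...)` witness syntax is what the JIRA ticket recommends documenting for `project`.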
[GitHub] flink pull request: [FLINK-1906] [docs] Add tip to work around pla...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/611#issuecomment-94697505 Thanks for the PR!
[jira] [Commented] (FLINK-1906) Add tip to work around plain Tuple return type of project operator
[ https://issues.apache.org/jira/browse/FLINK-1906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504562#comment-14504562 ] ASF GitHub Bot commented on FLINK-1906: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/611

Add tip to work around plain Tuple return type of project operator
Key: FLINK-1906 URL: https://issues.apache.org/jira/browse/FLINK-1906 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 0.9 Reporter: Fabian Hueske Assignee: Chiwan Park Priority: Minor Labels: starter
The Java compiler is not able to infer the return type of the {{project}} operator and defaults to {{Tuple}}. This can cause problems if another operator is immediately called on the result of a {{project}} operator, such as:
{code}
DataSet<Tuple5<String, String, String, String, String>> ds = ...;
DataSet<Tuple1<String>> ds2 = ds.project(0).distinct(0);
{code}
This problem can be overcome by hinting the return type of {{project}} like this:
{code}
DataSet<Tuple1<String>> ds2 = ds.<Tuple1<String>>project(0).distinct(0);
{code}
We should add this description to the documentation of the project operator.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1486) Add a string to the print method to identify output
[ https://issues.apache.org/jira/browse/FLINK-1486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504611#comment-14504611 ] ASF GitHub Bot commented on FLINK-1486: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/372#issuecomment-94703910 I've added documentation for the new print method. Will merge later on.

Add a string to the print method to identify output
Key: FLINK-1486 URL: https://issues.apache.org/jira/browse/FLINK-1486 Project: Flink Issue Type: Improvement Components: Local Runtime Reporter: Maximilian Michels Assignee: Maximilian Michels Priority: Minor Labels: usability Fix For: 0.9
The output of the {{print}} method of {{DataSet}} is mainly used for debug purposes. Currently, it is difficult to identify the output. I would suggest to add another {{print(String str)}} method which allows the user to supply a String to identify the output. This could be a prefix before the actual output or a format string (which might be an overkill).
{code}
DataSet data = env.fromElements(1, 2, 3, 4, 5);
{code}
For example, {{data.print("MyDataSet: ")}} would print
{noformat}
MyDataSet: 1
MyDataSet: 2
...
{noformat}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
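The proposed prefix behaviour can be sketched outside Flink in a few lines (a hypothetical stand-in, not the actual `DataSet.print(String)` implementation):

```java
import java.util.List;

public class PrefixPrint {
    // Print each element on its own line, prefixed by an identifying string,
    // mirroring the behaviour proposed for DataSet.print(String).
    static void print(List<?> data, String prefix) {
        for (Object element : data) {
            System.out.println(prefix + element);
        }
    }

    public static void main(String[] args) {
        print(List.of(1, 2, 3), "MyDataSet: ");
    }
}
```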
[jira] [Commented] (FLINK-1820) Bug in DoubleParser and FloatParser - empty String is not casted to 0
[ https://issues.apache.org/jira/browse/FLINK-1820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504687#comment-14504687 ] ASF GitHub Bot commented on FLINK-1820: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/566#issuecomment-94722161 Your PR changes the semantics of the Integer parsers a bit because you ignore whitespaces. This change has a few implications. The following fields are parsed as correct Integer values:
- ` 123 `
- `- 123`
- `1 2 3`
but the following is not accepted:
- ` -123`
This behavior is not expected, IMO. I know that `Double.parseDouble()` and `Float.parseFloat()` both ignore leading and trailing white spaces, and the intention of this PR is to make the parsing of floating point and integer numeric values consistent. Instead of accepting leading and trailing white space in the Integer parsers, I propose to check for leading and trailing whitespaces in floating point fields and make these parsers fail in such cases. This would also give consistent parsing behavior. What do you think?

Bug in DoubleParser and FloatParser - empty String is not casted to 0
Key: FLINK-1820 URL: https://issues.apache.org/jira/browse/FLINK-1820 Project: Flink Issue Type: Bug Components: Core Affects Versions: 0.8.0, 0.9, 0.8.1 Reporter: Felix Neutatz Assignee: Felix Neutatz Priority: Critical Fix For: 0.9
Hi, I found the bug when I wanted to read a CSV file which had a line like: ||\n
If I treat it as a {{Tuple2<Long,Long>}}, I get as expected a tuple (0L,0L). But if I want to read it into a Double-Tuple or a Float-Tuple, I get the following error:
java.lang.AssertionError: Test failed due to a org.apache.flink.api.common.io.ParseException: Line could not be parsed: '||' ParserError NUMERIC_VALUE_FORMAT_ERROR
This error can be solved by adding an additional condition for empty strings in the FloatParser / DoubleParser. We definitely need the CSVReader to be able to read empty values. I can fix it like described if there are no better ideas :)
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
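The inconsistency discussed in the review comment is easy to check against the JDK parsers directly: `Double.parseDouble` ignores leading/trailing whitespace, `Integer.parseInt` rejects it, and both reject the empty string that triggered this ticket.

```java
public class ParseDemo {
    public static void main(String[] args) {
        // Double.parseDouble ignores leading/trailing whitespace...
        System.out.println(Double.parseDouble(" 1.5 "));

        // ...but Integer.parseInt rejects any whitespace:
        try {
            Integer.parseInt(" 123");
        } catch (NumberFormatException e) {
            System.out.println("int: whitespace rejected");
        }

        // and both parsers reject the empty string:
        try {
            Double.parseDouble("");
        } catch (NumberFormatException e) {
            System.out.println("double: empty string rejected");
        }
    }
}
```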
[jira] [Commented] (FLINK-1807) Stochastic gradient descent optimizer for ML library
[ https://issues.apache.org/jira/browse/FLINK-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504698#comment-14504698 ] ASF GitHub Bot commented on FLINK-1807: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/613#discussion_r28763909
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/WeightVector.scala ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common
+
+import org.apache.flink.ml.math.Vector
+
+// TODO(tvas): This provides an abstraction for the weights,
+// but at the same time it leads to the creation of many objects, as we have to pack and unpack
+// the weights and the intercept often during SGD.
+
+/** This class represents a weight vector with an intercept, as it is required for many supervised
+ * learning tasks.
+ * @param weights The vector of weights
+ * @param intercept The intercept (bias) weight
+ */
+case class WeightVector(weights: Vector, intercept: Double) extends Serializable {
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case weightVector: WeightVector =>
+        weights.equals(weightVector.weights) && intercept.equals(weightVector.intercept)
+      case _ => false
+    }
+  }
+
+  override def toString: String = {
--- End diff --
The same method is generated automatically for Scala's case classes.

Stochastic gradient descent optimizer for ML library
Key: FLINK-1807 URL: https://issues.apache.org/jira/browse/FLINK-1807 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Labels: ML
Stochastic gradient descent (SGD) is a widely used optimization technique in different ML algorithms. Thus, it would be helpful to provide a generalized SGD implementation which can be instantiated with the respective gradient computation. Such a building block would make the development of future algorithms easier.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: Fixed Configurable HadoopOutputFormat (FLINK-1...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/571#discussion_r28765691 --- Diff: flink-staging/flink-hbase/src/test/resources/hbase-site.xml --- @@ -22,14 +22,13 @@ --> <configuration> +<!-- --- End diff -- Are these mandatory parameters to use HBase? Otherwise, we should remove them.
[jira] [Commented] (FLINK-1828) Impossible to output data to an HBase table
[ https://issues.apache.org/jira/browse/FLINK-1828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504721#comment-14504721 ] ASF GitHub Bot commented on FLINK-1828: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/571#discussion_r28765691 --- Diff: flink-staging/flink-hbase/src/test/resources/hbase-site.xml --- @@ -22,14 +22,13 @@ --> <configuration> +<!-- --- End diff -- Are these mandatory parameters to use HBase? Otherwise, we should remove them. Impossible to output data to an HBase table --- Key: FLINK-1828 URL: https://issues.apache.org/jira/browse/FLINK-1828 Project: Flink Issue Type: Bug Components: Hadoop Compatibility Affects Versions: 0.9 Reporter: Flavio Pompermaier Labels: hadoop, hbase Fix For: 0.9 Right now it is not possible to use HBase TableOutputFormat as output format because Configurable.setConf is not called in the configure() method of the HadoopOutputFormatBase -- This message was sent by Atlassian JIRA (v6.3.4#6332)
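The essence of the reported bug — `configure()` never forwarding the configuration to formats that implement `Configurable` — can be sketched with simplified stand-in types. The class and field names below are illustrative only, not the actual Flink or Hadoop classes:

```java
// Simplified stand-ins for Hadoop's Configuration/Configurable types.
class Configuration { }

interface Configurable {
    void setConf(Configuration conf);
}

// Stand-in for an output format such as HBase's TableOutputFormat, which
// needs its Configuration injected before it can be used.
class TableOutputFormatStub implements Configurable {
    Configuration conf;
    public void setConf(Configuration conf) { this.conf = conf; }
}

// Stand-in for the wrapper's configure() method: the essential fix is the
// instanceof check plus the setConf call, which the bug report says is missing.
class HadoopOutputFormatBaseStub {
    final Object wrappedFormat;
    final Configuration conf;

    HadoopOutputFormatBaseStub(Object wrappedFormat, Configuration conf) {
        this.wrappedFormat = wrappedFormat;
        this.conf = conf;
    }

    void configure() {
        // Forward the configuration to formats that ask for it.
        if (wrappedFormat instanceof Configurable) {
            ((Configurable) wrappedFormat).setConf(conf);
        }
    }
}
```

With this pattern, any wrapped format that implements `Configurable` receives its configuration before use.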
[GitHub] flink pull request: [FLINK-1875] Add figure explaining slots and p...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/604#discussion_r28758925 --- Diff: docs/config.md --- @@ -370,8 +370,7 @@ system, such as */tmp* in Linux systems. ### Configuring TaskManager processing slots -A processing slot allows Flink to execute a distributed DataSet transformation, such as a -data source or a map-transformation. +A processing slot allows Flink to execute an instance of a job. --- End diff -- I think the term *instance of a job* might be misleading. The document uses *operator instance* or *(user) function instance* but IMO it is not clear what an *instance of a job* is. I agree that the previous description wasn't good either, but we might try to come up with something better. What about "Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots."?
[jira] [Commented] (FLINK-1875) Add figure to documentation describing slots and parallelism
[ https://issues.apache.org/jira/browse/FLINK-1875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504605#comment-14504605 ] ASF GitHub Bot commented on FLINK-1875: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/604#issuecomment-94702358 Nice figure! I added a comment about a certain term in the documentation, but otherwise I think we should get it in. Add figure to documentation describing slots and parallelism Key: FLINK-1875 URL: https://issues.apache.org/jira/browse/FLINK-1875 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Robert Metzger Our users are still confused about how parallelism and slots are connected to each other. We tried addressing this issue already with FLINK-1679, but I think we also need to have a nice picture in our documentation. This is too complicated: http://ci.apache.org/projects/flink/flink-docs-master/internal_job_scheduling.html
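For context, the slot count being documented in this diff is controlled in `flink-conf.yaml`. A minimal illustrative fragment (the values are examples only, not recommendations):

```yaml
# Each TaskManager offers this many processing slots; each slot runs one
# parallel subtask pipeline of a program.
taskmanager.numberOfTaskSlots: 4

# Default parallelism used by programs that do not set one explicitly.
parallelism.default: 4
```

A program whose parallelism exceeds the total number of available slots across all TaskManagers cannot be scheduled, which is exactly the relationship the proposed figure is meant to illustrate.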
[jira] [Commented] (FLINK-1875) Add figure to documentation describing slots and parallelism
[ https://issues.apache.org/jira/browse/FLINK-1875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504600#comment-14504600 ] ASF GitHub Bot commented on FLINK-1875: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/604#discussion_r28758925 --- Diff: docs/config.md --- @@ -370,8 +370,7 @@ system, such as */tmp* in Linux systems. ### Configuring TaskManager processing slots -A processing slot allows Flink to execute a distributed DataSet transformation, such as a -data source or a map-transformation. +A processing slot allows Flink to execute an instance of a job. --- End diff -- I think the term *instance of a job* might be misleading. The document uses *operator instance* or *(user) function instance* but IMO it is not clear what an *instance of a job* is. I agree that the previous description wasn't good either, but we might try to come up with something better. What about "Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots."? Add figure to documentation describing slots and parallelism Key: FLINK-1875 URL: https://issues.apache.org/jira/browse/FLINK-1875 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Robert Metzger Our users are still confused about how parallelism and slots are connected to each other. We tried addressing this issue already with FLINK-1679, but I think we also need to have a nice picture in our documentation. This is too complicated: http://ci.apache.org/projects/flink/flink-docs-master/internal_job_scheduling.html
[jira] [Commented] (FLINK-1889) Create optimization framework
[ https://issues.apache.org/jira/browse/FLINK-1889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504673#comment-14504673 ] Theodore Vasiloudis commented on FLINK-1889: WIP PR opened: https://github.com/apache/flink/pull/613 Create optimization framework - Key: FLINK-1889 URL: https://issues.apache.org/jira/browse/FLINK-1889 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Theodore Vasiloudis Assignee: Theodore Vasiloudis Labels: ML In order to implement Stochastic Gradient Descent and other optimization algorithms, we need an interface structure that the algorithms should comply with. We can then use that structure to implement the various algorithms. The purpose of this issue is to act as a root for the specific implementation of the optimization algorithms, and to discuss the design of the optimization package.
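To make the "interface structure that the algorithms should comply with" concrete, a hypothetical shape for such an optimizer interface is sketched below. The names and design are illustrative only and are not the FLINK-1889 API:

```java
import java.util.Arrays;

// Hypothetical sketch: an optimizer minimizes a loss by iteratively
// updating a weight vector using the loss gradient.
interface LossGradient {
    // Gradient of the loss function at the given weights.
    double[] gradient(double[] weights);
}

class GradientDescent {
    final double stepSize;
    final int iterations;

    GradientDescent(double stepSize, int iterations) {
        this.stepSize = stepSize;
        this.iterations = iterations;
    }

    // Plain (full-batch) gradient descent; SGD would instead compute the
    // gradient on a random sample at each step.
    double[] optimize(LossGradient loss, double[] initial) {
        double[] w = Arrays.copyOf(initial, initial.length);
        for (int i = 0; i < iterations; i++) {
            double[] g = loss.gradient(w);
            for (int j = 0; j < w.length; j++) {
                w[j] -= stepSize * g[j];   // descend along the gradient
            }
        }
        return w;
    }
}
```

Different algorithms (SGD, L-BFGS, etc.) would then be interchangeable implementations behind the same small interface, which is the design question the issue raises.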
[jira] [Commented] (FLINK-1745) Add k-nearest-neighbours algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504671#comment-14504671 ] Raghav Chalapathy commented on FLINK-1745: -- Hi Chiwan, Till, I totally agree with Chiwan and Till's idea to make it a generic distance-measure trait, to support various distance metrics such as Euclidean, Manhattan, etc. Going by the literature shared in Approach 1: https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf we should start off with the basics: the exact approaches HBNLJ and H-BRJ (and compare their cost), and the approximate approach zkNN. As a part of implementation homework I was going through some of the implementations and stumbled across this one: https://github.com/codeAshu/SparkAlgorithms/tree/master/mllib/src/main/scala/org/sparkalgos/mllib/join The issue has been considered here: https://issues.apache.org/jira/browse/SPARK-2335 Approach 2: The paper they referred to is: http://ieeexplore.ieee.org/xpl/login.jsp?tp=&arnumber=5447837&tag=1&url=http%3A%2F%2Fieeexplore.ieee.org%2Fxpls%2Fabs_all.jsp%3Farnumber%3D5447837%26tag%3D1 My question is: have we done a comparison of Approach 1 and Approach 2? Should we perform a comparison study going forward? Raghav Add k-nearest-neighbours algorithm to machine learning library -- Key: FLINK-1745 URL: https://issues.apache.org/jira/browse/FLINK-1745 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Chiwan Park Labels: ML, Starter Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial, it is still used as a means to classify data and to do regression. Could be a starter task. Resources: [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm] [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]
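The generic distance-measure trait discussed above could look roughly like the following sketch. The names are illustrative and not taken from the Flink code base:

```java
// A pluggable distance measure, so kNN can swap metrics without code changes.
interface DistanceMeasure {
    double distance(double[] a, double[] b);
}

// Straight-line (L2) distance.
class EuclideanDistance implements DistanceMeasure {
    public double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }
}

// City-block (L1) distance.
class ManhattanDistance implements DistanceMeasure {
    public double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            sum += Math.abs(a[i] - b[i]);
        }
        return sum;
    }
}
```

A kNN implementation would then take a `DistanceMeasure` as a parameter, which is exactly the genericity Chiwan and Till propose.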
[GitHub] flink pull request: [FLINK-1820] CSVReader: In case of an empty st...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/566#issuecomment-94722728 It would also be good to extend the respective parser tests such as `IntParserTest` when changing the behavior and semantics of the parsers.
[jira] [Commented] (FLINK-1820) Bug in DoubleParser and FloatParser - empty String is not casted to 0
[ https://issues.apache.org/jira/browse/FLINK-1820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504690#comment-14504690 ] ASF GitHub Bot commented on FLINK-1820: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/566#issuecomment-94722728 It would also be good to extend the respective parser tests such as `IntParserTest` when changing the behavior and semantics of the parsers. Bug in DoubleParser and FloatParser - empty String is not casted to 0 - Key: FLINK-1820 URL: https://issues.apache.org/jira/browse/FLINK-1820 Project: Flink Issue Type: Bug Components: Core Affects Versions: 0.8.0, 0.9, 0.8.1 Reporter: Felix Neutatz Assignee: Felix Neutatz Priority: Critical Fix For: 0.9 Hi, I found the bug when I wanted to read a CSV file which had a line like: "||\n" If I treat it as a Tuple2<Long, Long>, I get as expected a tuple (0L, 0L). But if I want to read it into a Double-Tuple or a Float-Tuple, I get the following error: java.lang.AssertionError: Test failed due to a org.apache.flink.api.common.io.ParseException: Line could not be parsed: '||' ParserError NUMERIC_VALUE_FORMAT_ERROR This error can be solved by adding an additional condition for empty strings in the FloatParser / DoubleParser. We definitely need the CSVReader to be able to read empty values. I can fix it like described if there are no better ideas :)
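The "additional condition for empty strings" proposed in the issue can be sketched as a small helper. This is illustrative only, not the actual `DoubleParser` code, which works on byte arrays and field delimiters:

```java
// Sketch of the proposed special case: treat an empty CSV field as 0.0
// instead of raising a NUMERIC_VALUE_FORMAT_ERROR.
class LenientDoubleParser {
    static double parseField(String field) {
        if (field == null || field.isEmpty()) {
            return 0.0;                    // empty field becomes zero
        }
        return Double.parseDouble(field);  // normal parsing otherwise
    }
}
```

With this behavior, a line like `||` parses to `(0.0, 0.0)` for a double tuple, matching what the long parsers already do.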
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28785218 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java --- @@ -138,69 +146,46 @@ public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) { if (this.initialVertices == null) { --- End diff -- I made that division in order to avoid having duplicate code: the number of vertices and the direction are totally independent of the degree option, which is why they can be set in the createResult() method. Afterwards, the code does exactly what you described in this comment: it separates the creation of a delta iteration and the creation of the messaging function plus vertex update function according to the vertex type (with degrees or not). It's not just the vertex that changes, but everything that uses its value afterwards changes too. I suggest you look a bit closer at the createResultVerticesWithDegrees and createResultSimpleVertex methods. I don't think their functionality can be simplified by just creating a simple vertex and a vertex with degrees.
[jira] [Commented] (FLINK-1523) Vertex-centric iteration extensions
[ https://issues.apache.org/jira/browse/FLINK-1523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505053#comment-14505053 ] ASF GitHub Bot commented on FLINK-1523: --- Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28785218 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java --- @@ -138,69 +146,46 @@ public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) { if (this.initialVertices == null) { --- End diff -- I made that division in order to avoid having duplicate code: the number of vertices and the direction are totally independent of the degree option, which is why they can be set in the createResult() method. Afterwards, the code does exactly what you described in this comment: it separates the creation of a delta iteration and the creation of the messaging function plus vertex update function according to the vertex type (with degrees or not). It's not just the vertex that changes, but everything that uses its value afterwards changes too. I suggest you look a bit closer at the createResultVerticesWithDegrees and createResultSimpleVertex methods. I don't think their functionality can be simplified by just creating a simple vertex and a vertex with degrees. Vertex-centric iteration extensions --- Key: FLINK-1523 URL: https://issues.apache.org/jira/browse/FLINK-1523 Project: Flink Issue Type: Improvement Components: Gelly Reporter: Vasia Kalavri Assignee: Andra Lungu We would like to make the following extensions to the vertex-centric iterations of Gelly: - allow vertices to access their in/out degrees and the total number of vertices of the graph, inside the iteration. - allow choosing the neighborhood type (in/out/all) over which to run the vertex-centric iteration. Now, the model uses the updates of the in-neighbors to calculate state and send messages to out-neighbors.
We could add a parameter with value in/out/all to the {{VertexUpdateFunction}} and {{MessagingFunction}}, that would indicate the type of neighborhood.
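The in/out/all parameter suggested above could be modeled as an enum. The sketch below illustrates the idea on a toy adjacency-list graph; it is not Gelly code, and the names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Direction over which an iteration gathers/sends messages.
enum EdgeDirection { IN, OUT, ALL }

class TinyGraph {
    final Map<Integer, List<Integer>> out = new HashMap<>();
    final Map<Integer, List<Integer>> in = new HashMap<>();

    void addEdge(int src, int dst) {
        out.computeIfAbsent(src, k -> new ArrayList<>()).add(dst);
        in.computeIfAbsent(dst, k -> new ArrayList<>()).add(src);
    }

    // The neighborhood a vertex-centric iteration would address,
    // selected by the direction parameter.
    List<Integer> neighbors(int v, EdgeDirection dir) {
        List<Integer> result = new ArrayList<>();
        if (dir == EdgeDirection.OUT || dir == EdgeDirection.ALL) {
            result.addAll(out.getOrDefault(v, Collections.emptyList()));
        }
        if (dir == EdgeDirection.IN || dir == EdgeDirection.ALL) {
            result.addAll(in.getOrDefault(v, Collections.emptyList()));
        }
        return result;
    }
}
```

Passing such an enum into the update and messaging functions keeps the default behavior (messages to out-neighbors) while making the neighborhood configurable.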
[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results
[ https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505108#comment-14505108 ] ASF GitHub Bot commented on FLINK-1297: --- Github user tammymendt commented on a diff in the pull request: https://github.com/apache/flink/pull/605#discussion_r28789034
--- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorsTest.java ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.operatorstatistics;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Random;
+
+public class OperatorStatsAccumulatorsTest extends AbstractTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OperatorStatsAccumulatorsTest.class);
+
+    private static final String ACCUMULATOR_NAME = "op-stats";
+
+    public OperatorStatsAccumulatorsTest() {
+        super(new Configuration());
+    }
+
+    @Test
+    public void testAccumulator() throws Exception {
+
+        String input = "";
+
+        Random rand = new Random();
+
+        for (int i = 1; i < 1000; i++) {
+            if (rand.nextDouble() < 0.2) {
+                input += String.valueOf(rand.nextInt(5)) + "\n";
+            } else {
+                input += String.valueOf(rand.nextInt(100)) + "\n";
+            }
+        }
+
+        String inputFile = createTempFile("datapoints.txt", input);
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        env.readTextFile(inputFile).
+            flatMap(new StringToInt()).
+            output(new DiscardingOutputFormat<Tuple1<Integer>>());
+
+        JobExecutionResult result = env.execute();
+
+        OperatorStatistics globalStats = result.getAccumulatorResult(ACCUMULATOR_NAME);
+        LOG.debug("Global Stats");
+        LOG.debug(globalStats.toString());
+
+        OperatorStatistics merged = null;
+
+        Map<String, Object> accResults = result.getAllAccumulatorResults();
+        for (String accumulatorName : accResults.keySet()) {
+            if (accumulatorName.contains(ACCUMULATOR_NAME + "-")) {
+                OperatorStatistics localStats = (OperatorStatistics) accResults.get(accumulatorName);
+                if (merged == null) {
+                    merged = localStats.clone();
+                } else {
+                    merged.merge(localStats);
+                }
+                LOG.debug("Local Stats: " + accumulatorName);
+                LOG.debug(localStats.toString());
+            }
+        }
+
+        Assert.assertEquals(globalStats.cardinality, 999);
+        Assert.assertEquals(globalStats.estimateCountDistinct(), 100);
+        Assert.assertTrue(globalStats.getHeavyHitters().size() > 0 && globalStats.getHeavyHitters().size() <= 5);
+        Assert.assertEquals(merged.getMin(), globalStats.getMin());
+
[jira] [Resolved] (FLINK-671) Python interface for new API (Map/Reduce)
[ https://issues.apache.org/jira/browse/FLINK-671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek resolved FLINK-671. Resolution: Fixed Resolved in https://github.com/apache/flink/commit/d182daa19eeb1d3877821d4a8b9a37311678a12f Python interface for new API (Map/Reduce) - Key: FLINK-671 URL: https://issues.apache.org/jira/browse/FLINK-671 Project: Flink Issue Type: New Feature Components: Python API Reporter: Chesnay Schepler Assignee: Chesnay Schepler Labels: github-import Fix For: pre-apache Attachments: pull-request-671-9139035883911146960.patch ([#615|https://github.com/stratosphere/stratosphere/issues/615] | [FLINK-615|https://issues.apache.org/jira/browse/FLINK-615]) Imported from GitHub Url: https://github.com/stratosphere/stratosphere/pull/671 Created by: [zentol|https://github.com/zentol] Labels: enhancement, java api, Milestone: Release 0.6 (unplanned) Created at: Wed Apr 09 20:52:06 CEST 2014 State: open
[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...
Github user tammymendt commented on a diff in the pull request: https://github.com/apache/flink/pull/605#discussion_r28789034
--- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorsTest.java ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.operatorstatistics;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Random;
+
+public class OperatorStatsAccumulatorsTest extends AbstractTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OperatorStatsAccumulatorsTest.class);
+
+    private static final String ACCUMULATOR_NAME = "op-stats";
+
+    public OperatorStatsAccumulatorsTest() {
+        super(new Configuration());
+    }
+
+    @Test
+    public void testAccumulator() throws Exception {
+
+        String input = "";
+
+        Random rand = new Random();
+
+        for (int i = 1; i < 1000; i++) {
+            if (rand.nextDouble() < 0.2) {
+                input += String.valueOf(rand.nextInt(5)) + "\n";
+            } else {
+                input += String.valueOf(rand.nextInt(100)) + "\n";
+            }
+        }
+
+        String inputFile = createTempFile("datapoints.txt", input);
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        env.readTextFile(inputFile).
+            flatMap(new StringToInt()).
+            output(new DiscardingOutputFormat<Tuple1<Integer>>());
+
+        JobExecutionResult result = env.execute();
+
+        OperatorStatistics globalStats = result.getAccumulatorResult(ACCUMULATOR_NAME);
+        LOG.debug("Global Stats");
+        LOG.debug(globalStats.toString());
+
+        OperatorStatistics merged = null;
+
+        Map<String, Object> accResults = result.getAllAccumulatorResults();
+        for (String accumulatorName : accResults.keySet()) {
+            if (accumulatorName.contains(ACCUMULATOR_NAME + "-")) {
+                OperatorStatistics localStats = (OperatorStatistics) accResults.get(accumulatorName);
+                if (merged == null) {
+                    merged = localStats.clone();
+                } else {
+                    merged.merge(localStats);
+                }
+                LOG.debug("Local Stats: " + accumulatorName);
+                LOG.debug(localStats.toString());
+            }
+        }
+
+        Assert.assertEquals(globalStats.cardinality, 999);
+        Assert.assertEquals(globalStats.estimateCountDistinct(), 100);
+        Assert.assertTrue(globalStats.getHeavyHitters().size() > 0 && globalStats.getHeavyHitters().size() <= 5);
+        Assert.assertEquals(merged.getMin(), globalStats.getMin());
+        Assert.assertEquals(merged.getMax(), globalStats.getMax());
+        Assert.assertEquals(merged.estimateCountDistinct(), globalStats.estimateCountDistinct());
+        Assert.assertEquals(merged.getHeavyHitters().size(), globalStats.getHeavyHitters().size());
+
+    }
[jira] [Commented] (FLINK-377) Create a general purpose framework for language bindings
[ https://issues.apache.org/jira/browse/FLINK-377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504975#comment-14504975 ] ASF GitHub Bot commented on FLINK-377: -- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94799438 Indeed! Very happy to have this in :-) Create a general purpose framework for language bindings Key: FLINK-377 URL: https://issues.apache.org/jira/browse/FLINK-377 Project: Flink Issue Type: Improvement Reporter: GitHub Import Assignee: Chesnay Schepler Labels: github-import Fix For: pre-apache A general purpose API to run operators with arbitrary binaries. This will allow to run Stratosphere programs written in Python, JavaScript, Ruby, Go or whatever you like. We suggest using Google Protocol Buffers for data serialization. This is the list of languages that currently support ProtoBuf: https://code.google.com/p/protobuf/wiki/ThirdPartyAddOns Very early prototype with python: https://github.com/rmetzger/scratch/tree/learn-protobuf (basically testing protobuf) For Ruby: https://github.com/infochimps-labs/wukong Two new students working at Stratosphere (@skunert and @filiphaase) are working on this. The reference binding language will be for Python, but other bindings are very welcome. The best name for this so far is stratosphere-lang-bindings. I created this issue to track the progress (and give everybody a chance to comment on this) Imported from GitHub Url: https://github.com/stratosphere/stratosphere/issues/377 Created by: [rmetzger|https://github.com/rmetzger] Labels: enhancement, Assignee: [filiphaase|https://github.com/filiphaase] Created at: Tue Jan 07 19:47:20 CET 2014 State: open
[jira] [Resolved] (FLINK-377) Create a general purpose framework for language bindings
[ https://issues.apache.org/jira/browse/FLINK-377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek resolved FLINK-377. Resolution: Fixed Resolved in https://github.com/apache/flink/commit/af9248c35a5a138d311073b54f6abd4260ab7fd9 Create a general purpose framework for language bindings Key: FLINK-377 URL: https://issues.apache.org/jira/browse/FLINK-377 Project: Flink Issue Type: Improvement Reporter: GitHub Import Assignee: Chesnay Schepler Labels: github-import Fix For: pre-apache A general purpose API to run operators with arbitrary binaries. This will allow to run Stratosphere programs written in Python, JavaScript, Ruby, Go or whatever you like. We suggest using Google Protocol Buffers for data serialization. This is the list of languages that currently support ProtoBuf: https://code.google.com/p/protobuf/wiki/ThirdPartyAddOns Very early prototype with python: https://github.com/rmetzger/scratch/tree/learn-protobuf (basically testing protobuf) For Ruby: https://github.com/infochimps-labs/wukong Two new students working at Stratosphere (@skunert and @filiphaase) are working on this. The reference binding language will be for Python, but other bindings are very welcome. The best name for this so far is stratosphere-lang-bindings. I created this issue to track the progress (and give everybody a chance to comment on this) Imported from GitHub Url: https://github.com/stratosphere/stratosphere/issues/377 Created by: [rmetzger|https://github.com/rmetzger] Labels: enhancement, Assignee: [filiphaase|https://github.com/filiphaase] Created at: Tue Jan 07 19:47:20 CET 2014 State: open
[GitHub] flink pull request: [FLINK-1398] Introduce extractSingleField() in...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/308#issuecomment-94792842 How do we proceed with this PR? I think it looks good and would be OK with adding this feature to a `DataSetUtils` class. Other opinions?
[jira] [Created] (FLINK-1919) Add HCatOutputFormat for Tuple data types
Fabian Hueske created FLINK-1919: Summary: Add HCatOutputFormat for Tuple data types Key: FLINK-1919 URL: https://issues.apache.org/jira/browse/FLINK-1919 Project: Flink Issue Type: New Feature Components: Java API, Scala API Reporter: Fabian Hueske Priority: Minor It would be good to have an OutputFormat that can write data to HCatalog tables. The Hadoop `HCatOutputFormat` expects `HCatRecord` objects and writes these to HCatalog tables. We can do the same thing, by creating these `HCatRecord` objects with a Map function that precedes a `HadoopOutputFormat` that wraps the Hadoop `HCatOutputFormat`. Better support for Flink Tuples can be added by implementing a custom `HCatOutputFormat` that also depends on the Hadoop `HCatOutputFormat` but internally converts Flink Tuples to `HCatRecords`. This would also include a check whether the schema of the HCatalog table and the Flink tuples match. For data types other than tuples, the OutputFormat could either require a preceding Map function that converts to `HCatRecords` or let users specify a MapFunction and invoke that internally. We have already a Flink `HCatInputFormat` which does this in the reverse direction, i.e., it emits Flink Tuples from HCatalog tables.
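The "preceding Map function" idea can be sketched with a stand-in record type. `HCatRecordStub` and `TupleToRecord` are hypothetical names; the real HCatalog `HCatRecord` API and the schema check the issue mentions are omitted:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for HCatalog's HCatRecord: an ordered list of column values.
class HCatRecordStub {
    final List<Object> fields = new ArrayList<>();
}

// Sketch of the conversion a preceding Map function would perform:
// copy the tuple's fields into the record, preserving their order.
class TupleToRecord {
    static HCatRecordStub map(Object... tupleFields) {
        HCatRecordStub record = new HCatRecordStub();
        for (Object f : tupleFields) {
            record.fields.add(f);
        }
        return record;
    }
}
```

A custom output format would apply this conversion internally, additionally verifying that the tuple arity and field types match the HCatalog table schema before writing.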
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/612#issuecomment-94799019 Any thoughts on this? I would really like to merge this to improve Travis reliability.
[jira] [Commented] (FLINK-1799) Scala API does not support generic arrays
[ https://issues.apache.org/jira/browse/FLINK-1799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14504974#comment-14504974 ] ASF GitHub Bot commented on FLINK-1799: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/582#issuecomment-94799103 So, any thoughts about merging this? Scala API does not support generic arrays - Key: FLINK-1799 URL: https://issues.apache.org/jira/browse/FLINK-1799 Project: Flink Issue Type: Bug Reporter: Till Rohrmann Assignee: Aljoscha Krettek The Scala API does not support generic arrays at the moment. It throws a rather unhelpful error message ```InvalidTypesException: The given type is not a valid object array```. Code to reproduce the problem is given below: {code} def main(args: Array[String]) { foobar[Double] } def foobar[T: ClassTag: TypeInformation]: DataSet[Block[T]] = { val tpe = createTypeInformation[Array[T]] null } {code}
[jira] [Commented] (FLINK-377) Create a general purpose framework for language bindings
[ https://issues.apache.org/jira/browse/FLINK-377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505001#comment-14505001 ] ASF GitHub Bot commented on FLINK-377: -- Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94809876 W00t! Nice to see this one gets in Create a general purpose framework for language bindings Key: FLINK-377 URL: https://issues.apache.org/jira/browse/FLINK-377 Project: Flink Issue Type: Improvement Reporter: GitHub Import Assignee: Chesnay Schepler Labels: github-import Fix For: pre-apache A general purpose API to run operators with arbitrary binaries. This will allow to run Stratosphere programs written in Python, JavaScript, Ruby, Go or whatever you like. We suggest using Google Protocol Buffers for data serialization. This is the list of languages that currently support ProtoBuf: https://code.google.com/p/protobuf/wiki/ThirdPartyAddOns Very early prototype with python: https://github.com/rmetzger/scratch/tree/learn-protobuf (basically testing protobuf) For Ruby: https://github.com/infochimps-labs/wukong Two new students working at Stratosphere (@skunert and @filiphaase) are working on this. The reference binding language will be for Python, but other bindings are very welcome. The best name for this so far is stratosphere-lang-bindings. I created this issue to track the progress (and give everybody a chance to comment on this) Imported from GitHub Url: https://github.com/stratosphere/stratosphere/issues/377 Created by: [rmetzger|https://github.com/rmetzger] Labels: enhancement, Assignee: [filiphaase|https://github.com/filiphaase] Created at: Tue Jan 07 19:47:20 CET 2014 State: open
[GitHub] flink pull request: [FLINK-1820] CSVReader: In case of an empty st...
Github user FelixNeutatz commented on the pull request: https://github.com/apache/flink/pull/566#issuecomment-94810290 @fhueske: I agree on that :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1820) Bug in DoubleParser and FloatParser - empty String is not casted to 0
[ https://issues.apache.org/jira/browse/FLINK-1820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505005#comment-14505005 ] ASF GitHub Bot commented on FLINK-1820: --- Github user FelixNeutatz commented on the pull request: https://github.com/apache/flink/pull/566#issuecomment-94810290 @fhueske: I agree on that :) Bug in DoubleParser and FloatParser - empty String is not casted to 0 - Key: FLINK-1820 URL: https://issues.apache.org/jira/browse/FLINK-1820 Project: Flink Issue Type: Bug Components: Core Affects Versions: 0.8.0, 0.9, 0.8.1 Reporter: Felix Neutatz Assignee: Felix Neutatz Priority: Critical Fix For: 0.9 Hi, I found the bug when I wanted to read a CSV file which had a line like: ||\n If I treat it as a Tuple2<Long, Long>, I get, as expected, a tuple (0L,0L). But if I want to read it into a Double-Tuple or a Float-Tuple, I get the following error: java.lang.AssertionError: Test failed due to a org.apache.flink.api.common.io.ParseException: Line could not be parsed: '||' ParserError NUMERIC_VALUE_FORMAT_ERROR This error can be solved by adding an additional condition for empty strings in the FloatParser / DoubleParser. We definitely need the CSVReader to be able to read empty values. I can fix it as described if there are no better ideas :)
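The fix described in the issue — treating an empty field as 0, the way the long parser already behaves — can be sketched in plain Java. Note this is a minimal illustration with a hypothetical `parseDoubleField` helper, not Flink's actual FloatParser/DoubleParser code:

```java
// Sketch of the proposed fix: return 0.0 for an empty CSV field instead of
// letting Double.parseDouble throw a NumberFormatException.
// parseDoubleField is a hypothetical helper, not Flink's parser code.
public final class EmptyFieldDoubleParser {

    public static double parseDoubleField(String field) {
        if (field == null || field.isEmpty()) {
            return 0.0; // empty value, e.g. a field from a line like "||"
        }
        return Double.parseDouble(field);
    }

    public static void main(String[] args) {
        // A line like "||" splits into three empty fields.
        String[] fields = "||".split("\\|", -1);
        double a = parseDoubleField(fields[0]);
        double b = parseDoubleField(fields[1]);
        System.out.println(a + "," + b); // 0.0,0.0
    }
}
```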
[jira] [Commented] (FLINK-1907) Scala Interactive Shell
[ https://issues.apache.org/jira/browse/FLINK-1907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504965#comment-14504965 ] Aljoscha Krettek commented on FLINK-1907: - The problem now is that the LocalExecutor does not allow setting a custom jar file the way the RemoteExecutor does. Scala Interactive Shell --- Key: FLINK-1907 URL: https://issues.apache.org/jira/browse/FLINK-1907 Project: Flink Issue Type: New Feature Components: Scala API Reporter: Nikolaas Steenbergen Assignee: Nikolaas Steenbergen Priority: Minor Build an interactive shell for the Scala API.
[jira] [Commented] (FLINK-377) Create a general purpose framework for language bindings
[ https://issues.apache.org/jira/browse/FLINK-377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504967#comment-14504967 ] ASF GitHub Bot commented on FLINK-377: -- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/202
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/202
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94799438 Indeed! Very happy to have this in :-)
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94809876 W00t! Nice to see this one gets in
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94798580 Congratulations! :)
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94798439 I merged it. :smile: Thanks a lot @zentol for staying with this for so long. Great work! P.S. Could you please close this PR, I always forget adding the closes #... message.
[GitHub] flink pull request: [FLINK-1799][scala] Fix handling of generic ar...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/582#issuecomment-94799103 So, any thoughts about merging this?
[jira] [Commented] (FLINK-1867) TaskManagerFailureRecoveryITCase causes stalled travis builds
[ https://issues.apache.org/jira/browse/FLINK-1867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504971#comment-14504971 ] ASF GitHub Bot commented on FLINK-1867: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/612#issuecomment-94799019 Any thoughts on this? I would really like to merge this to improve Travis reliability. TaskManagerFailureRecoveryITCase causes stalled travis builds - Key: FLINK-1867 URL: https://issues.apache.org/jira/browse/FLINK-1867 Project: Flink Issue Type: Bug Components: TaskManager, Tests Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Aljoscha Krettek There are currently tests on travis failing: https://travis-ci.org/apache/flink/jobs/57943063
[jira] [Commented] (FLINK-377) Create a general purpose framework for language bindings
[ https://issues.apache.org/jira/browse/FLINK-377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504834#comment-14504834 ] ASF GitHub Bot commented on FLINK-377: -- Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94760254 @rmetzger Done. Unless you want me to merge commits as well.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94761408 Thanks a lot. Can you squash the commits after mingliang's example into one commit, prefixed with FLINK-671 ? Then we'll have 4 commits for the change, which is totally okay given its size.
[jira] [Commented] (FLINK-377) Create a general purpose framework for language bindings
[ https://issues.apache.org/jira/browse/FLINK-377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504857#comment-14504857 ] ASF GitHub Bot commented on FLINK-377: -- Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94771487 nah I'll do it.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94771487 nah I'll do it.
[jira] [Commented] (FLINK-377) Create a general purpose framework for language bindings
[ https://issues.apache.org/jira/browse/FLINK-377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504858#comment-14504858 ] ASF GitHub Bot commented on FLINK-377: -- Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94774209 Done
[jira] [Closed] (FLINK-1917) EOFException when running delta-iteration job
[ https://issues.apache.org/jira/browse/FLINK-1917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Bunk closed FLINK-1917. -- Resolution: Duplicate EOFException when running delta-iteration job - Key: FLINK-1917 URL: https://issues.apache.org/jira/browse/FLINK-1917 Project: Flink Issue Type: Bug Components: Core, Distributed Runtime, Iterations Environment: 0.9-milestone-1 Exception on the cluster, local execution works Reporter: Stefan Bunk The delta-iteration program in [1] ends with a
java.io.EOFException
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeLong(AbstractPagedOutputView.java:291)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeDouble(AbstractPagedOutputView.java:307)
at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:62)
at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:26)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)
For logs and the accompanying mailing-list discussion see below. When running with a slightly different memory configuration, as hinted on the mailing list, I sometimes also get this exception:
19.Apr. 13:39:29 INFO Task - IterationHead(WorksetIteration (Resolved-Redirects)) (10/10) switched to FAILED :
java.lang.IndexOutOfBoundsException: Index: 161, Size: 161
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.resetTo(InMemoryPartition.java:352)
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.access$100(InMemoryPartition.java:301)
at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:226)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)
[1] Flink job: https://gist.github.com/knub/0bfec859a563009c1d57
[2] Job manager logs: https://gist.github.com/knub/01e3a4b0edb8cde66ff4
[3] One task manager's logs: https://gist.github.com/knub/8f2f953da95c8d7adefc
[4] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-when-running-Flink-job-td1092.html
[jira] [Updated] (FLINK-1890) Add note to docs that ReadFields annotations are currently not evaluated
[ https://issues.apache.org/jira/browse/FLINK-1890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-1890: - Summary: Add note to docs that ReadFields annotations are currently not evaluated (was: Add withReadFields or sth. similar to Scala API) Add note to docs that ReadFields annotations are currently not evaluated Key: FLINK-1890 URL: https://issues.apache.org/jira/browse/FLINK-1890 Project: Flink Issue Type: Wish Components: Java API, Scala API Reporter: Stefan Bunk Priority: Minor In the Scala API, you have the option to declare forwarded fields via the {{withForwardedFields}} method. It would be nice to have something similar for read fields, as otherwise one needs to create a class, which I personally try to avoid for readability. Maybe grouping all annotations in one function and having a first parameter indicating the type of annotation is also an option, if you plan on adding more annotations and want to keep the interface smaller.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94760254 @rmetzger Done. Unless you want me to merge commits as well.
[jira] [Commented] (FLINK-377) Create a general purpose framework for language bindings
[ https://issues.apache.org/jira/browse/FLINK-377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504839#comment-14504839 ] ASF GitHub Bot commented on FLINK-377: -- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94761465 I can also do the squashing if you want ;)
[jira] [Commented] (FLINK-377) Create a general purpose framework for language bindings
[ https://issues.apache.org/jira/browse/FLINK-377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504838#comment-14504838 ] ASF GitHub Bot commented on FLINK-377: -- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94761408 Thanks a lot. Can you squash the commits after mingliang's example into one commit, prefixed with FLINK-671 ? Then we'll have 4 commits for the change, which is totally okay given its size.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94761465 I can also do the squashing if you want ;)
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94774209 Done
[jira] [Assigned] (FLINK-1681) Remove the old Record API
[ https://issues.apache.org/jira/browse/FLINK-1681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henry Saputra reassigned FLINK-1681: Assignee: Henry Saputra Remove the old Record API - Key: FLINK-1681 URL: https://issues.apache.org/jira/browse/FLINK-1681 Project: Flink Issue Type: Task Affects Versions: 0.8.1 Reporter: Henry Saputra Assignee: Henry Saputra Per discussion on the dev@ list from the FLINK-1106 issue, we would like to remove the old APIs since we already deprecated them in the 0.8.x release. This would help make the code base cleaner and easier for new contributors to navigate.
[jira] [Commented] (FLINK-1908) JobManager startup delay isn't considered when using start-cluster.sh script
[ https://issues.apache.org/jira/browse/FLINK-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505600#comment-14505600 ] ASF GitHub Bot commented on FLINK-1908: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/609#issuecomment-94916699 Forwarding comments from JIRA: I think @DarkKnightCZ is using version 0.8.x and Till Rohrmann is talking about 0.9. The startup is handled very differently in 0.9 and should actually fix the issue. The selection of the communication interface is in a backoff loop and should happen for many minutes before the TaskManager falls back to heuristics. I don't think that this issue will be fixed in 0.8.x. @DarkKnightCZ Can you verify whether 0.9 works for you? JobManager startup delay isn't considered when using start-cluster.sh script Key: FLINK-1908 URL: https://issues.apache.org/jira/browse/FLINK-1908 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 0.9, 0.8.1 Environment: Linux Reporter: Lukas Raska Priority: Minor Original Estimate: 5m Remaining Estimate: 5m When starting a Flink cluster via the start-cluster.sh script, JobManager startup can be delayed (as it's started asynchronously), which can result in failed startup of several task managers. The solution is to wait a certain amount of time, periodically checking whether the RPC port is accessible, and then proceed with starting the task managers.
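The workaround proposed in the issue — periodically probing the JobManager's RPC port before launching the TaskManagers — can be sketched in plain Java. `waitForPort` is a hypothetical helper for illustration, not part of Flink or the start-cluster.sh script:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Sketch of the proposed startup check: poll a TCP port until it accepts
// connections or a timeout expires. waitForPort is a hypothetical helper.
public final class PortWaiter {

    public static boolean waitForPort(String host, int port, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 500);
                return true; // port is accepting connections
            } catch (IOException e) {
                Thread.sleep(250); // not up yet, retry until the deadline
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        // Demo: open a local server socket, then wait for it to be reachable.
        try (ServerSocket server = new ServerSocket(0)) {
            boolean up = waitForPort("localhost", server.getLocalPort(), 2000);
            System.out.println("port reachable: " + up);
        }
    }
}
```

With a check like this, the startup script could delay launching the TaskManagers until the JobManager's RPC port is actually reachable, instead of starting them blindly.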
[jira] [Commented] (FLINK-1799) Scala API does not support generic arrays
[ https://issues.apache.org/jira/browse/FLINK-1799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505621#comment-14505621 ] ASF GitHub Bot commented on FLINK-1799: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/582#issuecomment-94919453 Yo, looks fine now
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/612#discussion_r28814484
--- Diff: flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java ---
@@ -112,9 +112,9 @@ public void testTaskManagerProcessFailure() {
 Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 Configuration jmConfig = new Configuration();
- jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 s");
- jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4 s");
- jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+ jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 ms");
--- End diff --
Is this really a good idea? Heartbeats every 1 ms? This is almost longer than the TCP process-to-process roundtrip time. May be a bit too aggressive...
[GitHub] flink pull request: [FLINK-1908] JobManager startup delay isn't co...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/609#issuecomment-94916699 Forwarding comments from JIRA: I think @DarkKnightCZ is using version 0.8.x and Till Rohrmann is talking about 0.9. The startup is handled very differently in 0.9 and should actually fix the issue. The selection of the communication interface is in a backoff loop and should happen for many minutes before the TaskManager falls back to heuristics. I don't think that this issue will be fixed in 0.8.x. @DarkKnightCZ Can you verify whether 0.9 works for you?
[jira] [Commented] (FLINK-1867) TaskManagerFailureRecoveryITCase causes stalled travis builds
[ https://issues.apache.org/jira/browse/FLINK-1867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505592#comment-14505592 ] ASF GitHub Bot commented on FLINK-1867:

--- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/612#discussion_r28814484

--- Diff: flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java ---
@@ -112,9 +112,9 @@ public void testTaskManagerProcessFailure() {
 	Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 	Configuration jmConfig = new Configuration();
-	jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 s");
-	jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4 s");
-	jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+	jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 ms");
--- End diff --

Is this really a good idea? Heartbeats every 1 ms? This is about as long as the TCP process-to-process round-trip time. May be a bit too aggressive...

TaskManagerFailureRecoveryITCase causes stalled travis builds
Key: FLINK-1867
URL: https://issues.apache.org/jira/browse/FLINK-1867
Project: Flink
Issue Type: Bug
Components: TaskManager, Tests
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Aljoscha Krettek

There are currently tests on travis failing: https://travis-ci.org/apache/flink/jobs/57943063

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-94915061

Hi @vasia, I added some answers to your inline comments! I will push my latest version for this tomorrow. Regarding the suggestion that the Vertex class might not be the best place for the getDegree methods, there is a reason why implementing this took a while ^^. I wanted to make the degrees available only in the iteration. The problem is that with the current code (both this one and the one in production), a vertex is only accessible in the updateVertex() method. This means that there is no way to get the vertex within the iteration; you don't have a Vertex object, and it's not something passed to the class. That was the quick workaround I found for this issue. If you know a better way, I am eager to hear your suggestions. Thanks! :)
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/612#discussion_r28814178

--- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala ---
@@ -231,9 +231,9 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
 	config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
 	config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
 	config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
-	config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+	config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s")
--- End diff --

What is the effect of this setting on the test? Does it now take very long, because the JobManager needs a long time to determine that the TaskManager has failed?
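A back-of-the-envelope answer to the question raised above, under the simplifying assumption that a deadline-style detector declares failure roughly one heartbeat interval after the acceptable pause expires. The formula and names below are an illustrative model, not Akka's actual failure detector.

```java
// Illustrative estimate of how much longer failure detection takes after
// raising the heartbeat pause from 4000 ms to 20 s. The simplified formula
// (pause + one interval) is an assumption for this sketch, not Akka's
// exact detector behavior.
public class PauseEffect {

    // worst-case time (ms) until a silent peer is declared failed
    static long detectionMillis(long intervalMs, long pauseMs) {
        return pauseMs + intervalMs;
    }

    public static void main(String[] args) {
        long before = detectionMillis(1000, 4_000);   // old config: ~5 s
        long after  = detectionMillis(1000, 20_000);  // proposed config: ~21 s
        // extra worst-case wait per detected TaskManager failure
        System.out.println(after - before);           // 16000
    }
}
```

Under this model, each simulated TaskManager failure would indeed make the test wait up to roughly 16 seconds longer before recovery can start.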
[jira] [Commented] (FLINK-1804) flink-quickstart-scala tests fail on scala-2.11 build profile on travis
[ https://issues.apache.org/jira/browse/FLINK-1804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505645#comment-14505645 ] Stephan Ewen commented on FLINK-1804:

Do these consistently fail, or was that one spurious failure?

flink-quickstart-scala tests fail on scala-2.11 build profile on travis
Key: FLINK-1804
URL: https://issues.apache.org/jira/browse/FLINK-1804
Project: Flink
Issue Type: Task
Components: Build System, Quickstarts
Affects Versions: 0.9
Reporter: Robert Metzger

Travis builds on master started failing after the Scala 2.11 profile has been added to Flink. For example: https://travis-ci.org/apache/flink/jobs/56312734 The error:
{code}
[INFO] [INFO] --- scala-maven-plugin:3.1.4:compile (default) @ testArtifact ---
[INFO] [INFO] artifact joda-time:joda-time: checking for updates from sonatype
[INFO] [INFO] artifact joda-time:joda-time: checking for updates from sonatype-apache
[INFO] [INFO] artifact joda-time:joda-time: checking for updates from sonatype
[INFO] [INFO] artifact joda-time:joda-time: checking for updates from sonatype-apache
[INFO] [WARNING] Expected all dependencies to require Scala version: 2.10.4
[INFO] [WARNING] com.twitter:chill_2.10:0.5.2 requires scala version: 2.10.4
[INFO] [WARNING] com.twitter:chill-avro_2.10:0.5.2 requires scala version: 2.10.4
[INFO] [WARNING] com.twitter:chill-bijection_2.10:0.5.2 requires scala version: 2.10.4
[INFO] [WARNING] com.twitter:bijection-core_2.10:0.7.2 requires scala version: 2.10.4
[INFO] [WARNING] com.twitter:bijection-avro_2.10:0.7.2 requires scala version: 2.10.4
[INFO] [WARNING] org.scala-lang:scala-reflect:2.10.4 requires scala version: 2.10.4
[INFO] [WARNING] org.apache.flink:flink-scala:0.9-SNAPSHOT requires scala version: 2.10.4
[INFO] [WARNING] org.apache.flink:flink-scala:0.9-SNAPSHOT requires scala version: 2.10.4
[INFO] [WARNING] org.scala-lang:scala-compiler:2.10.4 requires scala version: 2.10.4
[INFO] [WARNING] org.scalamacros:quasiquotes_2.10:2.0.1 requires scala version: 2.10.4
[INFO] [WARNING] org.apache.flink:flink-streaming-scala:0.9-SNAPSHOT requires scala version: 2.11.4
[INFO] [WARNING] Multiple versions of scala libraries detected!
[INFO] [INFO] /home/travis/build/apache/flink/flink-quickstart/flink-quickstart-scala/target/test-classes/projects/testArtifact/project/testArtifact/src/main/scala:-1: info: compiling
[INFO] [INFO] Compiling 3 source files to /home/travis/build/apache/flink/flink-quickstart/flink-quickstart-scala/target/test-classes/projects/testArtifact/project/testArtifact/target/classes at 1427650524446
[INFO] [ERROR] error:
[INFO] [INFO] while compiling: /home/travis/build/apache/flink/flink-quickstart/flink-quickstart-scala/target/test-classes/projects/testArtifact/project/testArtifact/src/main/scala/org/apache/flink/archetypetest/SocketTextStreamWordCount.scala
[INFO] [INFO] during phase: typer
[INFO] [INFO] library version: version 2.10.4
[INFO] [INFO] compiler version: version 2.10.4
[INFO] [INFO] reconstructed args: -d /home/travis/build/apache/flink/flink-quickstart/flink-quickstart-scala/target/test-classes/projects/testArtifact/project/testArtifact/target/classes -classpath