[jira] [Commented] (FLINK-2189) NullPointerException in MutableHashTable
[ https://issues.apache.org/jira/browse/FLINK-2189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712584#comment-14712584 ]

Chiwan Park commented on FLINK-2189:

When the bloom filter is used, I face a {{NegativeArraySizeException}} like FLINK-2545. I tested with the full MovieLens data set on my local machine with 256MB of memory per task manager.

> NullPointerException in MutableHashTable
>
> Key: FLINK-2189
> URL: https://issues.apache.org/jira/browse/FLINK-2189
> Project: Flink
> Issue Type: Bug
> Components: Core
> Reporter: Till Rohrmann
>
> [~Felix Neutatz] reported a {{NullPointerException}} in the {{MutableHashTable}} when running the {{ALS}} algorithm. The stack trace is the following:
> {code}
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> 	at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1094)
> 	at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:927)
> 	at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:783)
> 	at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> 	at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544)
> 	at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> 	at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> 	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> 	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> He produced this error on his local machine with the following code:
> {code}
> implicit val env = ExecutionEnvironment.getExecutionEnvironment
> val links = MovieLensUtils.readLinks(movieLensDir + "links.csv")
> val movies = MovieLensUtils.readMovies(movieLensDir + "movies.csv")
> val ratings = MovieLensUtils.readRatings(movieLensDir + "ratings.csv")
> val tags = MovieLensUtils.readTags(movieLensDir + "tags.csv")
>
> val ratingMatrix = ratings.map { r => (r.userId.toInt, r.movieId.toInt, r.rating) }
> val testMatrix = ratings.map { r => (r.userId.toInt, r.movieId.toInt) }
> val als = ALS()
>   .setIterations(10)
>   .setNumFactors(10)
>   .setBlocks(150)
>
> als.fit(ratingMatrix)
> val result = als.predict(testMatrix)
>
> result.print
> val risk = als.empiricalRisk(ratingMatrix).collect().apply(0)
> println("Empirical risk: " + risk)
> env.execute()
> {code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2569) CsvReader support for ValueTypes
[ https://issues.apache.org/jira/browse/FLINK-2569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712579#comment-14712579 ]

ASF GitHub Bot commented on FLINK-2569:

Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1053#issuecomment-134853875

I fixed the test to use the `collect` method.

> CsvReader support for ValueTypes
>
> Key: FLINK-2569
> URL: https://issues.apache.org/jira/browse/FLINK-2569
> Project: Flink
> Issue Type: Improvement
> Components: Java API
> Affects Versions: 0.9
> Reporter: Greg Hogan
> Assignee: Chiwan Park
> Priority: Minor
>
> From the Flink Programming Guide section on Data Sources:
> {quote}
> readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples or POJOs. Supports the basic java types and their Value counterparts as field types.
> {quote}
> When specifying a ValueType, i.e.
> {code}
> CsvReader csvReader = env.readCsvFile(filename);
> csvReader.types(IntValue.class, IntValue.class);
> {code}
> the following error occurs as BasicTypeInfo is specifically requested in CsvReader.types(...).
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
> 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> 	at org.apache.flink.client.program.Client.run(Client.java:327)
> 	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:608)
> 	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:296)
> 	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:927)
> 	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:977)
> Caused by: java.lang.IllegalArgumentException: Type at position 0 is not a basic type.
> 	at org.apache.flink.api.java.typeutils.TupleTypeInfo.getBasicTupleTypeInfo(TupleTypeInfo.java:177)
> 	at org.apache.flink.api.java.io.CsvReader.types(CsvReader.java:393)
> 	at Driver.main(Driver.java:105)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> 	... 6 more
> {code}
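For background, Flink's Value types (IntValue, StringValue, etc.) are mutable holders that can be reused across records instead of allocating a new boxed object per field. The following is a rough standalone illustration of that reuse pattern, not Flink code: `IntHolder` is a hypothetical stand-in for `IntValue`.

```java
public class ValueTypeSketch {
    // Minimal stand-in for Flink's IntValue: a mutable int holder that a
    // CSV reader could reuse across records instead of boxing a new
    // Integer per field.
    static final class IntHolder {
        private int value;
        public int getValue() { return value; }
        public void setValue(int v) { value = v; }
    }

    public static void main(String[] args) {
        String[] records = {"1,2", "3,4"};
        // The same two holders are reused for every record.
        IntHolder a = new IntHolder(), b = new IntHolder();
        int sum = 0;
        for (String record : records) {
            String[] fields = record.split(",");
            a.setValue(Integer.parseInt(fields[0]));
            b.setValue(Integer.parseInt(fields[1]));
            sum += a.getValue() + b.getValue();
        }
        System.out.println(sum); // prints 10
    }
}
```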
[jira] [Created] (FLINK-2574) Remove Spargel from master in next release
Henry Saputra created FLINK-2574:

Summary: Remove Spargel from master in next release
Key: FLINK-2574
URL: https://issues.apache.org/jira/browse/FLINK-2574
Project: Flink
Issue Type: Task
Components: Spargel
Reporter: Henry Saputra
Fix For: 0.10

With Gelly getting more mature and more people starting to use Flink, I propose removing Spargel from master in the next release. We already deprecated it in 0.9, so I think it is a good time to remove it in favor of Gelly.
[jira] [Commented] (FLINK-2570) Add a Triangle Count Library Method
[ https://issues.apache.org/jira/browse/FLINK-2570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712566#comment-14712566 ]

ASF GitHub Bot commented on FLINK-2570:

Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1054#issuecomment-134847386

okay...

> Add a Triangle Count Library Method
>
> Key: FLINK-2570
> URL: https://issues.apache.org/jira/browse/FLINK-2570
> Project: Flink
> Issue Type: Task
> Components: Gelly
> Affects Versions: 0.10
> Reporter: Andra Lungu
> Assignee: Andra Lungu
> Priority: Minor
>
> The Gather-Sum-Apply-Scatter version of this algorithm receives an undirected graph as input and outputs the total number of triangles formed by the graph's edges.
> The implementation consists of three phases:
> 1) Select neighbours with an id greater than the current vertex id.
>    Gather: no-op
>    Sum: create a set out of these neighbours
>    Apply: attach the computed values to the vertices
> 2) Propagate each received value to neighbours with a higher id (again using GSA).
> 3) Compute the number of triangles by verifying whether the final vertex contains the sender's id in its list.
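The three phases described above boil down to intersecting, for each edge, the two endpoints' sets of higher-id neighbours. A standalone sketch of that counting logic in plain Java follows; this is illustrative only, not the Gelly implementation, and the graph representation (an edge array of int pairs) is an assumption for the demo.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TriangleCountSketch {
    // Counts triangles in an undirected graph given as int edge pairs.
    static long countTriangles(int[][] edges) {
        // Phase 1 analogue: each vertex keeps only neighbours with a
        // greater id (sets also deduplicate repeated edges).
        Map<Integer, Set<Integer>> higher = new HashMap<>();
        for (int[] e : edges) {
            int lo = Math.min(e[0], e[1]), hi = Math.max(e[0], e[1]);
            if (lo == hi) continue; // ignore self-loops
            higher.computeIfAbsent(lo, k -> new HashSet<>()).add(hi);
            higher.computeIfAbsent(hi, k -> new HashSet<>());
        }
        // Phases 2-3 analogue: propagate along each edge (u, v) and count
        // the ids present in both higher-neighbour sets; each triangle is
        // counted exactly once at its lowest-id vertex.
        long triangles = 0;
        for (Map.Entry<Integer, Set<Integer>> entry : higher.entrySet()) {
            for (int v : entry.getValue()) {
                for (int w : higher.get(v)) {
                    if (entry.getValue().contains(w)) triangles++;
                }
            }
        }
        return triangles;
    }

    public static void main(String[] args) {
        // The complete graph on 4 vertices contains 4 triangles.
        int[][] k4 = {{0, 1}, {0, 2}, {0, 3}, {1, 2}, {1, 3}, {2, 3}};
        System.out.println(countTriangles(k4)); // prints 4
    }
}
```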
[jira] [Commented] (FLINK-2570) Add a Triangle Count Library Method
[ https://issues.apache.org/jira/browse/FLINK-2570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712565#comment-14712565 ]

ASF GitHub Bot commented on FLINK-2570:

Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1054#issuecomment-134847237

I think you should wait a bit. There is an open discussion on the mailing list about whether to fork 0.9.1 off the current master. If there is consensus and you merge this, then this will be one more change to revert (even though this is not API-breaking, it is new functionality, so I'm not really sure).
[jira] [Commented] (FLINK-2570) Add a Triangle Count Library Method
[ https://issues.apache.org/jira/browse/FLINK-2570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712555#comment-14712555 ]

ASF GitHub Bot commented on FLINK-2570:

Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1054#issuecomment-134845522

Great! I'll merge this.
[jira] [Commented] (FLINK-2569) CsvReader support for ValueTypes
[ https://issues.apache.org/jira/browse/FLINK-2569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712537#comment-14712537 ]

ASF GitHub Bot commented on FLINK-2569:

Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1053#discussion_r37947758

--- Diff: flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java ---
@@ -123,6 +132,21 @@ public void testPOJOTypeWithFieldsOrderAndFieldsSelection() throws Exception {
 		expected = "ABC,3,0.00\nDEF,5,0.00\nDEF,1,0.00\nGHI,10,0.00";
 	}
 
+	@Test
+	public void testValueTypes() throws Exception {
+		final String inputData = "ABC,true,1,2,3,4,5.0,6.0\nBCD,false,1,2,3,4,5.0,6.0";
+		final String dataPath = createInputData(inputData);
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple8<StringValue, BooleanValue, ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue>> data =
+			env.readCsvFile(dataPath).types(StringValue.class, BooleanValue.class, ByteValue.class, ShortValue.class, IntValue.class, LongValue.class, FloatValue.class, DoubleValue.class);
+		data.writeAsText(resultPath);
--- End diff --

Okay, I'll update `CsvReaderITCase` to use `collect` instead of writing data to disk.
[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712460#comment-14712460 ]

ASF GitHub Bot commented on FLINK-2536:

Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/1030#issuecomment-134822139

@tillrohrmann Hi, I fixed the conflict and got the CI to rerun. Would you please take a look at my new changes? Will there be any new comments?

> Add a retry for SocketClientSink
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Fix For: 0.10
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I found the SocketClientSink doesn't use a re-connect when disconnected from the socket server or on an exception. I'd like to add a re-connect for the socket sink like the one in the socket source.
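The re-connect behaviour proposed for the sink can be sketched as a generic bounded-retry loop. This is an illustrative standalone example, not the actual patch; the helper name, parameters, and the simulated failing operation are invented for the demo.

```java
public class RetrySketch {
    // A retryable action, e.g. opening a socket and writing a record.
    interface Op {
        void run() throws Exception;
    }

    // Used by the demo to simulate an operation that fails twice before
    // succeeding on the third attempt.
    static int attempts;

    // Retry the operation up to maxRetries additional times, sleeping
    // between attempts; rethrow once the budget is exhausted.
    static void withRetry(Op op, int maxRetries, long sleepMillis) {
        for (int attempt = 0; ; attempt++) {
            try {
                op.run();
                return;
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    throw new RuntimeException("giving up after retries", e);
                }
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }
    }

    public static void main(String[] args) {
        attempts = 0;
        withRetry(() -> {
            if (++attempts < 3) throw new java.io.IOException("connect failed");
        }, 5, 10);
        System.out.println("succeeded after " + attempts + " attempts"); // prints 3 attempts
    }
}
```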
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712385#comment-14712385 ]

ASF GitHub Bot commented on FLINK-2490:

Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-134797100

@mxm Hi, I changed CONNECTION_RETRY_SLEEP to: static final int CONNECTION_RETRY_SLEEP = 1000; But I had no way to directly change CONNECTION_RETRY_SLEEP in my test using SocketTextStreamFunction.CONNECTION_RETRY_SLEEP = 200. So I added a reflection mechanism to resolve this, and now CONNECTION_RETRY_SLEEP changes to 200 in my test. Would you please take a look at whether it is correct?

> Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Priority: Minor
> Fix For: 0.10
> Original Estimate: 168h
> Remaining Estimate: 168h
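The reflection mechanism described here, overriding a static field from a test, can be sketched as follows. `SocketSourceStub` is a hypothetical stand-in for SocketTextStreamFunction, and note this only works because the field is not declared final: a `static final int` initialized with a constant is a compile-time constant, so its reads are inlined by javac and a reflective write would not be observed.

```java
import java.lang.reflect.Field;

public class RetrySleepReflectionDemo {
    // Hypothetical stand-in for SocketTextStreamFunction; the field must be
    // non-final for a plain reflective write to take effect.
    static class SocketSourceStub {
        public static int CONNECTION_RETRY_SLEEP = 1000;
    }

    // Reflectively set the static field and return the value now observed.
    static int overrideRetrySleep(int newValue) {
        try {
            Field f = SocketSourceStub.class.getDeclaredField("CONNECTION_RETRY_SLEEP");
            f.setAccessible(true);
            f.setInt(null, newValue); // static field: the target object is null
            return SocketSourceStub.CONNECTION_RETRY_SLEEP;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(overrideRetrySleep(200)); // prints 200
    }
}
```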
[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api
[ https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712289#comment-14712289 ]

ASF GitHub Bot commented on FLINK-2480:

Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/991#issuecomment-134781611

@StephanEwen Hi, not yet. I will ask Travis support again.

> Improving tests coverage for org.apache.flink.streaming.api
>
> Key: FLINK-2480
> URL: https://issues.apache.org/jira/browse/FLINK-2480
> Project: Flink
> Issue Type: Test
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Fix For: 0.10
> Original Estimate: 504h
> Remaining Estimate: 504h
>
> The streaming API is quite a bit newer than the other code, so it is not as well covered with tests.
[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket
[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712278#comment-14712278 ]

ASF GitHub Bot commented on FLINK-2490:

Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37937505

--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
@@ -42,11 +42,13 @@
 	private boolean retryForever;
 	private Socket socket;
 	private static final int CONNECTION_TIMEOUT_TIME = 0;
-	private static final int CONNECTION_RETRY_SLEEP = 1000;
+	public static int CONNECTION_RETRY_SLEEP = 1000;
--- End diff --

But if I add final, there will be an error in my test: Cannot assign a value to final variable "CONNECTION_RETRY_SLEEP".
[jira] [Commented] (FLINK-2570) Add a Triangle Count Library Method
[ https://issues.apache.org/jira/browse/FLINK-2570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711865#comment-14711865 ]

ASF GitHub Bot commented on FLINK-2570:

Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1054#issuecomment-134720344

Thanks for the clarifications Andra! Everything looks good to me now. I would love to also see a PR of the other implementation. And if you have any insight on which one performs better when, then we should add this as a tip to the descriptions.
[jira] [Commented] (FLINK-2570) Add a Triangle Count Library Method
[ https://issues.apache.org/jira/browse/FLINK-2570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711790#comment-14711790 ]

ASF GitHub Bot commented on FLINK-2570:

Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1054#issuecomment-134702560

Hi @vasia,

I clarified the type of input expected. The graph should be undirected. Without the distinct, you get duplicate edges there (and an erroneous number of triangles). The second bullet point is again not an issue because the graph is undirected. The result should be fine. For the SNAP data sets, I got a number equal to theirs on a cluster.

Concerning the runtime, you are right; it's just true for some cases (generally faster by a factor of two), but it highly depends on the data set. So, once this gets merged, I'll go ahead and propose the vertex-centric version as well. That way, the user can choose.

Hope I clarified everything! Let me know if you still have questions :)
[GitHub] flink pull request: [FLINK-2555] Properly pass security credential...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1038#issuecomment-134584589

I agree. Let's file a JIRA and do it separately, as this is probably a bigger task.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request: [hotfix] Allow setting FLINK_CONF_DIR by hand
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1057#discussion_r37895597 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -127,7 +127,7 @@ FLINK_LIB_DIR=$FLINK_ROOT_DIR/lib # The above lib path is used by the shell script to retrieve jars in a # directory, so it needs to be unmangled. FLINK_ROOT_DIR_MANGLED=`manglePath "$FLINK_ROOT_DIR"` -FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf +if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf; fi --- End diff -- Yes, you don't need the else here because the variable is set either through the environment or in the if block. Still, I'd prefer newlines but it is maybe just a matter of taste here.
[GitHub] flink pull request: [hotfix] Allow setting FLINK_CONF_DIR by hand
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1057#discussion_r37894520 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -127,7 +127,7 @@ FLINK_LIB_DIR=$FLINK_ROOT_DIR/lib # The above lib path is used by the shell script to retrieve jars in a # directory, so it needs to be unmangled. FLINK_ROOT_DIR_MANGLED=`manglePath "$FLINK_ROOT_DIR"` -FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf +if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf; fi --- End diff -- What would be the else block?
[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1043#issuecomment-134639235 Looks good +1 to merge
[GitHub] flink pull request: [hotfix] Allow setting FLINK_CONF_DIR by hand
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1057#issuecomment-134679865 Very useful feature. In addition, I could also imagine that the config file could be passed as a parameter to the ./bin/flink utility.
[GitHub] flink pull request: [hotfix] Allow setting FLINK_CONF_DIR by hand
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1057#discussion_r37894302 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -127,7 +127,7 @@ FLINK_LIB_DIR=$FLINK_ROOT_DIR/lib # The above lib path is used by the shell script to retrieve jars in a # directory, so it needs to be unmangled. FLINK_ROOT_DIR_MANGLED=`manglePath "$FLINK_ROOT_DIR"` -FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf +if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf; fi --- End diff -- Maybe just code style but could you make this more explicit using if-else blocks?
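The pattern the diff above implements — take an externally provided value if one is set, otherwise fall back to a default — can be stated as a small Python sketch (illustrative only; the real fix is the one-line change in config.sh):

```python
import os

def resolve_conf_dir(root_dir, env=os.environ):
    # Mirrors `if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=...; fi`:
    # an externally set FLINK_CONF_DIR wins; an unset or empty variable
    # (what `-z` tests for) falls back to conf/ under the Flink root.
    return env.get("FLINK_CONF_DIR") or os.path.join(root_dir, "conf")
```

In shell the same fallback is often written as `FLINK_CONF_DIR="${FLINK_CONF_DIR:-$FLINK_ROOT_DIR_MANGLED/conf}"`, which avoids the explicit if/fi that the review comments debate.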
[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs
[ https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711667#comment-14711667 ] Maximilian Michels commented on FLINK-2380: --- Shouldn't the default file system be read from the Hadoop configuration? That's much more convenient than another config entry. IMO the standard scenario is that you have an HDFS setup and set {{fs.default.name}} in the Hadoop config. It also defaults to file. > Allow to configure default FS for file inputs > - > > Key: FLINK-2380 > URL: https://issues.apache.org/jira/browse/FLINK-2380 > Project: Flink > Issue Type: Improvement > Components: JobManager >Affects Versions: 0.9, master >Reporter: Ufuk Celebi >Priority: Minor > Fix For: 0.10, 0.9.1 > > > File inputs use "file://" as default prefix. A user asked to make this > configurable, e.g. "hdfs://" as default. > (I'm not sure whether this is already possible or not. I will check and > either close or implement this for the user.)
[GitHub] flink pull request: [FLINK-2460] [runtime] Check parent state in i...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1051#issuecomment-134676740 Addressing the comment and merging this for 0.10 and 0.9.1.
[GitHub] flink pull request: [hotfix] Allow setting FLINK_CONF_DIR by hand
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/1057 [hotfix] Allow setting FLINK_CONF_DIR by hand This makes it possible for users to set a per-job conf directory when using the one-flink-cluster-per-job mode on YARN, which enables, for example, per-job log settings. @uce This should probably also go into 0.9.1. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink flink-conf-dir Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1057.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1057 commit d42e7df0110adfa4702de2fc2e31c85e8ecc0c18 Author: Aljoscha Krettek Date: 2015-08-25T17:26:29Z [hotfix] Allow setting FLINK_CONF_DIR by hand
[GitHub] flink pull request: [FLINK-2569] [core] Add CsvReader support for ...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1053#issuecomment-134584979 @fhueske Thanks for the review. :) I addressed your comments. * Add `getBasicAndBasicValueTupleTypeInfo` method into `TupleTypeInfo` * Add `isBasicValueType` method into `ValueTypeInfo` class to check whether the type is basic value or not
[GitHub] flink pull request: [FLINK-2569] [core] Add CsvReader support for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1053#discussion_r37886531 --- Diff: flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java --- @@ -123,6 +132,21 @@ public void testPOJOTypeWithFieldsOrderAndFieldsSelection() throws Exception { expected = "ABC,3,0.00\nDEF,5,0.00\nDEF,1,0.00\nGHI,10,0.00"; } + @Test + public void testValueTypes() throws Exception { + final String inputData = "ABC,true,1,2,3,4,5.0,6.0\nBCD,false,1,2,3,4,5.0,6.0"; + final String dataPath = createInputData(inputData); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple8<StringValue, BooleanValue, ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue>> data = + env.readCsvFile(dataPath).types(StringValue.class, BooleanValue.class, ByteValue.class, ShortValue.class, IntValue.class, LongValue.class, FloatValue.class, DoubleValue.class); + data.writeAsText(resultPath); --- End diff -- That is true, but it might ease the problem a little bit if newly added tests try to use `collect`. And I doubt that we'll soon find somebody who will take care of this.
[jira] [Commented] (FLINK-2089) "Buffer recycled" IllegalStateException during cancelling
[ https://issues.apache.org/jira/browse/FLINK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711654#comment-14711654 ] ASF GitHub Bot commented on FLINK-2089: --- Github user uce commented on the pull request: https://github.com/apache/flink/pull/1050#issuecomment-134676836 I will address the comment and merge this for 0.10 and 0.9.1. > "Buffer recycled" IllegalStateException during cancelling > - > > Key: FLINK-2089 > URL: https://issues.apache.org/jira/browse/FLINK-2089 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: master >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 0.10, 0.9.1 > > > [~rmetzger] reported the following stack trace during cancelling of high > parallelism jobs: > {code} > Error: java.lang.IllegalStateException: Buffer has already been recycled. > at > org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173) > at > org.apache.flink.runtime.io.network.buffer.Buffer.ensureNotRecycled(Buffer.java:142) > at > org.apache.flink.runtime.io.network.buffer.Buffer.getMemorySegment(Buffer.java:78) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.setNextBuffer(SpillingAdaptiveSpanningRecordDeserializer.java:72) > at > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:80) > at > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) > at > org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) > at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96) > at > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) > at > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at 
java.lang.Thread.run(Thread.java:745) > {code} > This looks like a concurrent buffer pool release/buffer usage error. I'm > investigating this today.
[GitHub] flink pull request: [FLINK-2089] [runtime] Fix illegal state in Re...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1050#issuecomment-134676836 I will address the comment and merge this for 0.10 and 0.9.1.
[jira] [Commented] (FLINK-2460) ReduceOnNeighborsWithExceptionITCase failure
[ https://issues.apache.org/jira/browse/FLINK-2460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711653#comment-14711653 ] ASF GitHub Bot commented on FLINK-2460: --- Github user uce commented on the pull request: https://github.com/apache/flink/pull/1051#issuecomment-134676740 Addressing the comment and merging this for 0.10 and 0.9.1. > ReduceOnNeighborsWithExceptionITCase failure > > > Key: FLINK-2460 > URL: https://issues.apache.org/jira/browse/FLINK-2460 > Project: Flink > Issue Type: Bug >Reporter: Sachin Goel >Assignee: Ufuk Celebi > > I noticed a build error due to failure on this case. It was on a branch of my > fork, which didn't actually have anything to do with the failed test or the > runtime system at all. > Here's the error log: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/73695554/log.txt
[jira] [Updated] (FLINK-2380) Allow to configure default FS for file inputs
[ https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi updated FLINK-2380: --- Fix Version/s: 0.9.1
[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs
[ https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711643#comment-14711643 ] Ufuk Celebi commented on FLINK-2380: I've talked to the user again and he confirmed that this is still considered inconvenient. I've looked into the FileSystem class and it looks like this would be an easy fix: - Add a configuration variable - In FileSystem#get(URI) if the URI has no scheme, use the configured scheme (keep default as file) Any objections to address this in 0.9.1? The user will be happy :)
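The fix proposed in the comment above — when the input URI has no scheme, fall back to a configured default scheme — amounts to the following, sketched in Python with a hypothetical helper name (Flink's actual FileSystem#get works on java.net.URI):

```python
from urllib.parse import urlparse

def qualify_uri(path, default_scheme="file"):
    # Keep an explicit scheme (file://, hdfs://, ...) untouched;
    # otherwise prepend the configured default, which defaults to "file"
    # as the issue requests.
    if urlparse(path).scheme:
        return path
    return f"{default_scheme}://{path}"
```

A user who sets the default to "hdfs" would then get `hdfs:///data/input` from a bare `/data/input` path, which is exactly the convenience the original report asks for.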
[jira] [Updated] (FLINK-2429) Remove the "enableCheckpointing()" without interval variant
[ https://issues.apache.org/jira/browse/FLINK-2429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi updated FLINK-2429: --- Fix Version/s: 0.10 > Remove the "enableCheckpointing()" without interval variant > --- > > Key: FLINK-2429 > URL: https://issues.apache.org/jira/browse/FLINK-2429 > Project: Flink > Issue Type: Wish > Components: Streaming >Reporter: Stephan Ewen >Priority: Minor > Fix For: 0.10 > > > I think it is not very obvious what the default checkpointing interval is. > Also, when somebody activates checkpointing, shouldn't they think about what > they want in terms of frequency and recovery latency tradeoffs?
[jira] [Updated] (FLINK-2474) Occasional failures in PartitionedStateCheckpointingITCase
[ https://issues.apache.org/jira/browse/FLINK-2474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi updated FLINK-2474: --- Labels: test-stability (was: ) > Occasional failures in PartitionedStateCheckpointingITCase > -- > > Key: FLINK-2474 > URL: https://issues.apache.org/jira/browse/FLINK-2474 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Márton Balassi > Labels: test-stability > > The error message > {code} > Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 47.301 sec > <<< FAILURE! - in > org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase > runCheckpointedProgram(org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase) > Time elapsed: 42.495 sec <<< FAILURE! > java.lang.AssertionError: expected:<86678900> but was:<3467156> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.runCheckpointedProgram(PartitionedStateCheckpointingITCase.java:117) > {code} > The detailed CI logs > https://s3.amazonaws.com/archive.travis-ci.org/jobs/73928480/log.txt
[jira] [Updated] (FLINK-2504) ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed failed spuriously
[ https://issues.apache.org/jira/browse/FLINK-2504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi updated FLINK-2504: --- Labels: test-stability (was: ) > ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed failed > spuriously > - > > Key: FLINK-2504 > URL: https://issues.apache.org/jira/browse/FLINK-2504 > Project: Flink > Issue Type: Bug >Reporter: Till Rohrmann > Labels: test-stability > > The test > {{ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed}} > failed in one of my Travis builds: > https://travis-ci.org/tillrohrmann/flink/jobs/74881883
[jira] [Commented] (FLINK-2189) NullPointerException in MutableHashTable
[ https://issues.apache.org/jira/browse/FLINK-2189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711620#comment-14711620 ] Till Rohrmann commented on FLINK-2189: -- [~JonathanH5] encountered this problem recently. > NullPointerException in MutableHashTable > > > Key: FLINK-2189 > URL: https://issues.apache.org/jira/browse/FLINK-2189 > Project: Flink > Issue Type: Bug > Components: Core >Reporter: Till Rohrmann > > [~Felix Neutatz] reported a {{NullPointerException}} in the > {{MutableHashTable}} when running the {{ALS}} algorithm. The stack trace is > the following: > {code} > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1094) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:927) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:783) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544) > at > org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104) > at > org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173) > at > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) > at > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > {code} > He produced this error on his local machine with the following code: > {code} > implicit val env = ExecutionEnvironment.getExecutionEnvironment > val 
links = MovieLensUtils.readLinks(movieLensDir + "links.csv") > val movies = MovieLensUtils.readMovies(movieLensDir + "movies.csv") > val ratings = MovieLensUtils.readRatings(movieLensDir + "ratings.csv") > val tags = MovieLensUtils.readTags(movieLensDir + "tags.csv") > > val ratingMatrix = ratings.map { r => (r.userId.toInt, r.movieId.toInt, > r.rating) } > val testMatrix = ratings.map { r => (r.userId.toInt, r.movieId.toInt) } > val als = ALS() >.setIterations(10) >.setNumFactors(10) >.setBlocks(150) > > als.fit(ratingMatrix) > val result = als.predict(testMatrix) > > result.print > val risk = als.empiricalRisk(ratingMatrix).collect().apply(0) > println("Empirical risk: " + risk) > env.execute() > {code}
[jira] [Updated] (FLINK-2276) Travis build error
[ https://issues.apache.org/jira/browse/FLINK-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi updated FLINK-2276: --- Labels: test-stability (was: ) > Travis build error > -- > > Key: FLINK-2276 > URL: https://issues.apache.org/jira/browse/FLINK-2276 > Project: Flink > Issue Type: Bug >Reporter: Sachin Goel > Labels: test-stability > > testExecutionFailsAfterTaskMarkedFailed on travis. > Here is the log output: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/68288986/log.txt
[GitHub] flink pull request: [FLINK-2314] - Added Checkpointing to File Sou...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/997#discussion_r37883013 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java --- @@ -120,12 +131,24 @@ public void run(SourceContext ctx) throws Exception { OUT nextElement = serializer.createInstance(); nextElement = format.nextRecord(nextElement); if (nextElement == null && splitIterator.hasNext()) { - format.open(splitIterator.next()); + InputSplit split = splitIterator.next(); + splitNumber = split.getSplitNumber(); + currRecord = 0l; + format.open(split); continue; } else if (nextElement == null) { break; } - ctx.collect(nextElement); + if(splitNumber == checkpointedSplit){ --- End diff -- What if you've checkpointed the second split after seeing the first and second splits and now the source is re-executed with the first split? Aren't records written again because you only save the latest checkpointed split number?
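The review comment above boils down to: a single "latest checkpointed split number" cannot express how far the source got in *each* split, so earlier splits would be replayed in full. A per-split offset map fixes this; a minimal Python sketch (hypothetical names, not the FileSourceFunction API):

```python
def resume(splits, completed):
    """Resume a split-based source after recovery.

    `splits` is a list of (split_no, records); `completed` maps split
    number -> number of records already emitted for that split. This is
    richer state than the single `checkpointedSplit` number in the diff:
    every split skips exactly the records it already produced.
    """
    out = []
    for split_no, records in splits:
        start = completed.get(split_no, 0)   # 0 = nothing emitted yet
        out.append((split_no, records[start:]))
    return out
```

With only the latest split number saved, `completed` would collapse to one entry, and any other re-assigned split would re-emit all of its records — the duplication the reviewer is pointing at.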
[jira] [Commented] (FLINK-1158) Logging property files missing in project created by archetypes
[ https://issues.apache.org/jira/browse/FLINK-1158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711610#comment-14711610 ] Robert Metzger commented on FLINK-1158: --- Looks like I fixed this with https://github.com/apache/flink/commit/354efec0f9da0fa03ea9b337b02a1a2a03a9ac16 > Logging property files missing in project created by archetypes > --- > > Key: FLINK-1158 > URL: https://issues.apache.org/jira/browse/FLINK-1158 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 0.7.0-incubating >Reporter: Till Rohrmann > Fix For: 0.9 > > > If one creates a flink project using the archetypes, then there are no > predefined logging properties files. Would be very convenient for the user to > have them generated.
[jira] [Assigned] (FLINK-1158) Logging property files missing in project created by archetypes
[ https://issues.apache.org/jira/browse/FLINK-1158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-1158: - Assignee: Robert Metzger
[jira] [Commented] (FLINK-2189) NullPointerException in MutableHashTable
[ https://issues.apache.org/jira/browse/FLINK-2189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711608#comment-14711608 ] Ufuk Celebi commented on FLINK-2189: [~Felix Neutatz], can you still reproduce this error after the recent fix in 627f3cbcfdca8368eea6aa825cd9a45a9a0a841f?
[jira] [Commented] (FLINK-2200) Flink API with Scala 2.11 - Maven Repository
[ https://issues.apache.org/jira/browse/FLINK-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711607#comment-14711607 ] ASF GitHub Bot commented on FLINK-2200: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/885#issuecomment-134667459 There is a Spark 2.11 artifact in mvn central. I think they are doing a similar thing as we are already doing with the hadoop1/hadoop2 versions: They generate specific pom files when deploying Spark to maven central: https://github.com/apache/spark/blob/master/dev/change-scala-version.sh > Flink API with Scala 2.11 - Maven Repository > > > Key: FLINK-2200 > URL: https://issues.apache.org/jira/browse/FLINK-2200 > Project: Flink > Issue Type: Wish > Components: Build System, Scala API >Reporter: Philipp Götze >Assignee: Chiwan Park >Priority: Trivial > Labels: maven > > It would be nice if you could upload a pre-built version of the Flink API > with Scala 2.11 to the maven repository.
[GitHub] flink pull request: [FLINK-2200] Add Flink with Scala 2.11 in Mave...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/885#issuecomment-134667459 There is a Spark 2.11 artifact in mvn central. I think they are doing a similar thing as we are already doing with the hadoop1/hadoop2 versions: They generate specific pom files when deploying Spark to maven central: https://github.com/apache/spark/blob/master/dev/change-scala-version.sh
[jira] [Commented] (FLINK-1195) Improvement of benchmarking infrastructure
[ https://issues.apache.org/jira/browse/FLINK-1195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711604#comment-14711604 ] Till Rohrmann commented on FLINK-1195: -- I cannot really tell to what extent this PR is subsumed by [~mxm]'s testing infrastructure. But if that's the case, then this issue can be closed. > Improvement of benchmarking infrastructure > -- > > Key: FLINK-1195 > URL: https://issues.apache.org/jira/browse/FLINK-1195 > Project: Flink > Issue Type: Wish >Reporter: Till Rohrmann >Assignee: Alexander Alexandrov > > I noticed while running my ALS benchmarks that we still have some potential > to improve our benchmarking infrastructure. The current state is that we > execute the benchmark jobs by writing a script with a single set of > parameters. The runtime is then manually retrieved from the web interface of > Flink and Spark, respectively. > I think we need the following extensions: > * Automatic runtime retrieval and storage in a file > * Repeated execution of jobs to gather some "advanced" statistics such as > mean and standard deviation of the runtimes > * Support for value sets for the individual parameters > The automatic runtime retrieval would allow us to execute several benchmarks > consecutively without having to lookup the runtimes in the logs or in the web > interface, which btw only stores the runtimes of the last 5 jobs. > What I mean with value sets is that would be nice to specify a set of > parameter values for which the benchmark is run without having to write for > every single parameter combination a benchmark script. I believe that this > feature would become very handy when we want to look at the runtime behaviour > of Flink for different input sizes or degrees of parallelism, for example. 
To > illustrate what I mean: > {code} > INPUTSIZE = 1000, 2000, 4000, 8000 > DOP = 1, 2, 4, 8 > OUTPUT=benchmarkResults > repetitions=10 > command=benchmark.jar -p $DOP $INPUTSIZE > {code} > Something like that would execute the benchmark job with (DOP=1, > INPUTSIZE=1000), (DOP=2, INPUTSIZE=2000), 10 times each, calculate for > each parameter combination runtime statistics and store the results in the > file benchmarkResults. > I believe that spending some effort now will pay off in the long run because > we will benchmark Flink continuously. What do you guys think?
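The "value sets" wish in the issue above is essentially a parameter sweep: run every parameter combination several times and summarize the runtimes. A minimal Python sketch (the runner function is hypothetical; note the issue's example pairs values positionally, while a full sweep takes the cartesian product as described earlier in the same text):

```python
import itertools
import statistics

def sweep(param_sets, run, repetitions=10):
    """Run `run(**params)` for every combination of parameter values and
    return {value-tuple: (mean, stdev)} of the measured runtimes."""
    results = {}
    keys = sorted(param_sets)                       # deterministic order
    for values in itertools.product(*(param_sets[k] for k in keys)):
        params = dict(zip(keys, values))
        runtimes = [run(**params) for _ in range(repetitions)]
        results[values] = (statistics.mean(runtimes),
                           statistics.stdev(runtimes))
    return results
```

Writing `results` out to a file would then cover the "automatic runtime retrieval and storage" part of the wish.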
[jira] [Resolved] (FLINK-1929) Add code to cleanly stop a running streaming topology
[ https://issues.apache.org/jira/browse/FLINK-1929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi resolved FLINK-1929. Resolution: Fixed > Add code to cleanly stop a running streaming topology > - > > Key: FLINK-1929 > URL: https://issues.apache.org/jira/browse/FLINK-1929 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Robert Metzger > > Right now it's not possible to cleanly stop a running streaming topology. > Cancelling the job will cancel all operators, but for proper exactly-once > processing from Kafka sources, we need to provide a way to stop the sources > first, wait until all remaining tuples have been processed, and then shut down > the topology (so that the sources can commit the right offset to ZooKeeper).
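The stop protocol described in FLINK-1929 — stop the sources first, drain the remaining records, only then commit the read position — can be illustrated with a minimal sketch. The class and method names below are invented for illustration and are not Flink's actual API:

```java
// Minimal sketch of a cleanly stoppable source (illustrative names only).
// cancel() would kill the read loop immediately; stop() lets the loop finish
// its current record and only then records the committed offset, which is
// what makes exactly-once offset commits (e.g. to ZooKeeper) possible.
public class StoppableSource {
    private volatile boolean running = true;
    private long offset = 0;           // position in the input, e.g. a Kafka offset
    private long committedOffset = -1; // -1 until a clean stop happened

    // Stand-in for reading and emitting one record downstream.
    private void pollAndEmit() { offset++; }

    public void run() {
        while (running) {
            pollAndEmit();
        }
        // Reached only via stop(): every emitted record is accounted for,
        // so committing this offset is safe.
        committedOffset = offset;
    }

    public void stop() { running = false; }

    public long getCommittedOffset() { return committedOffset; }
}
```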
[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1043#issuecomment-134665239 Thanks! Good to merge, IMO. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Resolved] (FLINK-1639) Document the Flink deployment scripts to make sure others know how to make release
[ https://issues.apache.org/jira/browse/FLINK-1639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi resolved FLINK-1639. Resolution: Fixed Assignee: (was: Márton Balassi) Documentation has been added in https://cwiki.apache.org/confluence/display/FLINK/Releasing Max also added more comments to the script itself. > Document the Flink deployment scripts to make sure others know how to make > release > -- > > Key: FLINK-1639 > URL: https://issues.apache.org/jira/browse/FLINK-1639 > Project: Flink > Issue Type: Task > Components: release >Reporter: Henry Saputra > > Currently, Robert knows the details of the Flink deployment and release scripts > that support both Hadoop versions. > We need to document the black magic used in the scripts to make sure others > know how the flow works, just in case we need to push a release and Robert is > not available.
[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1043#issuecomment-134663435 @fhueske Added the test you requested.
[jira] [Resolved] (FLINK-1558) Spurious failure in PipelinedPartitionQueueTest
[ https://issues.apache.org/jira/browse/FLINK-1558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi resolved FLINK-1558. Resolution: Fixed Fix Version/s: 0.9 Fixed via 34c233f > Spurious failure in PipelinedPartitionQueueTest > --- > > Key: FLINK-1558 > URL: https://issues.apache.org/jira/browse/FLINK-1558 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime >Affects Versions: 0.9 >Reporter: Stephan Ewen > Fix For: 0.9 > > > The failure is reported as > {code} > java.lang.AssertionError: Unexpected failure during test: null. Producer > error: null, consumer error: null > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.runtime.io.network.partition.queue.PipelinedPartitionQueueTest.doTestConcurrentProduceConsume(PipelinedPartitionQueueTest.java:214) > at > org.apache.flink.runtime.io.network.partition.queue.PipelinedPartitionQueueTest.doTestConcurrentProduceConsume(PipelinedPartitionQueueTest.java:171) > at > org.apache.flink.runtime.io.network.partition.queue.PipelinedPartitionQueueTest.testConcurrentProduceConsume(PipelinedPartitionQueueTest.java:142) > {code} > The test shows the following stack trace > {code} > java.util.concurrent.TimeoutException > at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:258) > at java.util.concurrent.FutureTask.get(FutureTask.java:119) > at > org.apache.flink.runtime.io.network.partition.queue.PipelinedPartitionQueueTest.doTestConcurrentProduceConsume(PipelinedPartitionQueueTest.java:198) > at > org.apache.flink.runtime.io.network.partition.queue.PipelinedPartitionQueueTest.doTestConcurrentProduceConsume(PipelinedPartitionQueueTest.java:171) > at > org.apache.flink.runtime.io.network.partition.queue.PipelinedPartitionQueueTest.testConcurrentProduceConsume(PipelinedPartitionQueueTest.java:142) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:622) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2565) Support primitive arrays as keys
[ https://issues.apache.org/jira/browse/FLINK-2565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711594#comment-14711594 ] ASF GitHub Bot commented on FLINK-2565: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1043#issuecomment-134665239 Thanks! Good to merge, IMO. > Support primitive arrays as keys > > > Key: FLINK-2565 > URL: https://issues.apache.org/jira/browse/FLINK-2565 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >
[jira] [Resolved] (FLINK-1426) JobManager AJAX requests sometimes fail
[ https://issues.apache.org/jira/browse/FLINK-1426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi resolved FLINK-1426. Resolution: Invalid This is superseded by the new web front end. I haven't seen any recent progress in the repo you've linked. If this is still ongoing, we will have to sync it with the ongoing effort to refactor the web interface. > JobManager AJAX requests sometimes fail > --- > > Key: FLINK-1426 > URL: https://issues.apache.org/jira/browse/FLINK-1426 > Project: Flink > Issue Type: Bug > Components: JobManager, Webfrontend >Reporter: Robert Metzger > > It seems that the JobManager sometimes (I think when accessing it for the first > time) does not show the number of TMs / slots. > A simple workaround is re-loading the page, but users are still complaining about > it.
[jira] [Commented] (FLINK-2565) Support primitive arrays as keys
[ https://issues.apache.org/jira/browse/FLINK-2565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711586#comment-14711586 ] ASF GitHub Bot commented on FLINK-2565: --- Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1043#issuecomment-134663435 @fhueske Added the test you requested. > Support primitive arrays as keys > > > Key: FLINK-2565 > URL: https://issues.apache.org/jira/browse/FLINK-2565 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >
[jira] [Commented] (FLINK-1195) Improvement of benchmarking infrastructure
[ https://issues.apache.org/jira/browse/FLINK-1195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711583#comment-14711583 ] Ufuk Celebi commented on FLINK-1195: Can this be resolved as it does not relate to the core Flink project? > Improvement of benchmarking infrastructure > -- > > Key: FLINK-1195 > URL: https://issues.apache.org/jira/browse/FLINK-1195 > Project: Flink > Issue Type: Wish >Reporter: Till Rohrmann >Assignee: Alexander Alexandrov
[jira] [Resolved] (FLINK-1158) Logging property files missing in project created by archetypes
[ https://issues.apache.org/jira/browse/FLINK-1158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi resolved FLINK-1158. Resolution: Fixed Fix Version/s: 0.9 The current archetypes have a logging property file. > Logging property files missing in project created by archetypes > --- > > Key: FLINK-1158 > URL: https://issues.apache.org/jira/browse/FLINK-1158 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 0.7.0-incubating >Reporter: Till Rohrmann > Fix For: 0.9 > > > If one creates a Flink project using the archetypes, then there are no > predefined logging properties files. It would be very convenient for the user to > have them generated.
[jira] [Commented] (FLINK-2569) CsvReader support for ValueTypes
[ https://issues.apache.org/jira/browse/FLINK-2569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711570#comment-14711570 ] ASF GitHub Bot commented on FLINK-2569: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1053#discussion_r37886531 --- Diff: flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java --- @@ -123,6 +132,21 @@ public void testPOJOTypeWithFieldsOrderAndFieldsSelection() throws Exception { expected = "ABC,3,0.00\nDEF,5,0.00\nDEF,1,0.00\nGHI,10,0.00"; } + @Test + public void testValueTypes() throws Exception { + final String inputData = "ABC,true,1,2,3,4,5.0,6.0\nBCD,false,1,2,3,4,5.0,6.0"; + final String dataPath = createInputData(inputData); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = + env.readCsvFile(dataPath).types(StringValue.class, BooleanValue.class, ByteValue.class, ShortValue.class, IntValue.class, LongValue.class, FloatValue.class, DoubleValue.class); + data.writeAsText(resultPath); --- End diff -- That is true, but it might ease the problem a little bit if newly added tests try to use `collect`. And I doubt that we'll soon find somebody who will take care of this. > CsvReader support for ValueTypes > > > Key: FLINK-2569 > URL: https://issues.apache.org/jira/browse/FLINK-2569 > Project: Flink > Issue Type: Improvement > Components: Java API >Affects Versions: 0.9 >Reporter: Greg Hogan >Assignee: Chiwan Park >Priority: Minor > > From the Flink Programming Guide section on Data Sources: > {quote} > readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) > delimited fields. Returns a DataSet of tuples or POJOs. Supports the basic > java types and their Value counterparts as field types. > {quote} > When specifying a ValueType, i.e. 
> {code} > CsvReader csvReader = env.readCsvFile(filename); > csvReader.types(IntValue.class, IntValue.class); > {code} > the following error occurs as BasicTypeInfo is specifically requested in > CsvReader.types(...). > {code} > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error. > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) > at org.apache.flink.client.program.Client.run(Client.java:327) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:608) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:296) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:927) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:977) > Caused by: java.lang.IllegalArgumentException: Type at position 0 is not a > basic type. > at > org.apache.flink.api.java.typeutils.TupleTypeInfo.getBasicTupleTypeInfo(TupleTypeInfo.java:177) > at org.apache.flink.api.java.io.CsvReader.types(CsvReader.java:393) > at Driver.main(Driver.java:105) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) > ... 6 more > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-1011) Sometimes Flow/Stack Layout is not presented in Dashboard's history
[ https://issues.apache.org/jira/browse/FLINK-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi resolved FLINK-1011. Resolution: Cannot Reproduce > Sometimes Flow/Stack Layout is not presented in Dashboard's history > --- > > Key: FLINK-1011 > URL: https://issues.apache.org/jira/browse/FLINK-1011 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: pre-apache-0.5 > Environment: Mac OS X and Ubuntu Linux. OpenJDK 1.7. >Reporter: Asterios Katsifodimos >Priority: Minor > > The flow/stack layout in the history of completed jobs does not show up > (Stratosphere Dashboard). This does not always happen; sometimes you may get > it to work. > I just reproduced this with the WordCount Java example from the 0.5.1 > version. The job runs successfully.
[jira] [Commented] (FLINK-2200) Flink API with Scala 2.11 - Maven Repository
[ https://issues.apache.org/jira/browse/FLINK-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711563#comment-14711563 ] ASF GitHub Bot commented on FLINK-2200: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/885#issuecomment-134658771 Is the Maven shade plugin bug the reason why this fails: ``` ERROR] Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:2.4.1:shade (shade-hadoop) on project flink-yarn-tests_2.11: Error creating shaded jar: 3 problems were encountered while building the effective model for org.apache.flink:flink-yarn-tests_2.11:[unknown-version] [ERROR] [WARNING] 'artifactId' contains an expression but should be a constant. @ org.apache.flink:flink-yarn-tests${scala.suffix}:[unknown-version], /home/robert/incubator-flink/flink-yarn-tests/pom.xml, line 36, column 14 [ERROR] [WARNING] 'parent.relativePath' of POM org.apache.flink:flink-yarn-tests_2.11:[unknown-version] (/home/robert/incubator-flink/flink-yarn-tests/target/dependency-reduced-pom.xml) points at org.apache.flink:flink-yarn-tests${scala.suffix} instead of org.apache.flink:flink-parent${scala.suffix}, please verify your project structure @ line 3, column 11 [ERROR] [FATAL] Non-resolvable parent POM for org.apache.flink:flink-yarn-tests_2.11:[unknown-version]: Could not find artifact org.apache.flink:flink-parent${scala.suffix}:pom:0.10-SNAPSHOT in apache.snapshots (http://repository.apache.org/snapshots) and 'parent.relativePath' points at wrong local POM @ line 3, column 11 [ERROR] for project org.apache.flink:flink-yarn-tests_2.11:[unknown-version] at /home/robert/incubator-flink/flink-yarn-tests/target/dependency-reduced-pom.xml ``` ? > About the shading artifacts, your guess is right. Because Hadoop packages don't need Scala dependencies, I didn't add suffix to them. But if we need the suffix for them to maintain uniformity, we can add the suffix. How do you think? 
I think it's fine to leave them as they are. > As you see, there are property expressions (${scala.suffix}) in artifactId. I think that it can be a problem. How can I solve this? Yes, that is certainly a problem. Also, the artifact for flink-parent is not created properly in my local Maven repository. Its name is now `flink-parent${scala.suffix}/`. Maybe we have to look at other projects which are doing the same... if there are any projects ;) Kafka for example is offering builds for different Scala versions. Sadly, they are using sbt for building their project. Spark doesn't deploy its _2.11 artifacts to Maven Central. > Flink API with Scala 2.11 - Maven Repository > > > Key: FLINK-2200 > URL: https://issues.apache.org/jira/browse/FLINK-2200 > Project: Flink > Issue Type: Wish > Components: Build System, Scala API >Reporter: Philipp Götze >Assignee: Chiwan Park >Priority: Trivial > Labels: maven > > It would be nice if you could upload a pre-built version of the Flink API > with Scala 2.11 to the Maven repository.
[GitHub] flink pull request: [FLINK-2200] Add Flink with Scala 2.11 in Mave...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/885#issuecomment-134658771 Is the Maven shade plugin bug the reason why this fails: ``` ERROR] Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:2.4.1:shade (shade-hadoop) on project flink-yarn-tests_2.11: Error creating shaded jar: 3 problems were encountered while building the effective model for org.apache.flink:flink-yarn-tests_2.11:[unknown-version] [ERROR] [WARNING] 'artifactId' contains an expression but should be a constant. @ org.apache.flink:flink-yarn-tests${scala.suffix}:[unknown-version], /home/robert/incubator-flink/flink-yarn-tests/pom.xml, line 36, column 14 [ERROR] [WARNING] 'parent.relativePath' of POM org.apache.flink:flink-yarn-tests_2.11:[unknown-version] (/home/robert/incubator-flink/flink-yarn-tests/target/dependency-reduced-pom.xml) points at org.apache.flink:flink-yarn-tests${scala.suffix} instead of org.apache.flink:flink-parent${scala.suffix}, please verify your project structure @ line 3, column 11 [ERROR] [FATAL] Non-resolvable parent POM for org.apache.flink:flink-yarn-tests_2.11:[unknown-version]: Could not find artifact org.apache.flink:flink-parent${scala.suffix}:pom:0.10-SNAPSHOT in apache.snapshots (http://repository.apache.org/snapshots) and 'parent.relativePath' points at wrong local POM @ line 3, column 11 [ERROR] for project org.apache.flink:flink-yarn-tests_2.11:[unknown-version] at /home/robert/incubator-flink/flink-yarn-tests/target/dependency-reduced-pom.xml ``` ? > About the shading artifacts, your guess is right. Because Hadoop packages don't need Scala dependencies, I didn't add suffix to them. But if we need the suffix for them to maintain uniformity, we can add the suffix. How do you think? I think its fine to leave them as they are. > As you see, there are property expressions (${scala.suffix}) in artifactId. I think that it can be a problem. How can I solve this? Yes, that is certainly a problem. 
Also, the artifact for flink-parent is not created properly in my local maven repository. Its name is now `flink-parent${scala.suffix}/`. Maybe we have to look at other projects which are doing the same... if there are any projects ;) Kafka for example is offering builds for different scala versions. Sadly, they are using sbt for building their project. Spark doesn't deploy its _2.11 artifacts to maven central.
[jira] [Resolved] (FLINK-557) debian: permissions and users
[ https://issues.apache.org/jira/browse/FLINK-557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi resolved FLINK-557. --- Resolution: Won't Fix We don't provide a Debian image as a download anymore. > debian: permissions and users > - > > Key: FLINK-557 > URL: https://issues.apache.org/jira/browse/FLINK-557 > Project: Flink > Issue Type: Bug >Reporter: GitHub Import > Labels: github-import > Fix For: pre-apache > > > Currently it seems as if all processes are run by the root user. > For example, calling the following command from a normal user account leads to > write problems for the logfiles: > ``` > /usr/share/stratosphere-dist/bin/stratosphere run -j > /usr/share/stratosphere-dist/examples/stratosphere-java-examples-0.5-SNAPSHOT-WordCount.jar > -a 16 file:///var/log/syslog file:///home/physikerwelt/out > ``` > > An example of how to run services as the designated user can be found at > https://github.com/physikerwelt/mathoid/tree/master/debian > Imported from GitHub > Url: https://github.com/stratosphere/stratosphere/issues/557 > Created by: [physikerwelt|https://github.com/physikerwelt] > Labels: > Created at: Tue Mar 11 12:41:12 CET 2014 > State: open
[jira] [Resolved] (FLINK-555) debian: add /usr/share/stratosphere-dist/bin/stratosphere to path
[ https://issues.apache.org/jira/browse/FLINK-555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi resolved FLINK-555. --- Resolution: Won't Fix We don't provide a Debian image as a download anymore. > debian: add /usr/share/stratosphere-dist/bin/stratosphere to path > - > > Key: FLINK-555 > URL: https://issues.apache.org/jira/browse/FLINK-555 > Project: Flink > Issue Type: Improvement >Reporter: GitHub Import >Priority: Minor > Labels: github-import > Fix For: pre-apache > > > I suggest adding /usr/share/stratosphere-dist/bin/stratosphere to the path so > that you don't need to care where Stratosphere was installed. > Imported from GitHub > Url: https://github.com/stratosphere/stratosphere/issues/555 > Created by: [physikerwelt|https://github.com/physikerwelt] > Labels: > Created at: Tue Mar 11 12:26:03 CET 2014 > State: open
[jira] [Reopened] (FLINK-82) (Hash) Aggregators (fold style)
[ https://issues.apache.org/jira/browse/FLINK-82?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi reopened FLINK-82: -- Accidentally closed > (Hash) Aggregators (fold style) > --- > > Key: FLINK-82 > URL: https://issues.apache.org/jira/browse/FLINK-82 > Project: Flink > Issue Type: New Feature > Components: Local Runtime >Reporter: GitHub Import > Labels: github-import > Fix For: pre-apache > > > This issue depends on issue https://github.com/dimalabs/ozone/issues/81 > Imported from GitHub > Url: https://github.com/stratosphere/stratosphere/issues/82 > Created by: [dimalabs|https://github.com/dimalabs] > Labels: > Created at: Fri Sep 06 16:15:09 CEST 2013 > State: open
[jira] [Resolved] (FLINK-82) (Hash) Aggregators (fold style)
[ https://issues.apache.org/jira/browse/FLINK-82?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi resolved FLINK-82. -- Resolution: Fixed > (Hash) Aggregators (fold style) > --- > > Key: FLINK-82 > URL: https://issues.apache.org/jira/browse/FLINK-82 > Project: Flink > Issue Type: New Feature > Components: Local Runtime >Reporter: GitHub Import > Labels: github-import > Fix For: pre-apache > > > This issue depends on issue https://github.com/dimalabs/ozone/issues/81 > Imported from GitHub > Url: https://github.com/stratosphere/stratosphere/issues/82 > Created by: [dimalabs|https://github.com/dimalabs] > Labels: > Created at: Fri Sep 06 16:15:09 CEST 2013 > State: open
[jira] [Commented] (FLINK-2569) CsvReader support for ValueTypes
[ https://issues.apache.org/jira/browse/FLINK-2569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711534#comment-14711534 ] ASF GitHub Bot commented on FLINK-2569: --- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1053#discussion_r37884384 --- Diff: flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java --- @@ -123,6 +132,21 @@ public void testPOJOTypeWithFieldsOrderAndFieldsSelection() throws Exception { expected = "ABC,3,0.00\nDEF,5,0.00\nDEF,1,0.00\nGHI,10,0.00"; } + @Test + public void testValueTypes() throws Exception { + final String inputData = "ABC,true,1,2,3,4,5.0,6.0\nBCD,false,1,2,3,4,5.0,6.0"; + final String dataPath = createInputData(inputData); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = + env.readCsvFile(dataPath).types(StringValue.class, BooleanValue.class, ByteValue.class, ShortValue.class, IntValue.class, LongValue.class, FloatValue.class, DoubleValue.class); + data.writeAsText(resultPath); --- End diff -- @tillrohrmann Yeah, I know that. But to use `collect` instead of writing to disk, we need to change all test methods in `CsvReaderITCase`. Maybe we can cover this in other issue (FLINK-2032). > CsvReader support for ValueTypes > > > Key: FLINK-2569 > URL: https://issues.apache.org/jira/browse/FLINK-2569 > Project: Flink > Issue Type: Improvement > Components: Java API >Affects Versions: 0.9 >Reporter: Greg Hogan >Assignee: Chiwan Park >Priority: Minor > > From the Flink Programming Guide section on Data Sources: > {quote} > readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) > delimited fields. Returns a DataSet of tuples or POJOs. Supports the basic > java types and their Value counterparts as field types. > {quote} > When specifying a ValueType, i.e. 
> {code} > CsvReader csvReader = env.readCsvFile(filename); > csvReader.types(IntValue.class, IntValue.class); > {code} > the following error occurs as BasicTypeInfo is specifically requested in > CsvReader.types(...). > {code} > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error. > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) > at org.apache.flink.client.program.Client.run(Client.java:327) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:608) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:296) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:927) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:977) > Caused by: java.lang.IllegalArgumentException: Type at position 0 is not a > basic type. > at > org.apache.flink.api.java.typeutils.TupleTypeInfo.getBasicTupleTypeInfo(TupleTypeInfo.java:177) > at org.apache.flink.api.java.io.CsvReader.types(CsvReader.java:393) > at Driver.main(Driver.java:105) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) > ... 6 more > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2569] [core] Add CsvReader support for ...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1053#discussion_r37884384 --- Diff: flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java --- @@ -123,6 +132,21 @@ public void testPOJOTypeWithFieldsOrderAndFieldsSelection() throws Exception { expected = "ABC,3,0.00\nDEF,5,0.00\nDEF,1,0.00\nGHI,10,0.00"; } + @Test + public void testValueTypes() throws Exception { + final String inputData = "ABC,true,1,2,3,4,5.0,6.0\nBCD,false,1,2,3,4,5.0,6.0"; + final String dataPath = createInputData(inputData); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple8<StringValue, BooleanValue, ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue>> data = + env.readCsvFile(dataPath).types(StringValue.class, BooleanValue.class, ByteValue.class, ShortValue.class, IntValue.class, LongValue.class, FloatValue.class, DoubleValue.class); + data.writeAsText(resultPath); --- End diff -- @tillrohrmann Yeah, I know that. But to use `collect` instead of writing to disk, we need to change all test methods in `CsvReaderITCase`. Maybe we can cover this in another issue (FLINK-2032).
[GitHub] flink pull request: [FLINK-2291] [runtime] Adds high availability ...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1016#issuecomment-134647030 +1 to merge, we should follow up on the Mini cluster and Curator shading separately
[GitHub] flink pull request: Stale Synchronous Parallel Iterations
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/967#discussion_r37883572 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java --- @@ -123,6 +169,31 @@ private IterationEventWithAggregators pipeThroughSerialization(IterationEventWit return null; } } + + private ClockTaskEvent pipeThroughSerialization2(ClockTaskEvent event) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + event.write(new OutputViewDataOutputStreamWrapper(out)); + out.flush(); + + byte[] data = baos.toByteArray(); + out.close(); + baos.close(); + + DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); + ClockTaskEvent newEvent = event.getClass().newInstance(); + newEvent.read(new InputViewDataInputStreamWrapper(in)); + in.close(); + + return newEvent; + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + Assert.fail("Test threw an exception: " + e.getMessage()); + return null; --- End diff -- `null` will never be returned. Why do you catch the exception here instead of letting it bubble up?
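A leaner shape for that helper, along the lines of the review comment, declares `throws Exception` and lets any failure bubble up, so a broken serializer surfaces directly as the test's error with a full stack trace instead of a misleading `Assert.fail` plus an unreachable `return null`. This is only a sketch of the pattern: the `DummyEvent` type stands in for `ClockTaskEvent`, and plain `DataOutputStream`/`DataInputStream` replace Flink's stream-wrapper classes, which aren't needed to show the idea.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical minimal event type standing in for ClockTaskEvent.
class DummyEvent {
    int clock;
    void write(DataOutput out) throws IOException { out.writeInt(clock); }
    void read(DataInput in) throws IOException { clock = in.readInt(); }
}

public class RoundTrip {

    // No try/catch: any I/O or reflection failure bubbles up and fails
    // the calling test with its original stack trace.
    static DummyEvent pipeThroughSerialization(DummyEvent event) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            event.write(out);                       // serialize
        }
        DummyEvent copy = event.getClass().getDeclaredConstructor().newInstance();
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
            copy.read(in);                          // deserialize into a fresh instance
        }
        return copy;
    }

    public static void main(String[] args) throws Exception {
        DummyEvent e = new DummyEvent();
        e.clock = 42;
        System.out.println(pipeThroughSerialization(e).clock); // prints 42
    }
}
```

In a JUnit test this helper would simply be called from a method that itself declares `throws Exception`, so no dead code is needed after the call.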
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711509#comment-14711509 ] ASF GitHub Bot commented on FLINK-2314: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/997#discussion_r37883013 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java --- @@ -120,12 +131,24 @@ public void run(SourceContext ctx) throws Exception { OUT nextElement = serializer.createInstance(); nextElement = format.nextRecord(nextElement); if (nextElement == null && splitIterator.hasNext()) { - format.open(splitIterator.next()); + InputSplit split = splitIterator.next(); + splitNumber = split.getSplitNumber(); + currRecord = 0l; + format.open(split); continue; } else if (nextElement == null) { break; } - ctx.collect(nextElement); + if(splitNumber == checkpointedSplit){ --- End diff -- What if you've checkpointed the 2. split after seeing the 1. and 2. split and now the source is re-executed with the first split? Aren't records written again because you only save the latest checkpointed split number? > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Sheetal Parade > Labels: easyfix, starter > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
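The replay concern raised in that review can be made concrete with a toy simulation (all names hypothetical, not Flink API): if the source persists only the split number and record offset of the *latest* checkpointed split, then on restore an earlier, already-processed split is not covered by the saved state and its records are emitted a second time.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the review scenario: two splits of three records each,
// checkpoint taken after fully reading split 1 and two records of split 2.
public class ReplayDemo {

    static List<String> replay(int checkpointedSplit, long checkpointedRecord) {
        List<String> emitted = new ArrayList<>();
        for (int split = 1; split <= 2; split++) {   // both splits re-assigned after restore
            long currRecord = 0;
            for (int rec = 1; rec <= 3; rec++) {     // three records per split
                currRecord++;
                // Records are skipped only while re-reading the checkpointed
                // split; split 1 is not covered by the saved state at all.
                if (split == checkpointedSplit && currRecord <= checkpointedRecord) {
                    continue;
                }
                emitted.add(split + ":" + rec);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Split 1's records appear again -> duplicates downstream.
        System.out.println(replay(2, 2)); // prints [1:1, 1:2, 1:3, 2:3]
    }
}
```

Avoiding this requires the checkpoint to also record which earlier splits were completed (or the split assignment to be deterministic and replay-aware), not just the offset within the current split.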
[jira] [Commented] (FLINK-2291) Use ZooKeeper to elect JobManager leader and send information to TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-2291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711513#comment-14711513 ] ASF GitHub Bot commented on FLINK-2291: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1016#issuecomment-134647030 +1 to merge, we should follow up on the Mini cluster and Curator shading separately > Use ZooKeeper to elect JobManager leader and send information to TaskManagers > - > > Key: FLINK-2291 > URL: https://issues.apache.org/jira/browse/FLINK-2291 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.10 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 0.10 > > > Use ZooKeeper to determine the leader of a set of {{JobManagers}} which will > act as the responsible {{JobManager}} for all {{TaskManager}}. The > {{TaskManager}} will get the address of the leader from ZooKeeper. > Related Wiki: > [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]
[jira] [Commented] (FLINK-2291) Use ZooKeeper to elect JobManager leader and send information to TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-2291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711510#comment-14711510 ] ASF GitHub Bot commented on FLINK-2291: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1016#issuecomment-134646755 Looks very good. Minor comments that we may address after this pull request: - The Flink Mini cluster becomes tricky, the configurations ever more opaque. This could use a rework. - You shade Curator in Hadoop, but not in Flink. Do we expect collisions with other systems that use Curator, like newer versions of the Kafka consumers? (IIRC 0.8.3 starts using Curator). > Use ZooKeeper to elect JobManager leader and send information to TaskManagers > - > > Key: FLINK-2291 > URL: https://issues.apache.org/jira/browse/FLINK-2291 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.10 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 0.10 > > > Use ZooKeeper to determine the leader of a set of {{JobManagers}} which will > act as the responsible {{JobManager}} for all {{TaskManager}}. The > {{TaskManager}} will get the address of the leader from ZooKeeper. > Related Wiki: > [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]
[GitHub] flink pull request: [FLINK-2291] [runtime] Adds high availability ...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1016#issuecomment-134646755 Looks very good. Minor comments that we may address after this pull request: - The Flink Mini cluster becomes tricky, the configurations ever more opaque. This could use a rework. - You shade Curator in Hadoop, but not in Flink. Do we expect collisions with other systems that use Curator, like newer versions of the Kafka consumers? (IIRC 0.8.3 starts using Curator).
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711481#comment-14711481 ] ASF GitHub Bot commented on FLINK-2314: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/997#discussion_r37881554 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FileSourceFunctionTest.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.streaming.api.functions.source.FileSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import org.apache.flink.types.IntValue; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class FileSourceFunctionTest { + @Test + public void testFileSourceFunction() { + DummyFileInputFormat inputFormat = new DummyFileInputFormat(); + RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, + inputFormat.getDummyInputSplitProvider(), 1024), null, new ExecutionConfig(), new DummyModKey(2), + new LocalStateHandle.LocalStateHandleProvider(), new HashMap>()); + + inputFormat.setFilePath("file:///some/none/existing/directory/"); + FileSourceFunction fileSourceFunction = new FileSourceFunction(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); + + fileSourceFunction.setRuntimeContext(runtimeContext); + 
DummyContext ctx = new DummyContext(); + try { + fileSourceFunction.open(new Configuration()); + fileSourceFunction.run(ctx); + } catch (Exception e) { + e.printStackTrace(); + } + Assert.assertTrue(ctx.getData().size() == 200); + } + + @Test + public void testFileSourceFunctionCheckpoint() { + DummyFileInputFormat inputFormat = new DummyFileInputFormat(); + RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, + inputFormat.getDummyInputSplitProvider(), 1024), null, new ExecutionConfig(), new DummyModKey(2), + new LocalStateHandle.LocalStateHandleProvider(), new HashMap>()); + + inputFormat.setFilePath("file:///some/none/existing/directory/"); + FileSourceFunction fileSourceFunction = new FileSourceFunction(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); + fileSourceFunction.setRuntimeContext(runtimeContext); + DummyContext ctx = new DummyContext(); + try { +
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711479#comment-14711479 ] ASF GitHub Bot commented on FLINK-2314: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/997#discussion_r37881478 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FileSourceFunctionTest.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.streaming.api.functions.source.FileSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import org.apache.flink.types.IntValue; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class FileSourceFunctionTest { + @Test + public void testFileSourceFunction() { + DummyFileInputFormat inputFormat = new DummyFileInputFormat(); + RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, + inputFormat.getDummyInputSplitProvider(), 1024), null, new ExecutionConfig(), new DummyModKey(2), + new LocalStateHandle.LocalStateHandleProvider(), new HashMap>()); + + inputFormat.setFilePath("file:///some/none/existing/directory/"); + FileSourceFunction fileSourceFunction = new FileSourceFunction(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); + + fileSourceFunction.setRuntimeContext(runtimeContext); + 
DummyContext ctx = new DummyContext(); + try { + fileSourceFunction.open(new Configuration()); + fileSourceFunction.run(ctx); + } catch (Exception e) { + e.printStackTrace(); --- End diff -- Why do print the stack trace instead of simply letting the exception bubbling up? Is this an expected test exception? > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Sheetal Parade > Labels: easyfix, starter > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2200] Add Flink with Scala 2.11 in Mave...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/885#issuecomment-134624663 Cool, that was quick ;)
[GitHub] flink pull request: [FLINK-2200] Add Flink with Scala 2.11 in Mave...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/885#issuecomment-134623006 @rmetzger Thanks! I addressed your comment and rebased on master.
[GitHub] flink pull request: [FLINK-2314] - Added Checkpointing to File Sou...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/997#discussion_r37881537 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FileSourceFunctionTest.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.streaming.api.functions.source.FileSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import org.apache.flink.types.IntValue; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class FileSourceFunctionTest { + @Test + public void testFileSourceFunction() { + DummyFileInputFormat inputFormat = new DummyFileInputFormat(); + RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, + inputFormat.getDummyInputSplitProvider(), 1024), null, new ExecutionConfig(), new DummyModKey(2), + new LocalStateHandle.LocalStateHandleProvider(), new HashMap>()); + + inputFormat.setFilePath("file:///some/none/existing/directory/"); + FileSourceFunction fileSourceFunction = new FileSourceFunction(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); + + fileSourceFunction.setRuntimeContext(runtimeContext); + 
DummyContext ctx = new DummyContext(); + try { + fileSourceFunction.open(new Configuration()); + fileSourceFunction.run(ctx); + } catch (Exception e) { + e.printStackTrace(); + } + Assert.assertTrue(ctx.getData().size() == 200); + } + + @Test + public void testFileSourceFunctionCheckpoint() { + DummyFileInputFormat inputFormat = new DummyFileInputFormat(); + RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, + inputFormat.getDummyInputSplitProvider(), 1024), null, new ExecutionConfig(), new DummyModKey(2), + new LocalStateHandle.LocalStateHandleProvider(), new HashMap>()); + + inputFormat.setFilePath("file:///some/none/existing/directory/"); + FileSourceFunction fileSourceFunction = new FileSourceFunction(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); + fileSourceFunction.setRuntimeContext(runtimeContext); + DummyContext ctx = new DummyContext(); + try { + fileSourceFunction.open(new Configuration()); + fileSourceFunction.restoreState("100:1"); + fileSourceFunction.run(ctx); + } catch (Exception e) { + e.printStackTrace();
[jira] [Resolved] (FLINK-2478) The page “FlinkML - Machine Learning for Flink“ https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a dead link
[ https://issues.apache.org/jira/browse/FLINK-2478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi resolved FLINK-2478. Resolution: Fixed Fixed via e68c86f. > The page “FlinkML - Machine Learning for Flink“ > https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a > dead link > - > > Key: FLINK-2478 > URL: https://issues.apache.org/jira/browse/FLINK-2478 > Project: Flink > Issue Type: Task > Components: Documentation >Affects Versions: 0.10 >Reporter: Slim Baltagi >Assignee: Till Rohrmann >Priority: Minor > > Note that FlinkML is currently not part of the binary distribution. See > linking with it for cluster execution here. > 'here' links to a dead link: > https://ci.apache.org/projects/flink/flink-docs-master/libs/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution > The correct link is: > https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
[jira] [Commented] (FLINK-2565) Support primitive arrays as keys
[ https://issues.apache.org/jira/browse/FLINK-2565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711486#comment-14711486 ] ASF GitHub Bot commented on FLINK-2565: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1043#issuecomment-134639235 Looks good +1 to merge > Support primitive arrays as keys > > > Key: FLINK-2565 > URL: https://issues.apache.org/jira/browse/FLINK-2565 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >
[GitHub] flink pull request: [FLINK-2394] [fix] HadoopOutputFormats use cor...
GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/1056 [FLINK-2394] [fix] HadoopOutputFormats use correct OutputCommitters. Right now, Flink's wrappers for Hadoop OutputFormats always use a `FileOutputCommitter`. - In the `mapreduce` API, Hadoop OutputFormats have a method `getOutputCommitter()` which can be overridden and returns a `FileOutputCommitter` by default. - In the `mapred` API, the `OutputCommitter` should be obtained from the `JobConf`. If nothing custom is set, a `FileOutputCommitter` is returned. This PR uses the respective methods to obtain the correct `OutputCommitter`. Since `FileOutputCommitter` is the default in both cases, the original semantics are preserved if no custom committer is implemented or set by the user. I also added convenience methods to the constructors of the `mapred` wrappers to set the `OutputCommitter` in the `JobConf`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink hadoopOutCommitter Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1056.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1056 commit a632203a948f2e7973339a0eab88750f7ce70cc5 Author: Fabian Hueske Date: 2015-07-30T19:47:01Z [FLINK-2394] [fix] HadoopOutputFormats use correct OutputCommitters.
[GitHub] flink pull request: [FLINK-2569] [core] Add CsvReader support for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1053#discussion_r37880533 --- Diff: flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java --- @@ -123,6 +132,21 @@ public void testPOJOTypeWithFieldsOrderAndFieldsSelection() throws Exception { expected = "ABC,3,0.00\nDEF,5,0.00\nDEF,1,0.00\nGHI,10,0.00"; } + @Test + public void testValueTypes() throws Exception { + final String inputData = "ABC,true,1,2,3,4,5.0,6.0\nBCD,false,1,2,3,4,5.0,6.0"; + final String dataPath = createInputData(inputData); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = + env.readCsvFile(dataPath).types(StringValue.class, BooleanValue.class, ByteValue.class, ShortValue.class, IntValue.class, LongValue.class, FloatValue.class, DoubleValue.class); + data.writeAsText(resultPath); --- End diff -- If I'm not mistaken, then we wanted to avoid writing data to disk because this sometimes fails on Travis. Instead we should use `collect` to keep the data in memory.
[GitHub] flink pull request: [FLINK-2314] - Added Checkpointing to File Sou...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/997#discussion_r37881389 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FileSourceFunctionTest.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.streaming.api.functions.source.FileSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import org.apache.flink.types.IntValue; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class FileSourceFunctionTest { --- End diff -- Can you let this class extend `TestLogger`? This will improve test case logging. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711473#comment-14711473 ] ASF GitHub Bot commented on FLINK-2314: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/997#discussion_r37881389 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FileSourceFunctionTest.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.streaming.api.functions.source.FileSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import org.apache.flink.types.IntValue; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class FileSourceFunctionTest { --- End diff -- Can you let this class extend `TestLogger`? This will improve test case logging. > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Sheetal Parade > Labels: easyfix, starter > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
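The checkpointing idea described in FLINK-2314 above (track the bytes read from the file and snapshot that offset) can be sketched without any Flink classes. The class and method names below (`OffsetTrackingSource`, `snapshotState`, `restoreState`) are hypothetical stand-ins, not Flink's actual `FileSourceFunction` API:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a checkpointed file source: it tracks the byte
// offset of the last fully emitted record and can resume from a snapshot.
public class OffsetTrackingSource {
    private long offset; // bytes consumed so far; this is the checkpointed state

    public long snapshotState() { return offset; }
    public void restoreState(long checkpointedOffset) { offset = checkpointedOffset; }

    // Reads at most maxLines lines starting at the current offset.
    public List<String> run(Path file, int maxLines) throws IOException {
        List<String> out = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(offset);
            String line;
            while (out.size() < maxLines && (line = raf.readLine()) != null) {
                out.add(line);
                offset = raf.getFilePointer(); // advance state only after a full record
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("source", ".txt");
        Files.write(file, List.of("a", "b", "c", "d"));

        OffsetTrackingSource source = new OffsetTrackingSource();
        System.out.println(source.run(file, 2));   // first two records
        long checkpoint = source.snapshotState();  // taken after two records

        // Simulate a failure/restart: a fresh instance restores the offset
        // and continues where the snapshot left off instead of re-reading.
        OffsetTrackingSource restarted = new OffsetTrackingSource();
        restarted.restoreState(checkpoint);
        System.out.println(restarted.run(file, 2));
    }
}
```

The key design point mirrored from the issue: the offset is advanced only after a record is fully emitted, so a restore never replays a half-consumed record.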
[GitHub] flink pull request: [FLINK-2569] [core] Add CsvReader support for ...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1053#issuecomment-134621653 Looks good to merge :+1: --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2314] - Added Checkpointing to File Sou...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/997#discussion_r37881554 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FileSourceFunctionTest.java --- @@ -0,0 +1,208 @@ +/* [Apache License 2.0 header elided; identical to the copy quoted in the first diff above]
+ */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.streaming.api.functions.source.FileSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import org.apache.flink.types.IntValue; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class FileSourceFunctionTest { + @Test + public void testFileSourceFunction() { + DummyFileInputFormat inputFormat = new DummyFileInputFormat(); + RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, + inputFormat.getDummyInputSplitProvider(), 1024), null, new ExecutionConfig(), new DummyModKey(2), + new LocalStateHandle.LocalStateHandleProvider(), new HashMap>()); + + inputFormat.setFilePath("file:///some/none/existing/directory/"); + FileSourceFunction fileSourceFunction = new FileSourceFunction(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); + + fileSourceFunction.setRuntimeContext(runtimeContext); + 
DummyContext ctx = new DummyContext(); + try { + fileSourceFunction.open(new Configuration()); + fileSourceFunction.run(ctx); + } catch (Exception e) { + e.printStackTrace(); + } + Assert.assertTrue(ctx.getData().size() == 200); + } + + @Test + public void testFileSourceFunctionCheckpoint() { + DummyFileInputFormat inputFormat = new DummyFileInputFormat(); + RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, + inputFormat.getDummyInputSplitProvider(), 1024), null, new ExecutionConfig(), new DummyModKey(2), + new LocalStateHandle.LocalStateHandleProvider(), new HashMap>()); + + inputFormat.setFilePath("file:///some/none/existing/directory/"); + FileSourceFunction fileSourceFunction = new FileSourceFunction(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); + fileSourceFunction.setRuntimeContext(runtimeContext); + DummyContext ctx = new DummyContext(); + try { + fileSourceFunction.open(new Configuration()); + fileSourceFunction.restoreState("100:1"); + fileSourceFunction.run(ctx); + } catch (Exception e) { + e.printStackTrace();
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711480#comment-14711480 ] ASF GitHub Bot commented on FLINK-2314: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/997#discussion_r37881537 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FileSourceFunctionTest.java --- @@ -0,0 +1,208 @@ +/* [Apache License 2.0 header elided; identical to the copy quoted in the first diff above]
+ */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.streaming.api.functions.source.FileSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import org.apache.flink.types.IntValue; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class FileSourceFunctionTest { + @Test + public void testFileSourceFunction() { + DummyFileInputFormat inputFormat = new DummyFileInputFormat(); + RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, + inputFormat.getDummyInputSplitProvider(), 1024), null, new ExecutionConfig(), new DummyModKey(2), + new LocalStateHandle.LocalStateHandleProvider(), new HashMap>()); + + inputFormat.setFilePath("file:///some/none/existing/directory/"); + FileSourceFunction fileSourceFunction = new FileSourceFunction(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); + + fileSourceFunction.setRuntimeContext(runtimeContext); + 
DummyContext ctx = new DummyContext(); + try { + fileSourceFunction.open(new Configuration()); + fileSourceFunction.run(ctx); + } catch (Exception e) { + e.printStackTrace(); + } + Assert.assertTrue(ctx.getData().size() == 200); + } + + @Test + public void testFileSourceFunctionCheckpoint() { + DummyFileInputFormat inputFormat = new DummyFileInputFormat(); + RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, + inputFormat.getDummyInputSplitProvider(), 1024), null, new ExecutionConfig(), new DummyModKey(2), + new LocalStateHandle.LocalStateHandleProvider(), new HashMap>()); + + inputFormat.setFilePath("file:///some/none/existing/directory/"); + FileSourceFunction fileSourceFunction = new FileSourceFunction(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); + fileSourceFunction.setRuntimeContext(runtimeContext); + DummyContext ctx = new DummyContext(); + try { +
[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api
[ https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711469#comment-14711469 ] ASF GitHub Bot commented on FLINK-2480: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/991#issuecomment-134635720 @HuangWHWHW Can you access the CI reports now? Has the Travis team fixed the problem? > Improving tests coverage for org.apache.flink.streaming.api > --- > > Key: FLINK-2480 > URL: https://issues.apache.org/jira/browse/FLINK-2480 > Project: Flink > Issue Type: Test > Components: Streaming >Affects Versions: 0.10 >Reporter: Huang Wei > Fix For: 0.10 > > Original Estimate: 504h > Remaining Estimate: 504h > > The streaming API is quite a bit newer than the other code so it is not that > well covered with tests.
[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/991#issuecomment-134635720 @HuangWHWHW Can you access the CI reports now? Has the Travis team fixed the problem?
[GitHub] flink pull request: [FLINK-2314] - Added Checkpointing to File Sou...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/997#discussion_r37881478 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FileSourceFunctionTest.java --- @@ -0,0 +1,208 @@ +/* [Apache License 2.0 header elided; identical to the copy quoted in the first diff above]
+ */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.streaming.api.functions.source.FileSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import org.apache.flink.types.IntValue; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class FileSourceFunctionTest { + @Test + public void testFileSourceFunction() { + DummyFileInputFormat inputFormat = new DummyFileInputFormat(); + RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, + inputFormat.getDummyInputSplitProvider(), 1024), null, new ExecutionConfig(), new DummyModKey(2), + new LocalStateHandle.LocalStateHandleProvider(), new HashMap>()); + + inputFormat.setFilePath("file:///some/none/existing/directory/"); + FileSourceFunction fileSourceFunction = new FileSourceFunction(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); + + fileSourceFunction.setRuntimeContext(runtimeContext); + 
DummyContext ctx = new DummyContext(); + try { + fileSourceFunction.open(new Configuration()); + fileSourceFunction.run(ctx); + } catch (Exception e) { + e.printStackTrace(); --- End diff -- Why do you print the stack trace instead of simply letting the exception bubble up? Is this an expected test exception?
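The reviewer's concern can be illustrated in plain Java, with no JUnit or Flink involved (the failing `readRecords` helper is a hypothetical stand-in): swallowing the exception lets the test continue with stale data, so a later assertion fails with a misleading message instead of the real cause.

```java
public class SwallowedExceptionDemo {
    // Stand-in for a source that fails while producing records.
    static int readRecords() throws Exception {
        throw new Exception("source failed while reading split 0");
    }

    public static void main(String[] args) {
        int count = -1;
        try {
            count = readRecords();
        } catch (Exception e) {
            // Anti-pattern from the test under review: the failure is only
            // printed, so control flow continues as if reading succeeded.
            System.out.println("swallowed: " + e.getMessage());
        }
        // A later check now reports a wrong count rather than the root cause.
        System.out.println("count=" + count);
        // Preferred: declare `throws Exception` on the test method and let
        // the test framework report the original stack trace.
    }
}
```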
[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter
[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711474#comment-14711474 ] ASF GitHub Bot commented on FLINK-2394: --- GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/1056 [FLINK-2394] [fix] HadoopOutputFormats use correct OutputCommitters. Right now, Flink's wrappers for Hadoop OutputFormats always use a `FileOutputCommitter`. - In the `mapreduce` API, Hadoop OutputFormats have a method `getOutputCommitter()` which can be overridden and returns a `FileOutputCommitter` by default. - In the `mapred` API, the `OutputCommitter` should be obtained from the `JobConf`. If nothing custom is set, a `FileOutputCommitter` is returned. This PR uses the respective methods to obtain the correct `OutputCommitter`. Since `FileOutputCommitter` is the default in both cases, the original semantics are preserved if no custom committer is implemented or set by the user. I also added convenience methods to the constructors of the `mapred` wrappers to set the `OutputCommitter` in the `JobConf`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink hadoopOutCommitter Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1056.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1056 commit a632203a948f2e7973339a0eab88750f7ce70cc5 Author: Fabian Hueske Date: 2015-07-30T19:47:01Z [FLINK-2394] [fix] HadoopOutputFormats use correct OutputCommitters.
> HadoopOutFormat OutputCommitter is default to FileOutputCommiter > > > Key: FLINK-2394 > URL: https://issues.apache.org/jira/browse/FLINK-2394 > Project: Flink > Issue Type: Bug > Components: Hadoop Compatibility >Affects Versions: 0.9.0 >Reporter: Stefano Bortoli >Assignee: Fabian Hueske > Fix For: 0.10, 0.9.1 > > > MongoOutputFormat does not write back in collection because the > HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and > is set as default to FileOutputCommitter. Therefore, on close and > globalFinalize execution the commit does not happen and mongo collection > stays untouched. > A simple solution would be to: > 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat > that gets the OutputCommitter as a parameter > 2 - change the outputCommitter field of HadoopOutputFormatBase to be a > generic OutputCommitter > 3 - remove the default assignment in the open() and finalizeGlobal to the > outputCommitter to FileOutputCommitter(), or keep it as a default in case of > no specific assignment.
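The fallback logic the FLINK-2394 fix describes (use the committer configured in the `JobConf` if present, otherwise default to the file-based one) can be sketched with hypothetical stand-in types. These are illustrative classes, not Hadoop's real `mapred`/`mapreduce` hierarchy:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Stand-ins for Hadoop's OutputCommitter hierarchy; names are illustrative only.
interface OutputCommitter { String describe(); }

class FileOutputCommitter implements OutputCommitter {
    public String describe() { return "FileOutputCommitter (default)"; }
}

class MongoOutputCommitter implements OutputCommitter {
    public String describe() { return "MongoOutputCommitter (custom)"; }
}

public class CommitterResolution {
    // Mimics the mapred-style lookup: take the committer configured in the
    // job configuration if one is set, otherwise fall back to the default.
    static OutputCommitter resolve(Map<String, Supplier<OutputCommitter>> jobConf) {
        Supplier<OutputCommitter> custom = jobConf.get("output.committer.class");
        return custom != null ? custom.get() : new FileOutputCommitter();
    }

    public static void main(String[] args) {
        Map<String, Supplier<OutputCommitter>> conf = new HashMap<>();
        System.out.println(resolve(conf).describe()); // nothing set -> default

        conf.put("output.committer.class", MongoOutputCommitter::new);
        System.out.println(resolve(conf).describe()); // custom committer wins
    }
}
```

Because the default branch still yields `FileOutputCommitter`, existing jobs that never set a committer keep their original behavior, which is exactly the compatibility argument made in the PR description.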
[GitHub] flink pull request: [FLINK-2555] Properly pass security credential...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1038#issuecomment-134632899 I've opened another issue for that: https://issues.apache.org/jira/browse/FLINK-2573
[GitHub] flink pull request: [FLINK-2089] [runtime] Fix illegal state in Re...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1050#issuecomment-134634885 LGTM +1
[jira] [Commented] (FLINK-2089) "Buffer recycled" IllegalStateException during cancelling
[ https://issues.apache.org/jira/browse/FLINK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711466#comment-14711466 ] ASF GitHub Bot commented on FLINK-2089: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1050#issuecomment-134634885 LGTM +1 > "Buffer recycled" IllegalStateException during cancelling > - > > Key: FLINK-2089 > URL: https://issues.apache.org/jira/browse/FLINK-2089 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: master >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 0.10, 0.9.1 > > > [~rmetzger] reported the following stack trace during cancelling of high > parallelism jobs: > {code} > Error: java.lang.IllegalStateException: Buffer has already been recycled. > at > org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173) > at > org.apache.flink.runtime.io.network.buffer.Buffer.ensureNotRecycled(Buffer.java:142) > at > org.apache.flink.runtime.io.network.buffer.Buffer.getMemorySegment(Buffer.java:78) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.setNextBuffer(SpillingAdaptiveSpanningRecordDeserializer.java:72) > at > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:80) > at > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) > at > org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) > at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96) > at > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) > at > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > {code} > This looks like a 
concurrent buffer pool release/buffer usage error. I'm investigating this today.
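The `ensureNotRecycled` check in the FLINK-2089 stack trace guards against exactly this race: a buffer returned to the pool during cancellation while a reader still holds it. A self-contained sketch of the reference-counting idea (hypothetical class, not Flink's actual `Buffer` implementation):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal reference-counted buffer: once the count drops to zero the memory
// is considered returned to the pool, and further access must fail fast.
public class RefCountedBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1);
    private final byte[] segment = new byte[1024];

    public void retain() {
        if (refCount.getAndIncrement() <= 0) {
            throw new IllegalStateException("Buffer has already been recycled.");
        }
    }

    public void recycle() {
        refCount.decrementAndGet(); // at zero, the pool may hand the segment out again
    }

    public byte[] getMemorySegment() {
        if (refCount.get() <= 0) { // the check that threw in the reported trace
            throw new IllegalStateException("Buffer has already been recycled.");
        }
        return segment;
    }

    public static void main(String[] args) {
        RefCountedBuffer buffer = new RefCountedBuffer();
        buffer.getMemorySegment();     // fine: one reference still held
        buffer.recycle();              // e.g. recycled during job cancellation
        try {
            buffer.getMemorySegment(); // use-after-recycle, as in the report
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast here turns a silent read of reused memory into the explicit `IllegalStateException` seen in the report, which is why the check exists even though hitting it still indicates a release-ordering bug upstream.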
[jira] [Commented] (FLINK-2569) CsvReader support for ValueTypes
[ https://issues.apache.org/jira/browse/FLINK-2569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711456#comment-14711456 ] ASF GitHub Bot commented on FLINK-2569: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1053#discussion_r37880533 --- Diff: flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java --- @@ -123,6 +132,21 @@ public void testPOJOTypeWithFieldsOrderAndFieldsSelection() throws Exception { expected = "ABC,3,0.00\nDEF,5,0.00\nDEF,1,0.00\nGHI,10,0.00"; } + @Test + public void testValueTypes() throws Exception { + final String inputData = "ABC,true,1,2,3,4,5.0,6.0\nBCD,false,1,2,3,4,5.0,6.0"; + final String dataPath = createInputData(inputData); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = + env.readCsvFile(dataPath).types(StringValue.class, BooleanValue.class, ByteValue.class, ShortValue.class, IntValue.class, LongValue.class, FloatValue.class, DoubleValue.class); + data.writeAsText(resultPath); --- End diff -- If I'm not mistaken, then we wanted to avoid writing data to disk because this sometimes fails on Travis. Instead we should use `collect` to keep the data in memory. > CsvReader support for ValueTypes > > > Key: FLINK-2569 > URL: https://issues.apache.org/jira/browse/FLINK-2569 > Project: Flink > Issue Type: Improvement > Components: Java API >Affects Versions: 0.9 >Reporter: Greg Hogan >Assignee: Chiwan Park >Priority: Minor > > From the Flink Programming Guide section on Data Sources: > {quote} > readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) > delimited fields. Returns a DataSet of tuples or POJOs. Supports the basic > java types and their Value counterparts as field types. > {quote} > When specifying a ValueType, i.e. 
> {code} > CsvReader csvReader = env.readCsvFile(filename); > csvReader.types(IntValue.class, IntValue.class); > {code} > the following error occurs as BasicTypeInfo is specifically requested in > CsvReader.types(...). > {code} > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error. > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) > at org.apache.flink.client.program.Client.run(Client.java:327) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:608) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:296) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:927) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:977) > Caused by: java.lang.IllegalArgumentException: Type at position 0 is not a > basic type. > at > org.apache.flink.api.java.typeutils.TupleTypeInfo.getBasicTupleTypeInfo(TupleTypeInfo.java:177) > at org.apache.flink.api.java.io.CsvReader.types(CsvReader.java:393) > at Driver.main(Driver.java:105) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) > ... 6 more > {code}
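The `collect` suggestion in the review above can be sketched without Flink: the test asserts on an in-memory list instead of round-tripping results through a temp file. The pipeline below is a hypothetical stand-in for `env.readCsvFile(...).types(...)`, not the real CsvReader:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectInsteadOfDiskTest {
    // Stand-in for the CSV source: extracts the first field of each line,
    // the way the real test reads the StringValue column.
    static List<String> firstColumn(Stream<String> csvLines) {
        return csvLines.map(line -> line.split(",")[0]).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> result = firstColumn(
                Stream.of("ABC,true,1,2,3,4,5.0,6.0", "BCD,false,1,2,3,4,5.0,6.0"));
        // Assert directly on the collected list; no temp file, nothing to
        // clean up, and no flaky disk I/O on CI.
        if (!result.equals(List.of("ABC", "BCD"))) {
            throw new AssertionError("unexpected result: " + result);
        }
        System.out.println(result);
    }
}
```

Keeping results in memory removes the filesystem dependency that, per the review comment, made the disk-based variant fail intermittently on Travis.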