[GitHub] flink issue #2645: [FLINK-4838] remove STREAM keyword in StreamSQLExample
Github user manuzhang commented on the issue: https://github.com/apache/flink/pull/2645 Any more comments? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4838) remove STREAM keyword in StreamSQLExample
[ https://issues.apache.org/jira/browse/FLINK-4838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15596638#comment-15596638 ] ASF GitHub Bot commented on FLINK-4838: --- Github user manuzhang commented on the issue: https://github.com/apache/flink/pull/2645 Any more comments? > remove STREAM keyword in StreamSQLExample > - > > Key: FLINK-4838 > URL: https://issues.apache.org/jira/browse/FLINK-4838 > Project: Flink > Issue Type: Bug > Components: Examples, Table API & SQL >Reporter: Manu Zhang >Priority: Minor > > After FLINK-4546, "STREAM" keyword should be removed from SQL query in > StreamSQLExample -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4204) Clean up gelly-examples
[ https://issues.apache.org/jira/browse/FLINK-4204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595767#comment-15595767 ] ASF GitHub Bot commented on FLINK-4204: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2670 I pushed a commit to remove the `GraphMetrics` example. I think providing drivers for all library methods is both desirable and ambitious. If we like the form and functionality of the current drivers then I'd like to look at consolidating common functionality where possible. We may also be able to put multiple similar algorithms like `JaccardIndex` / `AdamicAdar` / `CommonNeighbors` into the same driver. I had first removed `TriangleListing` as it's not an algorithm but I added it back due to Facebook's recent benchmarking: https://code.facebook.com/posts/319004238457019/a-comparison-of-state-of-the-art-graph-processing-systems > Clean up gelly-examples > --- > > Key: FLINK-4204 > URL: https://issues.apache.org/jira/browse/FLINK-4204 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Greg Hogan > > The gelly-examples has grown quite big (14 examples) and contains several > examples that illustrate the same functionality. Examples should help users > understand how to use the API and ideally show how to use 1-2 features. > Also, it is helpful to state the purpose of each example in the comments. > We should keep the example set small and move everything that does not fit > there to the library. > I propose to remove the following: > - ClusteringCoefficient: the functionality already exists as a library method. > - HITS: the functionality already exists as a library method. > - JaccardIndex: the functionality already exists as a library method. > - SingleSourceShortestPaths: the example shows how to use scatter-gather > iterations. HITSAlgorithm shows the same feature plus the use of aggregators. > I propose we keep this one instead. > - TriangleListing: the functionality already exists as a library method -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2670: [FLINK-4204] [gelly] Clean up gelly-examples
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2670 I pushed a commit to remove the `GraphMetrics` example. I think providing drivers for all library methods is both desirable and ambitious. If we like the form and functionality of the current drivers then I'd like to look at consolidating common functionality where possible. We may also be able to put multiple similar algorithms like `JaccardIndex` / `AdamicAdar` / `CommonNeighbors` into the same driver. I had first removed `TriangleListing` as it's not an algorithm but I added it back due to Facebook's recent benchmarking: https://code.facebook.com/posts/319004238457019/a-comparison-of-state-of-the-art-graph-processing-systems --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595726#comment-15595726 ] ASF GitHub Bot commented on FLINK-3674: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/2570 > Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek resolved FLINK-3674. - Resolution: Fixed Fix Version/s: 1.2.0 Implemented in https://github.com/apache/flink/commit/81b19e5323edd384e00f77eaa4a5c543db7e2499 > Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > Fix For: 1.2.0 > > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4852) ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime
[ https://issues.apache.org/jira/browse/FLINK-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek resolved FLINK-4852. - Resolution: Fixed Fix Version/s: (was: 1.1.4) Fixed in https://github.com/apache/flink/commit/71d2e3ef1e42174822709aa8217088f2a489975a > ClassCastException when assigning Watermarks with > TimeCharacteristic.ProcessingTime > --- > > Key: FLINK-4852 > URL: https://issues.apache.org/jira/browse/FLINK-4852 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.2.0, 1.1.3 >Reporter: Maximilian Michels >Assignee: Aljoscha Krettek > Fix For: 1.2.0 > > > As per FLINK-3688 and FLINK-2936 this should already been resolved. Still, > when emitting Watermarks and using processing time, you get the following > ClassCastException: > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329) > at > org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161) > at > org.apache.flink.streaming.api.functions.source.StatefulSequenceSource.run(StatefulSequenceSource.java:68) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:343) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340) > at > org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement(TimestampsAndPunctuatedWatermarksOperator.java:58) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373) > ... 11 more > Caused by: java.lang.RuntimeException: > org.apache.flink.streaming.api.watermark.Watermark cannot be cast to > org.apache.flink.streaming.runtime.streamrecord.StreamRecord > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340) > at >
[jira] [Resolved] (FLINK-4877) Refactorings around FLINK-3674 (User Function Timers)
[ https://issues.apache.org/jira/browse/FLINK-4877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek resolved FLINK-4877. - Resolution: Fixed Fix Version/s: 1.2.0 Implemented in https://github.com/apache/flink/commit/770f2f83a81b2810aff171b2f56390ef686f725a > Refactorings around FLINK-3674 (User Function Timers) > - > > Key: FLINK-4877 > URL: https://issues.apache.org/jira/browse/FLINK-4877 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Aljoscha Krettek > Fix For: 1.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/2570 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4852) ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime
[ https://issues.apache.org/jira/browse/FLINK-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595703#comment-15595703 ] ASF GitHub Bot commented on FLINK-4852: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/2656 > ClassCastException when assigning Watermarks with > TimeCharacteristic.ProcessingTime > --- > > Key: FLINK-4852 > URL: https://issues.apache.org/jira/browse/FLINK-4852 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.2.0, 1.1.3 >Reporter: Maximilian Michels >Assignee: Aljoscha Krettek > Fix For: 1.2.0, 1.1.4 > > > As per FLINK-3688 and FLINK-2936 this should already been resolved. Still, > when emitting Watermarks and using processing time, you get the following > ClassCastException: > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329) > at > org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161) > at > org.apache.flink.streaming.api.functions.source.StatefulSequenceSource.run(StatefulSequenceSource.java:68) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:343) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340) > at > org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement(TimestampsAndPunctuatedWatermarksOperator.java:58) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373) > ... 11 more > Caused by: java.lang.RuntimeException: > org.apache.flink.streaming.api.watermark.Watermark cannot be cast to > org.apache.flink.streaming.runtime.streamrecord.StreamRecord > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340) > at >
[jira] [Commented] (FLINK-4852) ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime
[ https://issues.apache.org/jira/browse/FLINK-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595701#comment-15595701 ] ASF GitHub Bot commented on FLINK-4852: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2656 Manually merged > ClassCastException when assigning Watermarks with > TimeCharacteristic.ProcessingTime > --- > > Key: FLINK-4852 > URL: https://issues.apache.org/jira/browse/FLINK-4852 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.2.0, 1.1.3 >Reporter: Maximilian Michels >Assignee: Aljoscha Krettek > Fix For: 1.2.0, 1.1.4 > > > As per FLINK-3688 and FLINK-2936 this should already been resolved. Still, > when emitting Watermarks and using processing time, you get the following > ClassCastException: > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329) > at > org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161) > at > org.apache.flink.streaming.api.functions.source.StatefulSequenceSource.run(StatefulSequenceSource.java:68) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:343) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340) > at > org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement(TimestampsAndPunctuatedWatermarksOperator.java:58) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373) > ... 11 more > Caused by: java.lang.RuntimeException: > org.apache.flink.streaming.api.watermark.Watermark cannot be cast to > org.apache.flink.streaming.runtime.streamrecord.StreamRecord > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340) > at >
[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts
[ https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595702#comment-15595702 ] ASF GitHub Bot commented on FLINK-3030: --- Github user iampeter commented on the issue: https://github.com/apache/flink/pull/2448 @rmetzger will you be merging? if so, `flink-runtime-web/web-dashboard/web/js/index.js` just needs a `gulp` > Enhance Dashboard to show Execution Attempts > > > Key: FLINK-3030 > URL: https://issues.apache.org/jira/browse/FLINK-3030 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk > Fix For: 1.0.0 > > > Currently, the web dashboard shows only the latest execution attempt. We > should make all execution attempts and their accumulators available for > inspection. > The REST monitoring API supports this, so it should be a change only to the > frontend part. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2448: [FLINK-3030][web frontend] Enhance dashboard to show exec...
Github user iampeter commented on the issue: https://github.com/apache/flink/pull/2448 @rmetzger will you be merging? if so, `flink-runtime-web/web-dashboard/web/js/index.js` just needs a `gulp` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2656: [FLINK-4852] Remove Non-Multiplexing StreamRecordS...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/2656 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2656: [FLINK-4852] Remove Non-Multiplexing StreamRecordSerializ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2656 Manually merged --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts
[ https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595682#comment-15595682 ] ASF GitHub Bot commented on FLINK-3030: --- Github user iampeter commented on the issue: https://github.com/apache/flink/pull/2448 @mushketyk could you try `npm view gulp-stylus` ? > Enhance Dashboard to show Execution Attempts > > > Key: FLINK-3030 > URL: https://issues.apache.org/jira/browse/FLINK-3030 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk > Fix For: 1.0.0 > > > Currently, the web dashboard shows only the latest execution attempt. We > should make all execution attempts and their accumulators available for > inspection. > The REST monitoring API supports this, so it should be a change only to the > frontend part. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2448: [FLINK-3030][web frontend] Enhance dashboard to show exec...
Github user iampeter commented on the issue: https://github.com/apache/flink/pull/2448 @mushketyk could you try `npm view gulp-stylus` ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4204) Clean up gelly-examples
[ https://issues.apache.org/jira/browse/FLINK-4204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595656#comment-15595656 ] ASF GitHub Bot commented on FLINK-4204: --- Github user vasia commented on the issue: https://github.com/apache/flink/pull/2670 Hi @greghogan, I really like the cleanup and new organization! Two thoughts: - is the plan to add drivers for all library methods? - shall we remove the `GraphMetrics` example since there is a better driver? > Clean up gelly-examples > --- > > Key: FLINK-4204 > URL: https://issues.apache.org/jira/browse/FLINK-4204 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Greg Hogan > > The gelly-examples has grown quite big (14 examples) and contains several > examples that illustrate the same functionality. Examples should help users > understand how to use the API and ideally show how to use 1-2 features. > Also, it is helpful to state the purpose of each example in the comments. > We should keep the example set small and move everything that does not fit > there to the library. > I propose to remove the following: > - ClusteringCoefficient: the functionality already exists as a library method. > - HITS: the functionality already exists as a library method. > - JaccardIndex: the functionality already exists as a library method. > - SingleSourceShortestPaths: the example shows how to use scatter-gather > iterations. HITSAlgorithm shows the same feature plus the use of aggregators. > I propose we keep this one instead. > - TriangleListing: the functionality already exists as a library method -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2670: [FLINK-4204] [gelly] Clean up gelly-examples
Github user vasia commented on the issue: https://github.com/apache/flink/pull/2670 Hi @greghogan, I really like the cleanup and new organization! Two thoughts: - is the plan to add drivers for all library methods? - shall we remove the `GraphMetrics` example since there is a better driver? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4604) Add support for standard deviation/variance
[ https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595581#comment-15595581 ] Fabian Hueske commented on FLINK-4604: -- Hi [~anmu], I think we can implement and add support for {{SqlKind.SUM0}} as part of this issue. For that you have to implement a custom {{Sum0Aggregate}} which extends {{SumAggregate}} and overrides the {{prepare()}} method such that it does not initialize the aggregate with {{null}} but with {{0}} if the value is {{null}}. Next you have to fix the {{AggregateUtil}} and separate {{SqlSumEmptyIsZeroAggFunction}} from {{SqlSumAggFunction}} and initialize the new {{Sum0Aggregate}} and also allow for {{case SqlKind.SUM0 => true}} in {{DataSetAggregateRule}}. If I did not forget a place to add SUM0 support, that should do the trick. > Add support for standard deviation/variance > --- > > Key: FLINK-4604 > URL: https://issues.apache.org/jira/browse/FLINK-4604 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Anton Mushin > Attachments: 1.jpg > > > Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP, > STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test > and document this rule. > If we also want to add this aggregates to Table API is up for discussion. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2669: [FLINK-4871] [mini cluster] Add memory calculation...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2669#discussion_r84509867 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java --- @@ -96,11 +101,20 @@ public void setCommonRpcBindAddress(String bindAddress) { this.commonBindAddress = bindAddress; } + public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) { + checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager."); + this.managedMemoryPerTaskManager = managedMemoryPerTaskManager; + } + // // getters // public Configuration getConfiguration() { + // update the memory in case that we've changed the number of components (TM, RM, JM) + long memory = calculateManagedMemoryPerTaskManager(); --- End diff -- After this method has been called, you can't change the memory configuration anymore because the config value will prevent new calculation in `calculateManagedMemoryPerTaskManager`. Is that desired? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595486#comment-15595486 ] ASF GitHub Bot commented on FLINK-4851: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2655#discussion_r84507291 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Testing fatal error handler which records the occurred exceptions during the execution of the + * tests. Captured exceptions are thrown as a {@link TestingException}. + */ +public class TestingFatalErrorHandler implements FatalErrorHandler { + private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class); + private final AtomicReference atomicThrowable; + + public TestingFatalErrorHandler() { + atomicThrowable = new AtomicReference<>(null); + } + + public void rethrowError() throws TestingException { + Throwable throwable = atomicThrowable.get(); + + if (throwable != null) { + throw new TestingException(throwable); + } + } + + public boolean hasExceptionOccurred() { + return atomicThrowable.get() != null; + } + + public Throwable getException() { + return atomicThrowable.get(); + } + + @Override + public void onFatalError(Throwable exception) { + LOG.error("OnFatalError:", exception); + + if (!atomicThrowable.compareAndSet(null, exception)) { + atomicThrowable.get().addSuppressed(exception); + } --- End diff -- Oh, I didn't know you could do that. That's neat. > Add FatalErrorHandler and MetricRegistry to ResourceManager > --- > > Key: FLINK-4851 > URL: https://issues.apache.org/jira/browse/FLINK-4851 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{ResourceManager}} currently does not contain a {{FatalErrorHandler}}. > In order to harmonize the fatal error handling across all components, we > should introduce a {{FatalErrorHandler}}, which handles fatal errors. > Additionally, we should also give a {{MetricRegistry}} to the > {{ResourceManager}} so that it can report metrics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4871) Add memory calculation for TaskManagers and forward MetricRegistry
[ https://issues.apache.org/jira/browse/FLINK-4871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595559#comment-15595559 ] ASF GitHub Bot commented on FLINK-4871: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2669#discussion_r84508885 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java --- @@ -162,4 +180,63 @@ public String toString() { ", config=" + config + '}'; } + + /** +* Calculate the managed memory per task manager. The memory is calculated in the following +* order: +* +* 1. Return {@link #managedMemoryPerTaskManager} if set +* 2. Return config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) if set +* 3. Distribute the available free memory equally among all components (JMs, RMs and TMs) and +* calculate the managed memory from the share of memory for a single task manager. +* +* @return +*/ + private long calculateManagedMemoryPerTaskManager() { --- End diff -- `getOrCalculateManagedMemoryPerTaskManager`? > Add memory calculation for TaskManagers and forward MetricRegistry > -- > > Key: FLINK-4871 > URL: https://issues.apache.org/jira/browse/FLINK-4871 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > Add automatic memory calculation for {{TaskManagers}} executed by the > {{MiniCluster}}. > Additionally, change the {{TaskManagerRunner}} to accept a given > {{MetricRegistry}} so that the one instantiated by the {{MiniCluster}} is > used. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4871) Add memory calculation for TaskManagers and forward MetricRegistry
[ https://issues.apache.org/jira/browse/FLINK-4871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595561#comment-15595561 ] ASF GitHub Bot commented on FLINK-4871: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2669#discussion_r84509754 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java --- @@ -96,11 +101,20 @@ public void setCommonRpcBindAddress(String bindAddress) { this.commonBindAddress = bindAddress; } + public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) { + checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager."); + this.managedMemoryPerTaskManager = managedMemoryPerTaskManager; + } + // // getters // public Configuration getConfiguration() { + // update the memory in case that we've changed the number of components (TM, RM, JM) + long memory = calculateManagedMemoryPerTaskManager(); --- End diff -- Getters should usually not perform any calculation. How about changing the method name to `updateConfiguration()`? > Add memory calculation for TaskManagers and forward MetricRegistry > -- > > Key: FLINK-4871 > URL: https://issues.apache.org/jira/browse/FLINK-4871 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > Add automatic memory calculation for {{TaskManagers}} executed by the > {{MiniCluster}}. > Additionally, change the {{TaskManagerRunner}} to accept a given > {{MetricRegistry}} so that the one instantiated by the {{MiniCluster}} is > used. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4871) Add memory calculation for TaskManagers and forward MetricRegistry
[ https://issues.apache.org/jira/browse/FLINK-4871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595560#comment-15595560 ] ASF GitHub Bot commented on FLINK-4871: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2669#discussion_r84509867 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java --- @@ -96,11 +101,20 @@ public void setCommonRpcBindAddress(String bindAddress) { this.commonBindAddress = bindAddress; } + public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) { + checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager."); + this.managedMemoryPerTaskManager = managedMemoryPerTaskManager; + } + // // getters // public Configuration getConfiguration() { + // update the memory in case that we've changed the number of components (TM, RM, JM) + long memory = calculateManagedMemoryPerTaskManager(); --- End diff -- After this method has been called, you can't change the memory configuration anymore because the config value will prevent new calculation in `calculateManagedMemoryPerTaskManager`. Is that desired? > Add memory calculation for TaskManagers and forward MetricRegistry > -- > > Key: FLINK-4871 > URL: https://issues.apache.org/jira/browse/FLINK-4871 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > Add automatic memory calculation for {{TaskManagers}} executed by the > {{MiniCluster}}. > Additionally, change the {{TaskManagerRunner}} to accept a given > {{MetricRegistry}} so that the one instantiated by the {{MiniCluster}} is > used. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2669: [FLINK-4871] [mini cluster] Add memory calculation...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2669#discussion_r84509754 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java --- @@ -96,11 +101,20 @@ public void setCommonRpcBindAddress(String bindAddress) { this.commonBindAddress = bindAddress; } + public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) { + checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager."); + this.managedMemoryPerTaskManager = managedMemoryPerTaskManager; + } + // // getters // public Configuration getConfiguration() { + // update the memory in case that we've changed the number of components (TM, RM, JM) + long memory = calculateManagedMemoryPerTaskManager(); --- End diff -- Getters should usually not perform any calculation. How about changing the method name to `updateConfiguration()`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2669: [FLINK-4871] [mini cluster] Add memory calculation...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2669#discussion_r84508885 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java --- @@ -162,4 +180,63 @@ public String toString() { ", config=" + config + '}'; } + + /** +* Calculate the managed memory per task manager. The memory is calculated in the following +* order: +* +* 1. Return {@link #managedMemoryPerTaskManager} if set +* 2. Return config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) if set +* 3. Distribute the available free memory equally among all components (JMs, RMs and TMs) and +* calculate the managed memory from the share of memory for a single task manager. +* +* @return +*/ + private long calculateManagedMemoryPerTaskManager() { --- End diff -- `getOrCalculateManagedMemoryPerTaskManager`? ð --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4315) Deprecate Hadoop dependent methods in flink-java
[ https://issues.apache.org/jira/browse/FLINK-4315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595552#comment-15595552 ] Evgeny Kincharov commented on FLINK-4315: - [~fhueske] , [~greghogan] , could you look at new version of my PR. I added new commit with both versions of API. Evgeny. > Deprecate Hadoop dependent methods in flink-java > > > Key: FLINK-4315 > URL: https://issues.apache.org/jira/browse/FLINK-4315 > Project: Flink > Issue Type: Task > Components: Java API >Reporter: Stephan Ewen >Assignee: Evgeny Kincharov > Fix For: 2.0.0 > > > The API projects should be independent of Hadoop, because Hadoop is not an > integral part of the Flink stack, and we should have the option to offer > Flink without Hadoop dependencies. > The current batch APIs have a hard dependency on Hadoop, mainly because the > API has utility methods like `readHadoopFile(...)`. > I suggest to deprecate those methods and add helpers in the > `flink-hadoop-compatibility` project. > FLINK-4048 will later remove the deprecated methods. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2655: [FLINK-4851] [rm] Introduce FatalErrorHandler and ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2655#discussion_r84506385 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java --- @@ -145,6 +171,10 @@ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception { Future declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices); RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS); assertTrue(response instanceof RegistrationResponse.Decline); + + if (testingFatalErrorHandler.hasExceptionOccurred()) { + testingFatalErrorHandler.rethrowError(); + } --- End diff -- This seems like a lot of boilerplate that we could abstract using a base testing class for ResourceManager tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4884) Eagerly Store MergingWindowSet in State in WindowOperator
Aljoscha Krettek created FLINK-4884: --- Summary: Eagerly Store MergingWindowSet in State in WindowOperator Key: FLINK-4884 URL: https://issues.apache.org/jira/browse/FLINK-4884 Project: Flink Issue Type: Sub-task Components: Streaming Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2655: [FLINK-4851] [rm] Introduce FatalErrorHandler and ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2655#discussion_r84507291 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Testing fatal error handler which records the occurred exceptions during the execution of the + * tests. Captured exceptions are thrown as a {@link TestingException}. + */ +public class TestingFatalErrorHandler implements FatalErrorHandler { + private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class); + private final AtomicReference atomicThrowable; + + public TestingFatalErrorHandler() { + atomicThrowable = new AtomicReference<>(null); + } + + public void rethrowError() throws TestingException { + Throwable throwable = atomicThrowable.get(); + + if (throwable != null) { + throw new TestingException(throwable); + } + } + + public boolean hasExceptionOccurred() { + return atomicThrowable.get() != null; + } + + public Throwable getException() { + return atomicThrowable.get(); + } + + @Override + public void onFatalError(Throwable exception) { + LOG.error("OnFatalError:", exception); + + if (!atomicThrowable.compareAndSet(null, exception)) { + atomicThrowable.get().addSuppressed(exception); + } --- End diff -- Oh, I didn't know you could do that. That's neat. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595485#comment-15595485 ] ASF GitHub Bot commented on FLINK-4851: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2655#discussion_r84506385 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java --- @@ -145,6 +171,10 @@ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception { Future declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices); RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS); assertTrue(response instanceof RegistrationResponse.Decline); + + if (testingFatalErrorHandler.hasExceptionOccurred()) { + testingFatalErrorHandler.rethrowError(); + } --- End diff -- This seems like a lot of boilerplate that we could abstract using a base testing class for ResourceManager tests. > Add FatalErrorHandler and MetricRegistry to ResourceManager > --- > > Key: FLINK-4851 > URL: https://issues.apache.org/jira/browse/FLINK-4851 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{ResourceManager}} currently does not contain a {{FatalErrorHandler}}. > In order to harmonize the fatal error handling across all components, we > should introduce a {{FatalErrorHandler}}, which handles fatal errors. > Additionally, we should also give a {{MetricRegistry}} to the > {{ResourceManager}} so that it can report metrics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4847) Let RpcEndpoint.start/shutDown throw exceptions
[ https://issues.apache.org/jira/browse/FLINK-4847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595459#comment-15595459 ] ASF GitHub Bot commented on FLINK-4847: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2651 LGTM > Let RpcEndpoint.start/shutDown throw exceptions > --- > > Key: FLINK-4847 > URL: https://issues.apache.org/jira/browse/FLINK-4847 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{RpcEndpoint.start}} and {{RpcEndpoint.shutDown}} methods should be > allowed to throw exceptions if things go wrong. Otherwise, exceptions will be > given to a callback which handles them later, even though we know that we can > fail the components right away (as it is the case for the {{TaskExectuor}}, > for example). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595452#comment-15595452 ] ASF GitHub Bot commented on FLINK-4853: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84501555 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -471,6 +498,149 @@ public void shutDownCluster(final ApplicationStatus finalStatus, final String op } // + // Testing methods + // + + /** +* Gets the leader session id of current resourceManager. +* +* @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership. +*/ + @VisibleForTesting + UUID getLeaderSessionId() { + return leaderSessionId; + } + + // + // Internal methods + // + + private void clearState() { + jobManagerRegistrations.clear(); + taskExecutors.clear(); + slotManager.clearState(); + + try { + jobLeaderIdService.clear(); + } catch (Exception e) { + onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e)); + } + + leaderSessionId = new UUID(0, 0); --- End diff -- We probably want to set this to null again to work with the `isValid` method (if we want to support null values for UUIDs). I would rather not allow null values at all. > Clean up JobManager registration at the ResourceManager > --- > > Key: FLINK-4853 > URL: https://issues.apache.org/jira/browse/FLINK-4853 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The current {{JobManager}} registration at the {{ResourceManager}} blocks > threads in the {{RpcService.execute}} pool. This is not ideal and can be > avoided by not waiting on a {{Future}} in this call. > I propose to encapsulate the leader id retrieval operation in a distinct > service so that it can be separated from the {{ResourceManager}}. This will > reduce the complexity of the {{ResourceManager}} and make the individual > components easier to test. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84501182 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + + jobId + " to the job id leader service. This should never happen.", e); + + onFatalErrorAsync(exception); + + log.debug("Could not add job {} to job leader id service.", jobId, e); + return FlinkCompletableFuture.completedExceptionally(exception); + } + } - checkNotNull(jobMasterAddress); - checkNotNull(jobID); + log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId); + + Future jobLeaderIdFuture; - // create a leader retriever in case it doesn't exist - final JobIdLeaderListener jobIdLeaderListener; - if (leaderListeners.containsKey(jobID)) { - jobIdLeaderListener = leaderListeners.get(jobID); - } else { try { - LeaderRetrievalService jobMasterLeaderRetriever = - highAvailabilityServices.getJobManagerLeaderRetriever(jobID); - jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever); + jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId); } catch (Exception e) { - log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e); + // we cannot check the job leader id so let's fail + // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id + ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " + + "job leader id future to verify the correct job leader.", e); + + onFatalErrorAsync(exception); - return FlinkCompletableFuture.completed( - new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever")); + log.debug("Could not obtain the job leader id future to verify the correct job leader."); + return FlinkCompletableFuture.completedExceptionally(exception); } - leaderListeners.put(jobID, jobIdLeaderListener); - } + Future jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class); - return getRpcService() - .execute(new Callable() { + Future
[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595448#comment-15595448 ] ASF GitHub Bot commented on FLINK-4853: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84501742 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -471,6 +498,149 @@ public void shutDownCluster(final ApplicationStatus finalStatus, final String op } // + // Testing methods + // + + /** +* Gets the leader session id of current resourceManager. +* +* @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership. +*/ + @VisibleForTesting + UUID getLeaderSessionId() { + return leaderSessionId; + } + + // + // Internal methods + // + + private void clearState() { + jobManagerRegistrations.clear(); + taskExecutors.clear(); + slotManager.clearState(); + + try { + jobLeaderIdService.clear(); + } catch (Exception e) { + onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e)); + } + + leaderSessionId = new UUID(0, 0); + } + + /** +* Disconnects the job manager which is connected for the given job from the resource manager. +* +* @param jobId identifying the job whose leader shall be disconnected +*/ + protected void disconnectJobManager(JobID jobId, Exception cause) { + JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.remove(jobId); + + if (jobManagerRegistration != null) { + log.info("Disconnect job manager {}@{} for job {} from the resource manager.", + jobManagerRegistration.getLeaderID(), + jobManagerRegistration.getJobManagerGateway().getAddress(), + jobId); + + JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway(); + + // tell the job manager about the disconnect + jobMasterGateway.disconnectResourceManager(jobManagerRegistration.getLeaderID(), getLeaderSessionId(), cause); + } else { + log.debug("There was no registered job manager for job {}.", jobId); + } + } + + /** +* Checks whether the given resource manager leader id is matching the current leader id. +* +* @param resourceManagerLeaderId to check +* @return True if the given leader id matches the actual leader id; otherwise false +*/ + protected boolean isValid(UUID resourceManagerLeaderId) { + if (resourceManagerLeaderId == null) { + return leaderSessionId == null; --- End diff -- Should `null` always return `false` if we assume that we use a default UUID in non high availability mode? > Clean up JobManager registration at the ResourceManager > --- > > Key: FLINK-4853 > URL: https://issues.apache.org/jira/browse/FLINK-4853 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The current {{JobManager}} registration at the {{ResourceManager}} blocks > threads in the {{RpcService.execute}} pool. This is not ideal and can be > avoided by not waiting on a {{Future}} in this call. > I propose to encapsulate the leader id retrieval operation in a distinct > service so that it can be separated from the {{ResourceManager}}. This will > reduce the complexity of the {{ResourceManager}} and make the individual > components easier to test. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595450#comment-15595450 ] ASF GitHub Bot commented on FLINK-4853: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84500271 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + + jobId + " to the job id leader service. This should never happen.", e); + + onFatalErrorAsync(exception); + + log.debug("Could not add job {} to job leader id service.", jobId, e); --- End diff -- Should this be logged on `error` level? > Clean up JobManager registration at the ResourceManager > --- > > Key: FLINK-4853 > URL: https://issues.apache.org/jira/browse/FLINK-4853 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The current {{JobManager}} registration at the {{ResourceManager}} blocks > threads in the {{RpcService.execute}} pool. This is not ideal and can be > avoided by not waiting on a {{Future}} in this call. > I propose to encapsulate the leader id retrieval operation in a distinct > service so that it can be separated from the {{ResourceManager}}. This will > reduce the complexity of the {{ResourceManager}} and make the individual > components easier to test. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84501742 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -471,6 +498,149 @@ public void shutDownCluster(final ApplicationStatus finalStatus, final String op } // + // Testing methods + // + + /** +* Gets the leader session id of current resourceManager. +* +* @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership. +*/ + @VisibleForTesting + UUID getLeaderSessionId() { + return leaderSessionId; + } + + // + // Internal methods + // + + private void clearState() { + jobManagerRegistrations.clear(); + taskExecutors.clear(); + slotManager.clearState(); + + try { + jobLeaderIdService.clear(); + } catch (Exception e) { + onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e)); + } + + leaderSessionId = new UUID(0, 0); + } + + /** +* Disconnects the job manager which is connected for the given job from the resource manager. +* +* @param jobId identifying the job whose leader shall be disconnected +*/ + protected void disconnectJobManager(JobID jobId, Exception cause) { + JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.remove(jobId); + + if (jobManagerRegistration != null) { + log.info("Disconnect job manager {}@{} for job {} from the resource manager.", + jobManagerRegistration.getLeaderID(), + jobManagerRegistration.getJobManagerGateway().getAddress(), + jobId); + + JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway(); + + // tell the job manager about the disconnect + jobMasterGateway.disconnectResourceManager(jobManagerRegistration.getLeaderID(), getLeaderSessionId(), cause); + } else { + log.debug("There was no registered job manager for job {}.", jobId); + } + } + + /** +* Checks whether the given resource manager leader id is matching the current leader id. +* +* @param resourceManagerLeaderId to check +* @return True if the given leader id matches the actual leader id; otherwise false +*/ + protected boolean isValid(UUID resourceManagerLeaderId) { + if (resourceManagerLeaderId == null) { + return leaderSessionId == null; --- End diff -- Should `null` always return `false` if we assume that we use a default UUID in non high availability mode? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595447#comment-15595447 ] ASF GitHub Bot commented on FLINK-4853: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84500048 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + --- End diff -- Actually, this might happen when the leader id service fails to start. It could be temporary and we might have to introduce some sort of retry rule here. Not in the scope of this PR though. > Clean up JobManager registration at the ResourceManager > --- > > Key: FLINK-4853 > URL: https://issues.apache.org/jira/browse/FLINK-4853 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The current {{JobManager}} registration at the {{ResourceManager}} blocks > threads in the {{RpcService.execute}} pool. This is not ideal and can be > avoided by not waiting on a {{Future}} in this call. > I propose to encapsulate the leader id retrieval operation in a distinct > service so that it can be separated from the {{ResourceManager}}. This will > reduce the complexity of the {{ResourceManager}} and make the individual > components easier to test. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84500048 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + --- End diff -- Actually, this might happen when the leader id service fails to start. It could be temporary and we might have to introduce some sort of retry rule here. Not in the scope of this PR though. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2651: [FLINK-4847] Let RpcEndpoint.start/shutDown throw excepti...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2651 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84500271 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + + jobId + " to the job id leader service. This should never happen.", e); + + onFatalErrorAsync(exception); + + log.debug("Could not add job {} to job leader id service.", jobId, e); --- End diff -- Should this be logged on `error` level? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595453#comment-15595453 ] ASF GitHub Bot commented on FLINK-4853: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84501182 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + + jobId + " to the job id leader service. This should never happen.", e); + + onFatalErrorAsync(exception); + + log.debug("Could not add job {} to job leader id service.", jobId, e); + return FlinkCompletableFuture.completedExceptionally(exception); + } + } - checkNotNull(jobMasterAddress); - checkNotNull(jobID); + log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId); + + Future jobLeaderIdFuture; - // create a leader retriever in case it doesn't exist - final JobIdLeaderListener jobIdLeaderListener; - if (leaderListeners.containsKey(jobID)) { - jobIdLeaderListener = leaderListeners.get(jobID); - } else { try { - LeaderRetrievalService jobMasterLeaderRetriever = - highAvailabilityServices.getJobManagerLeaderRetriever(jobID); - jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever); + jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId); } catch (Exception e) { - log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e); + // we cannot check the job leader id so let's fail + // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id + ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " + + "job leader id future to verify the correct job leader.", e); + + onFatalErrorAsync(exception); - return FlinkCompletableFuture.completed( - new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever")); + log.debug("Could not obtain the job leader id future to verify the correct job leader."); + return FlinkCompletableFuture.completedExceptionally(exception); } - leaderListeners.put(jobID, jobIdLeaderListener); - } +
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84501555 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -471,6 +498,149 @@ public void shutDownCluster(final ApplicationStatus finalStatus, final String op } // + // Testing methods + // + + /** +* Gets the leader session id of current resourceManager. +* +* @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership. +*/ + @VisibleForTesting + UUID getLeaderSessionId() { + return leaderSessionId; + } + + // + // Internal methods + // + + private void clearState() { + jobManagerRegistrations.clear(); + taskExecutors.clear(); + slotManager.clearState(); + + try { + jobLeaderIdService.clear(); + } catch (Exception e) { + onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e)); + } + + leaderSessionId = new UUID(0, 0); --- End diff -- We probably want to set this to null again to work with the `isValid` method (if we want to support null values for UUIDs). I would rather not allow null values at all. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595451#comment-15595451 ] ASF GitHub Bot commented on FLINK-4853: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84500666 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + + jobId + " to the job id leader service. This should never happen.", e); + + onFatalErrorAsync(exception); + + log.debug("Could not add job {} to job leader id service.", jobId, e); + return FlinkCompletableFuture.completedExceptionally(exception); + } + } - checkNotNull(jobMasterAddress); - checkNotNull(jobID); + log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId); + + Future jobLeaderIdFuture; - // create a leader retriever in case it doesn't exist - final JobIdLeaderListener jobIdLeaderListener; - if (leaderListeners.containsKey(jobID)) { - jobIdLeaderListener = leaderListeners.get(jobID); - } else { try { - LeaderRetrievalService jobMasterLeaderRetriever = - highAvailabilityServices.getJobManagerLeaderRetriever(jobID); - jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever); + jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId); } catch (Exception e) { - log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e); + // we cannot check the job leader id so let's fail + // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id + ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " + + "job leader id future to verify the correct job leader.", e); + + onFatalErrorAsync(exception); --- End diff -- Declining seems ok in this case since the failure might be temporary. > Clean up JobManager registration at the ResourceManager > --- > > Key: FLINK-4853 > URL: https://issues.apache.org/jira/browse/FLINK-4853 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The current {{JobManager}} registration at the
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84500666 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -202,101 +205,125 @@ public void shutDown() throws Exception { // RPC methods // - /** -* Register a {@link JobMaster} at the resource manager. -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader -* @param jobMasterAddressThe address of the JobMaster that registers -* @param jobID The Job ID of the JobMaster that registers -* @return Future registration response -*/ @RpcMethod - public Future registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { + public Future registerJobManager( + final UUID resourceManagerLeaderId, + final UUID jobManagerLeaderId, + final String jobManagerAddress, + final JobID jobId) { + + checkNotNull(resourceManagerLeaderId); + checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerAddress); + checkNotNull(jobId); + + if (isValid(resourceManagerLeaderId)) { + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + // This should actually never happen because, it should always be possible to add a new job + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + + jobId + " to the job id leader service. This should never happen.", e); + + onFatalErrorAsync(exception); + + log.debug("Could not add job {} to job leader id service.", jobId, e); + return FlinkCompletableFuture.completedExceptionally(exception); + } + } - checkNotNull(jobMasterAddress); - checkNotNull(jobID); + log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId); + + Future jobLeaderIdFuture; - // create a leader retriever in case it doesn't exist - final JobIdLeaderListener jobIdLeaderListener; - if (leaderListeners.containsKey(jobID)) { - jobIdLeaderListener = leaderListeners.get(jobID); - } else { try { - LeaderRetrievalService jobMasterLeaderRetriever = - highAvailabilityServices.getJobManagerLeaderRetriever(jobID); - jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever); + jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId); } catch (Exception e) { - log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e); + // we cannot check the job leader id so let's fail + // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id + ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " + + "job leader id future to verify the correct job leader.", e); + + onFatalErrorAsync(exception); --- End diff -- Declining seems ok in this case since the failure might be temporary. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2657: [FLINK-4853] [rm] Harden job manager registration ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84502118 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -471,6 +498,149 @@ public void shutDownCluster(final ApplicationStatus finalStatus, final String op } // + // Testing methods + // + + /** +* Gets the leader session id of current resourceManager. +* +* @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership. +*/ + @VisibleForTesting + UUID getLeaderSessionId() { + return leaderSessionId; + } + + // + // Internal methods + // + + private void clearState() { + jobManagerRegistrations.clear(); + taskExecutors.clear(); + slotManager.clearState(); + + try { + jobLeaderIdService.clear(); + } catch (Exception e) { + onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e)); + } + + leaderSessionId = new UUID(0, 0); + } + + /** +* Disconnects the job manager which is connected for the given job from the resource manager. +* +* @param jobId identifying the job whose leader shall be disconnected +*/ + protected void disconnectJobManager(JobID jobId, Exception cause) { + JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.remove(jobId); + + if (jobManagerRegistration != null) { + log.info("Disconnect job manager {}@{} for job {} from the resource manager.", + jobManagerRegistration.getLeaderID(), + jobManagerRegistration.getJobManagerGateway().getAddress(), + jobId); + + JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway(); + + // tell the job manager about the disconnect + jobMasterGateway.disconnectResourceManager(jobManagerRegistration.getLeaderID(), getLeaderSessionId(), cause); + } else { + log.debug("There was no registered job manager for job {}.", jobId); + } + } + + /** +* Checks whether the given resource manager leader id is matching the current leader id. +* +* @param resourceManagerLeaderId to check +* @return True if the given leader id matches the actual leader id; otherwise false +*/ + protected boolean isValid(UUID resourceManagerLeaderId) { + if (resourceManagerLeaderId == null) { + return leaderSessionId == null; + } else { + return resourceManagerLeaderId.equals(leaderSessionId); + } + } + + protected void removeJob(JobID jobId) { + try { + jobLeaderIdService.removeJob(jobId); + } catch (Exception e) { + onFatalError(new ResourceManagerException("Could not remove job " + jobId + '.', e)); --- End diff -- This should not easily fail (e.g. closing a connection to Zookeeper throws an exception). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595449#comment-15595449 ] ASF GitHub Bot commented on FLINK-4853: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2657#discussion_r84502118 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -471,6 +498,149 @@ public void shutDownCluster(final ApplicationStatus finalStatus, final String op } // + // Testing methods + // + + /** +* Gets the leader session id of current resourceManager. +* +* @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership. +*/ + @VisibleForTesting + UUID getLeaderSessionId() { + return leaderSessionId; + } + + // + // Internal methods + // + + private void clearState() { + jobManagerRegistrations.clear(); + taskExecutors.clear(); + slotManager.clearState(); + + try { + jobLeaderIdService.clear(); + } catch (Exception e) { + onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e)); + } + + leaderSessionId = new UUID(0, 0); + } + + /** +* Disconnects the job manager which is connected for the given job from the resource manager. +* +* @param jobId identifying the job whose leader shall be disconnected +*/ + protected void disconnectJobManager(JobID jobId, Exception cause) { + JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.remove(jobId); + + if (jobManagerRegistration != null) { + log.info("Disconnect job manager {}@{} for job {} from the resource manager.", + jobManagerRegistration.getLeaderID(), + jobManagerRegistration.getJobManagerGateway().getAddress(), + jobId); + + JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway(); + + // tell the job manager about the disconnect + jobMasterGateway.disconnectResourceManager(jobManagerRegistration.getLeaderID(), getLeaderSessionId(), cause); + } else { + log.debug("There was no registered job manager for job {}.", jobId); + } + } + + /** +* Checks whether the given resource manager leader id is matching the current leader id. +* +* @param resourceManagerLeaderId to check +* @return True if the given leader id matches the actual leader id; otherwise false +*/ + protected boolean isValid(UUID resourceManagerLeaderId) { + if (resourceManagerLeaderId == null) { + return leaderSessionId == null; + } else { + return resourceManagerLeaderId.equals(leaderSessionId); + } + } + + protected void removeJob(JobID jobId) { + try { + jobLeaderIdService.removeJob(jobId); + } catch (Exception e) { + onFatalError(new ResourceManagerException("Could not remove job " + jobId + '.', e)); --- End diff -- This should not easily fail (e.g. closing a connection to Zookeeper throws an exception). > Clean up JobManager registration at the ResourceManager > --- > > Key: FLINK-4853 > URL: https://issues.apache.org/jira/browse/FLINK-4853 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The current {{JobManager}} registration at the {{ResourceManager}} blocks > threads in the {{RpcService.execute}} pool. This is not ideal and can be > avoided by not waiting on a {{Future}} in this call. > I propose to encapsulate the leader id retrieval operation in a distinct > service so that it can be separated from the {{ResourceManager}}. This will > reduce the complexity of the {{ResourceManager}} and make the individual > components easier to test.
[jira] [Assigned] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Shoshin reassigned FLINK-4541: Assignee: Alexander Shoshin > Support for SQL NOT IN operator > --- > > Key: FLINK-4541 > URL: https://issues.apache.org/jira/browse/FLINK-4541 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Alexander Shoshin > > This should work: > {code} > def main(args: Array[String]): Unit = { > // set up execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", > 1)) > // register the DataSet as table "WordCount" > tEnv.registerDataSet("WordCount", input, 'word, 'frequency) > tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, > 'frequency).select('word).filter('word !== "hello")) > // run a SQL query on the Table and retrieve the result as a new Table > val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE > word NOT IN (SELECT word FROM WordCount2) GROUP BY word") > table.toDataSet[WC].print() > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2118) Table API fails on composite filter conditions
[ https://issues.apache.org/jira/browse/FLINK-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595359#comment-15595359 ] Alexander Shoshin commented on FLINK-2118: -- Fabian, thank you for the fast responce! > Table API fails on composite filter conditions > -- > > Key: FLINK-2118 > URL: https://issues.apache.org/jira/browse/FLINK-2118 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Alexander Shoshin > > Having a composite filter conditions such as > {code} > myTable.filter('name !== "Pete" && 'name !== "Bob") > {code} > fails with the following error message: > {code} > ExpressionException: Non-boolean operand types String and String in Pete && > 'name > {code} > whereas > {code} > myTable.filter( ('name !== "Pete") && ('name !== "Bob") ) > {code} > works. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2678: [FLINK-4879] [Kafka-Connector] class KafkaTableSource sho...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2678 I think it's fine to make `KafkaTableSource` public. However, I think users usually want to go for a version-specific `KafkaTableSource` like `Kafka090TableSource`. The version-specific sources are all declared `public`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink
[ https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595338#comment-15595338 ] ASF GitHub Bot commented on FLINK-4879: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2678 I think it's fine to make `KafkaTableSource` public. However, I think users usually want to go for a version-specific `KafkaTableSource` like `Kafka090TableSource`. The version-specific sources are all declared `public`. > class KafkaTableSource should be public just like KafkaTableSink > > > Key: FLINK-4879 > URL: https://issues.apache.org/jira/browse/FLINK-4879 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API & SQL >Affects Versions: 1.1.1, 1.1.3 >Reporter: yuemeng >Priority: Minor > Fix For: 1.1.4 > > Attachments: 0001-class-KafkaTableSource-should-be-public.patch > > > *class KafkaTableSource should be public just like KafkaTableSink,by > default,it's modifier is default ,and we cann't access out of it's package*, > for example: > {code} > def createKafkaTableSource( > topic: String, > properties: Properties, > deserializationSchema: DeserializationSchema[Row], > fieldsNames: Array[String], > typeInfo: Array[TypeInformation[_]]): KafkaTableSource = { > if (deserializationSchema != null) { > new Kafka09TableSource(topic, properties, deserializationSchema, > fieldsNames, typeInfo) > } else { > new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo) > } > } > {code} > Because of the class KafkaTableSource modifier is default,we cann't define > this function result type with KafkaTableSource ,we must give the specific > type. > if some other kafka source extends KafkaTableSource ,and we don't sure which > subclass of KafkaTableSource should be use,how can we specific the type? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2118) Table API fails on composite filter conditions
[ https://issues.apache.org/jira/browse/FLINK-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-2118. Resolution: Not A Bug Hi Alexander, I agree and will close the issue. Thanks for looking into this! Best, Fabian > Table API fails on composite filter conditions > -- > > Key: FLINK-2118 > URL: https://issues.apache.org/jira/browse/FLINK-2118 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Alexander Shoshin > > Having a composite filter conditions such as > {code} > myTable.filter('name !== "Pete" && 'name !== "Bob") > {code} > fails with the following error message: > {code} > ExpressionException: Non-boolean operand types String and String in Pete && > 'name > {code} > whereas > {code} > myTable.filter( ('name !== "Pete") && ('name !== "Bob") ) > {code} > works. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects
Stefan Richter created FLINK-4883: - Summary: Prevent UDFs implementations through Scala singleton objects Key: FLINK-4883 URL: https://issues.apache.org/jira/browse/FLINK-4883 Project: Flink Issue Type: Bug Reporter: Stefan Richter Currently, user can create and use UDFs in Scala like this: {code} object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] { ... } {code} However, this leads to problems as the UDF is now a singleton that Flink could use across several operator instances, which leads to job failures. We should detect and prevent the usage of singleton UDFs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4876) Allow web interface to be bound to a specific ip/interface/inetHost
[ https://issues.apache.org/jira/browse/FLINK-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595160#comment-15595160 ] ASF GitHub Bot commented on FLINK-4876: --- GitHub user attachmentgenie opened a pull request: https://github.com/apache/flink/pull/2680 [FLINK-4876] Allow web interface to be bound to a specific ip/interface/inetHost Currently the web interface automatically binds to all interfaces on 0.0.0.0. IMHO there are some use cases to only bind to a specific ipadress, (e.g. access through an authenticated proxy, not binding on the management or backup interface) - [x ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/attachmentgenie/flink FLINK-4876 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2680.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2680 > Allow web interface to be bound to a specific ip/interface/inetHost > --- > > Key: FLINK-4876 > URL: https://issues.apache.org/jira/browse/FLINK-4876 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.2.0, 1.1.2, 1.1.3 >Reporter: Bram Vogelaar >Priority: Minor > > Currently the web interface automatically binds to all interfaces on 0.0.0.0. > IMHO there are some use cases to only bind to a specific ipadress, (e.g. > access through an authenticated proxy, not binding on the management or > backup interface) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2680: [FLINK-4876] Allow web interface to be bound to a ...
GitHub user attachmentgenie opened a pull request: https://github.com/apache/flink/pull/2680 [FLINK-4876] Allow web interface to be bound to a specific ip/interface/inetHost Currently the web interface automatically binds to all interfaces on 0.0.0.0. IMHO there are some use cases to only bind to a specific ipadress, (e.g. access through an authenticated proxy, not binding on the management or backup interface) - [x ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/attachmentgenie/flink FLINK-4876 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2680.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2680 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4882) Cleanup throws exception clause in HighAvailabilityServices
[ https://issues.apache.org/jira/browse/FLINK-4882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595144#comment-15595144 ] ASF GitHub Bot commented on FLINK-4882: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2679 [FLINK-4882] [flip-6] Remove exceptions from HighAvailabilityServiceswhere not necessary Cleanup of the interface `HighAvailabilityServices` so that only methods which really throw an exception have an exception clause defined. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink removeExceptionsHAServices Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2679.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2679 > Cleanup throws exception clause in HighAvailabilityServices > --- > > Key: FLINK-4882 > URL: https://issues.apache.org/jira/browse/FLINK-4882 > Project: Flink > Issue Type: Improvement >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > The {{HighAvailabilityServices}} interfaces defines methods with throws > exception clauses which are not really needed. We should remove them to > correct the interface. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2679: [FLINK-4882] [flip-6] Remove exceptions from HighA...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2679 [FLINK-4882] [flip-6] Remove exceptions from HighAvailabilityServiceswhere not necessary Cleanup of the interface `HighAvailabilityServices` so that only methods which really throw an exception have an exception clause defined. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink removeExceptionsHAServices Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2679.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2679 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Comment Edited] (FLINK-3695) ValueArray types
[ https://issues.apache.org/jira/browse/FLINK-3695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595078#comment-15595078 ] Greg Hogan edited comment on FLINK-3695 at 10/21/16 1:39 PM: - I was thinking this could be moved to the Gelly library since we no longer require integration into the {{TypeExtractor}}; however, we still need the {{Value}} types to map to the proper {{ValueArray}} type. was (Author: greghogan): I was thinking this could be moved to the Gelly library since we no longer require integration into the `TypeExtractor`; however, we still need the `Value` types to map to their own `ValueArray` type. > ValueArray types > > > Key: FLINK-3695 > URL: https://issues.apache.org/jira/browse/FLINK-3695 > Project: Flink > Issue Type: New Feature > Components: Type Serialization System >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Flink provides mutable {{Value}} type implementations of Java primitives > along with efficient serializers and comparators. It would be useful to have > corresponding {{ValueArray}} implementations backed by primitive rather than > object arrays, along with an {{ArrayableValue}} interface tying a {{Value}} > to its {{ValueArray}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-3695) ValueArray types
[ https://issues.apache.org/jira/browse/FLINK-3695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595078#comment-15595078 ] Greg Hogan edited comment on FLINK-3695 at 10/21/16 1:34 PM: - I was thinking this could be moved to the Gelly library since we no longer require integration into the `TypeExtractor`; however, we still need the `Value` types to map to their own `ValueArray` type. was (Author: greghogan): I have changed this ticket's component to "gelly" since with FLINK-3042 we no longer require integration into core Flink and can incubate this in the library. > ValueArray types > > > Key: FLINK-3695 > URL: https://issues.apache.org/jira/browse/FLINK-3695 > Project: Flink > Issue Type: New Feature > Components: Type Serialization System >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Flink provides mutable {{Value}} type implementations of Java primitives > along with efficient serializers and comparators. It would be useful to have > corresponding {{ValueArray}} implementations backed by primitive rather than > object arrays, along with an {{ArrayableValue}} interface tying a {{Value}} > to its {{ValueArray}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84463868 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -99,8 +111,12 @@ public boolean canMerge() { } @Override - public TriggerResult onMerge(W window, OnMergeContext ctx) { + public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); + Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get(); + if (nextFireTimestamp != null) { + ctx.registerEventTimeTimer(nextFireTimestamp); + } --- End diff -- +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4773) Introduce an OperatorIOMetricGroup
[ https://issues.apache.org/jira/browse/FLINK-4773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595082#comment-15595082 ] ASF GitHub Bot commented on FLINK-4773: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2614 Iv'e rebased the branch and will merge it once travis passes. > Introduce an OperatorIOMetricGroup > -- > > Key: FLINK-4773 > URL: https://issues.apache.org/jira/browse/FLINK-4773 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.2 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.2.0 > > > Task related IO metrics (numBytesIn/Out) are not instantiated directly by the > task, but instead within the IOMetricGroup contained in the respective > TaskMetricGroup. They are then later accessed by relevant components, instead > of creating them themselves. This has the advantage that they can be accessed > from several places, and that they are guaranteed to always be instantiated > identically (without requiring static name constants). > I propose to do the same for operators. > This is also a prerequisite for FLINK-4733. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2667: README.md - Description of the bluemix specif…
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2667 Thank you for your contribution! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2614: [FLINK-4773] Intoduce OperatorIOMetricGroup
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2614 Iv'e rebased the branch and will merge it once travis passes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2667 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-3695) ValueArray types
[ https://issues.apache.org/jira/browse/FLINK-3695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3695: -- Component/s: (was: Gelly) Type Serialization System > ValueArray types > > > Key: FLINK-3695 > URL: https://issues.apache.org/jira/browse/FLINK-3695 > Project: Flink > Issue Type: New Feature > Components: Type Serialization System >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Flink provides mutable {{Value}} type implementations of Java primitives > along with efficient serializers and comparators. It would be useful to have > corresponding {{ValueArray}} implementations backed by primitive rather than > object arrays, along with an {{ArrayableValue}} interface tying a {{Value}} > to its {{ValueArray}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3695) ValueArray types
[ https://issues.apache.org/jira/browse/FLINK-3695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595078#comment-15595078 ] Greg Hogan commented on FLINK-3695: --- I have changed this ticket's component to "gelly" since with FLINK-3042 we no longer require integration into core Flink and can incubate this in the library. > ValueArray types > > > Key: FLINK-3695 > URL: https://issues.apache.org/jira/browse/FLINK-3695 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Flink provides mutable {{Value}} type implementations of Java primitives > along with efficient serializers and comparators. It would be useful to have > corresponding {{ValueArray}} implementations backed by primitive rather than > object arrays, along with an {{ArrayableValue}} interface tying a {{Value}} > to its {{ValueArray}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4864) Shade Calcite dependency in flink-table
[ https://issues.apache.org/jira/browse/FLINK-4864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595076#comment-15595076 ] ASF GitHub Bot commented on FLINK-4864: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2673 Thanks for the PR @wuchong. I tried to build a Table API program with the flink-table dependency, but the compilation failed with the following error message: > Caused by: scala.reflect.internal.Types$TypeError: bad symbolic reference. A signature in TableEnvironment.class refers to term parser in value org.apache.sql which is not available. I'm not sure what exactly is going wrong, but it looks like the package `org.apache.sql` is not correct. It should probably be `org.apache.calcite.sql` which has a `parser` subpackage. Thanks, Fabian > Shade Calcite dependency in flink-table > --- > > Key: FLINK-4864 > URL: https://issues.apache.org/jira/browse/FLINK-4864 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Jark Wu > > The Table API has a dependency on Apache Calcite. > A user reported to have version conflicts when having a own Calcite > dependency in the classpath. > The solution would be to shade away the Calcite dependency (Calcite's > transitive dependencies are already shaded). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2673: [FLINK-4864] [table] Shade Calcite dependency in flink-ta...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2673 Thanks for the PR @wuchong. I tried to build a Table API program with the flink-table dependency, but the compilation failed with the following error message: > Caused by: scala.reflect.internal.Types$TypeError: bad symbolic reference. A signature in TableEnvironment.class refers to term parser in value org.apache.sql which is not available. I'm not sure what exactly is going wrong, but it looks like the package `org.apache.sql` is not correct. It should probably be `org.apache.calcite.sql` which has a `parser` subpackage. Thanks, Fabian --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4875) operator name not correctly inferred
[ https://issues.apache.org/jira/browse/FLINK-4875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595071#comment-15595071 ] ASF GitHub Bot commented on FLINK-4875: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2676 +1 to merge this change. > operator name not correctly inferred > > > Key: FLINK-4875 > URL: https://issues.apache.org/jira/browse/FLINK-4875 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.1.2 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2676: [FLINK-4875] [metrics] Use correct operator name
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2676 +1 to merge this change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-3695) ValueArray types
[ https://issues.apache.org/jira/browse/FLINK-3695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3695: -- Component/s: (was: Type Serialization System) Gelly > ValueArray types > > > Key: FLINK-3695 > URL: https://issues.apache.org/jira/browse/FLINK-3695 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Flink provides mutable {{Value}} type implementations of Java primitives > along with efficient serializers and comparators. It would be useful to have > corresponding {{ValueArray}} implementations backed by primitive rather than > object arrays, along with an {{ArrayableValue}} interface tying a {{Value}} > to its {{ValueArray}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2668: Add EvaluateDataSetOperation for LabeledVector. This clos...
Github user thvasilo commented on the issue: https://github.com/apache/flink/pull/2668 Hello Thomas, thank you for your contribution! I took a brief look so some initial comments: This seems to be making changes to `MLUtils` which AFAIK is outside the scope of this issue. I would recommend you isolate changes into different issues and PRs. I also see a lot of style changes to existing code. The code style we try to follow is [this one](https://github.com/databricks/scala-style-guide), I would recommend you review that and try to follow it. As a rule of thumb we don't make style changes to existing code, unless the existing code does not conform to the linked style. Even in that case I would recommend opening a different PR with only style changes, as it makes reviewing the core PR (which is the added code here) easier. So I'd recommend to remove the style changes you've made from this PR as well. If there is existing code that violates the linked style we can open a new PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2614: [FLINK-4773] Intoduce OperatorIOMetricGroup
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2614 I think this is a good refactoring. +1 to merge. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2671: [FLINK-4862] fix Timer register in ContinuousEventTimeTri...
Github user manuzhang commented on the issue: https://github.com/apache/flink/pull/2671 @mxm updated. Thanks for teaching me more about the internals. Ignoring the old Timer make things much simpler actually. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4862) NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger
[ https://issues.apache.org/jira/browse/FLINK-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595061#comment-15595061 ] ASF GitHub Bot commented on FLINK-4862: --- Github user manuzhang commented on the issue: https://github.com/apache/flink/pull/2671 @mxm updated. Thanks for teaching me more about the internals. Ignoring the old Timer make things much simpler actually. > NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger > -- > > Key: FLINK-4862 > URL: https://issues.apache.org/jira/browse/FLINK-4862 > Project: Flink > Issue Type: Bug > Components: Streaming, Windowing Operators >Affects Versions: 1.2.0, 1.1.3 >Reporter: Manu Zhang >Assignee: Manu Zhang > Fix For: 1.2.0, 1.1.4 > > > h3. what's the error ? > The following NPE error is thrown when EventTimeSessionWindows with > ContinuousEventTimeTrigger is used. > {code} > Caused by: java.lang.NullPointerException > at > org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger.clear(ContinuousEventTimeTrigger.java:91) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:768) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:310) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:297) > at > org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:196) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:297) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:271) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609) > at java.lang.Thread.run(Thread.java:745) > {code} > h3. how to reproduce ? > use {{ContinuousEventTimeTrigger}} instead of the default > {{EventTimeTrigger}} in [SessionWindowing | > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java#L84] > example. > h3. what's the cause ? > When two session windows are being merged, the states of the two > {{ContinuousEventTimeTrigger}} are merged as well and the new namespace is > the merged window. Later when the context tries to delete {{Timer}} from the > old trigger and looks up the timestamp by the old namespace, null value is > returned. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84465826 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteEventTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteEventTimeTimer(timestamp); + fireTimestamp.clear(); --- End diff -- The above looks good. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4862) NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger
[ https://issues.apache.org/jira/browse/FLINK-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595036#comment-15595036 ] ASF GitHub Bot commented on FLINK-4862: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84465826 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteEventTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteEventTimeTimer(timestamp); + fireTimestamp.clear(); --- End diff -- The above looks good. > NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger > -- > > Key: FLINK-4862 > URL: https://issues.apache.org/jira/browse/FLINK-4862 > Project: Flink > Issue Type: Bug > Components: Streaming, Windowing Operators >Affects Versions: 1.2.0, 1.1.3 >Reporter: Manu Zhang >Assignee: Manu Zhang > Fix For: 1.2.0, 1.1.4 > > > h3. what's the error ? > The following NPE error is thrown when EventTimeSessionWindows with > ContinuousEventTimeTrigger is used. > {code} > Caused by: java.lang.NullPointerException > at > org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger.clear(ContinuousEventTimeTrigger.java:91) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:768) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:310) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:297) > at > org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:196) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:297) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:271) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609) > at java.lang.Thread.run(Thread.java:745) > {code} > h3. how to reproduce ? > use {{ContinuousEventTimeTrigger}} instead of the default > {{EventTimeTrigger}} in [SessionWindowing | > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java#L84] > example. > h3. what's the cause ? > When two session windows are being merged, the states of the two > {{ContinuousEventTimeTrigger}} are merged as well and the new namespace is > the merged window. Later when the context tries to delete {{Timer}} from the > old trigger and looks up the timestamp by the old namespace, null value is > returned. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4563) [metrics] scope caching not adjusted for multiple reporters
[ https://issues.apache.org/jira/browse/FLINK-4563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595056#comment-15595056 ] ASF GitHub Bot commented on FLINK-4563: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2650 correct; we could also use a `String[]` and `CharacterFilter[]`. > [metrics] scope caching not adjusted for multiple reporters > --- > > Key: FLINK-4563 > URL: https://issues.apache.org/jira/browse/FLINK-4563 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Anton Mushin > > Every metric group contains a scope string, representing what entities > (job/task/etc.) a given metric belongs to, which is calculated on demand. > Before this string is cached a CharacterFilter is applied to it, which is > provided by the callee, usually a reporter. This was done since different > reporters have different requirements in regards to valid characters. The > filtered string is cached so that we don't have to refilter the string every > time. > This all works fine with a single reporter; with multiple however it is > completely broken as only the first filter is ever applied. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2650: [FLINK-4563] [metrics] scope caching not adjusted for mul...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2650 correct; we could also use a `String[]` and `CharacterFilter[]`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4833) Unstable test OperatorStatsAccumulatorTest.testAccumulatorHeavyHitterCountMinSketch
[ https://issues.apache.org/jira/browse/FLINK-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595040#comment-15595040 ] ASF GitHub Bot commented on FLINK-4833: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2660 > Unstable test > OperatorStatsAccumulatorTest.testAccumulatorHeavyHitterCountMinSketch > --- > > Key: FLINK-4833 > URL: https://issues.apache.org/jira/browse/FLINK-4833 > Project: Flink > Issue Type: Bug > Components: Tests >Reporter: Kostas Kloudas > Labels: test-stability > > Some instances: > view-source:https://s3.amazonaws.com/archive.travis-ci.org/jobs/167801187/log.txt > view-source:https://s3.amazonaws.com/archive.travis-ci.org/jobs/167801191/log.txt > view-source:https://s3.amazonaws.com/archive.travis-ci.org/jobs/167801193/log.txt > view-source:https://s3.amazonaws.com/archive.travis-ci.org/jobs/167801195/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2660: [FLINK-4833] properly log exceptions in CountMinHe...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2660 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2615: [FLINK-4772] [FLINK-4775] Metric Store enhancement...
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/2615 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4772) Store metrics in MetricStore as strings
[ https://issues.apache.org/jira/browse/FLINK-4772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595041#comment-15595041 ] ASF GitHub Bot commented on FLINK-4772: --- Github user zentol closed the pull request at: https://github.com/apache/flink/pull/2615 > Store metrics in MetricStore as strings > --- > > Key: FLINK-4772 > URL: https://issues.apache.org/jira/browse/FLINK-4772 > Project: Flink > Issue Type: Improvement > Components: Metrics, Webfrontend >Affects Versions: 1.1.2 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.2.0 > > > The MetricStore is a nested structure which stores metrics in several > Mapobjects. The key is the name of the metric, while the > value is the value of the metric. Since gauges are transmitted as Strings, > and all other metric types are of numeric nature, the type of the value is > limited to Strings and Numbers. > Storing them with different types however has no benefit, and makes working > with the MetricStore needlessly complicated since one is forced to cast every > retrieved metric. This also implies that one either has to know what kind of > metric you're dealing with (which can easily go wrong), or check the type > before using the metric, which is cumbersome. > As such i propose storing all metrics as strings. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84465211 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteEventTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteEventTimeTimer(timestamp); + fireTimestamp.clear(); + } else if (cachedFireTimestamp != null){ + ctx.deleteEventTimeTimer(cachedFireTimestamp); + } --- End diff -- I think we're fine with not doing anything when `timestamp == null`. The old timer won't influence the newly merged window. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4862) NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger
[ https://issues.apache.org/jira/browse/FLINK-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595033#comment-15595033 ] ASF GitHub Bot commented on FLINK-4862: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84464167 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteEventTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteEventTimeTimer(timestamp); + fireTimestamp.clear(); + } else if (cachedFireTimestamp != null){ + ctx.deleteEventTimeTimer(cachedFireTimestamp); + } --- End diff -- The above `else if` block is not correct because there is only one instance of the trigger which is reused for each Window. Hence the abstraction using the state descriptor to retrieve the appropriate state. > NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger > -- > > Key: FLINK-4862 > URL: https://issues.apache.org/jira/browse/FLINK-4862 > Project: Flink > Issue Type: Bug > Components: Streaming, Windowing Operators >Affects Versions: 1.2.0, 1.1.3 >Reporter: Manu Zhang >Assignee: Manu Zhang > Fix For: 1.2.0, 1.1.4 > > > h3. what's the error ? > The following NPE error is thrown when EventTimeSessionWindows with > ContinuousEventTimeTrigger is used. > {code} > Caused by: java.lang.NullPointerException > at > org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger.clear(ContinuousEventTimeTrigger.java:91) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:768) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:310) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:297) > at > org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:196) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:297) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:271) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609) > at java.lang.Thread.run(Thread.java:745) > {code} > h3. how to reproduce ? > use {{ContinuousEventTimeTrigger}} instead of the default > {{EventTimeTrigger}} in [SessionWindowing | > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java#L84] > example. > h3. what's the cause ? > When two session windows are being merged, the states of the two > {{ContinuousEventTimeTrigger}} are merged as well and the new namespace is > the merged window. Later when the context tries to delete {{Timer}} from the > old trigger and looks up the timestamp by the old namespace, null value is > returned. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4862) NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger
[ https://issues.apache.org/jira/browse/FLINK-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595034#comment-15595034 ] ASF GitHub Bot commented on FLINK-4862: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84465211 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteEventTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteEventTimeTimer(timestamp); + fireTimestamp.clear(); + } else if (cachedFireTimestamp != null){ + ctx.deleteEventTimeTimer(cachedFireTimestamp); + } --- End diff -- I think we're fine with not doing anything when `timestamp == null`. The old timer won't influence the newly merged window. > NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger > -- > > Key: FLINK-4862 > URL: https://issues.apache.org/jira/browse/FLINK-4862 > Project: Flink > Issue Type: Bug > Components: Streaming, Windowing Operators >Affects Versions: 1.2.0, 1.1.3 >Reporter: Manu Zhang >Assignee: Manu Zhang > Fix For: 1.2.0, 1.1.4 > > > h3. what's the error ? > The following NPE error is thrown when EventTimeSessionWindows with > ContinuousEventTimeTrigger is used. > {code} > Caused by: java.lang.NullPointerException > at > org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger.clear(ContinuousEventTimeTrigger.java:91) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:768) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:310) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:297) > at > org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:196) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:297) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:271) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609) > at java.lang.Thread.run(Thread.java:745) > {code} > h3. how to reproduce ? > use {{ContinuousEventTimeTrigger}} instead of the default > {{EventTimeTrigger}} in [SessionWindowing | > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java#L84] > example. > h3. what's the cause ? > When two session windows are being merged, the states of the two > {{ContinuousEventTimeTrigger}} are merged as well and the new namespace is > the merged window. Later when the context tries to delete {{Timer}} from the > old trigger and looks up the timestamp by the old namespace, null value is > returned. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84464275 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -45,6 +45,12 @@ private final ReducingStateDescriptor stateDesc = new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE); + /** +* Used to preserve the fire timestamp before merge such that +* the corresponding timer could be cleared after merge +*/ + private Long cachedFireTimestamp = null; --- End diff -- This doesn't work because there is only one `Trigger` instance and this will potentially be overwritten by many Windows being merged. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2671#discussion_r84464167 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java --- @@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t @Override public void clear(W window, TriggerContext ctx) throws Exception { ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteEventTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteEventTimeTimer(timestamp); + fireTimestamp.clear(); + } else if (cachedFireTimestamp != null){ + ctx.deleteEventTimeTimer(cachedFireTimestamp); + } --- End diff -- The above `else if` block is not correct because there is only one instance of the trigger which is reused for each Window. Hence the abstraction using the state descriptor to retrieve the appropriate state. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4743) The sqrt/power function not accept the real data types.
[ https://issues.apache.org/jira/browse/FLINK-4743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595029#comment-15595029 ] Fabian Hueske commented on FLINK-4743: -- Hi [~tonycox], Thanks for looking into this issue. Timo is currently on vacation and will return in about a month. I have a few comments on the branch you posted: - I think you can use {{ScalarOperators.generateCast()}} for the casting - You register in {{ScalarFunctions}} you register {{POWER, Seq(DOUBLE_TYPE_INFO, BIG_DEC_TYPE_INFO)}} twice - We try to keep the number of time consuming ITCases low. Please convert your tests to extend {{ExpressionTestBase}} After that you should be good to open a PR. Thanks, Fabian > The sqrt/power function not accept the real data types. > --- > > Key: FLINK-4743 > URL: https://issues.apache.org/jira/browse/FLINK-4743 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.1 >Reporter: Anton Mushin >Assignee: Anton Solovev > > At time calculate the sequences sql aggregate functions for real type column, > got exception: No applicable constructor/method found for actual parameters > "float, java.math.BigDecimal" > And for column of integer type the problem does not occur. > Code reproduce: > {code} > @Test > def test():Unit={ > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env, config) > val ds = env.fromElements( > (1.0f, 1), > (2.0f, 2)).toTable(tEnv) > tEnv.registerTable("MyTable", ds) > val sqlQuery = "SELECT " + > "SQRT((SUM(a*a) - SUM(a)*SUM(a)/COUNT(a))/COUNT(a)) "+ > "from (select _1 as a from MyTable)" > tEnv.sql(sqlQuery).toDataSet[Row].collect().foreach(x=>print(s"$x ")) > } > {code} > got exception: > {noformat} > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.api.table.runtime.FunctionCompiler$class.compile(FunctionCompiler.scala:37) > at > org.apache.flink.api.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:28) > at > org.apache.flink.api.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:42) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38) > at > org.apache.flink.api.common.operators.base.FlatMapOperatorBase.executeOnCollections(FlatMapOperatorBase.java:62) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:249) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:147) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:129) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:180) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:156) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:129) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:113) > at > org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35) > at > org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47) > at > org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42) > at > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637) > at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) > at > org.apache.flink.api.scala.batch.sql.AggregationsITCase.test(AggregationsITCase.scala:307) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at >
[jira] [Updated] (FLINK-4874) Document how to enable Flink web interface in local execution
[ https://issues.apache.org/jira/browse/FLINK-4874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Krishna Prasad Anna Ramesh Kumar updated FLINK-4874: Description: The local environment section in the local execution web page (https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/local_execution.html) indicates that in the web interface cannot be started while running in a local environment. As Till has pointed out in one of the mailing lists topics, this can be done by including the following in the program code. "Configuration config = new Configuration(); config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(getP, config);" And adding this dependency " org.apache.flink flink-runtime-web_2.10 ${flink.version} " This should be added to the documentation as it very critical for developers like time who are trying to learn the framework. Thanks! was: The local environment section in the local execution web page (https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/local_execution.html) indicates that in the web interface cannot be started while running in a local environment. As Till has pointed out in one of the mailing lists topics, this can be done by including the following in the program code. "Configuration config = new Configuration(); config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(getP, config);" And adding this dependency " org.apache.flink flink-runtime-web_2.10 ${flink.version} " This should be added to the documentation as it very critical for developers like time who are trying to learning the framework. Thanks! > Document how to enable Flink web interface in local execution > - > > Key: FLINK-4874 > URL: https://issues.apache.org/jira/browse/FLINK-4874 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.2.0, 1.1.2 >Reporter: Krishna Prasad Anna Ramesh Kumar >Priority: Trivial > Fix For: 1.1.2 > > Original Estimate: 0.5h > Remaining Estimate: 0.5h > > The local environment section in the local execution web page > (https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/local_execution.html) > indicates that in the web interface cannot be started while running in a > local environment. As Till has pointed out in one of the mailing lists > topics, this can be done by including the following in the program code. > "Configuration config = new Configuration(); > config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.createLocalEnvironment(getP, config);" > And adding this dependency > " > org.apache.flink > flink-runtime-web_2.10 > ${flink.version} > " > This should be added to the documentation as it very critical for developers > like time who are trying to learn the framework. > Thanks! -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2118) Table API fails on composite filter conditions
[ https://issues.apache.org/jira/browse/FLINK-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15595011#comment-15595011 ] Alexander Shoshin commented on FLINK-2118: -- I agree with [~RenkaiGe]. {{'name \!== "Pete" && 'name \!== "Bob"}} expression will not work correctly without brackets because scala marks {{\!==}} as assignment operator which has the lowest compare to other operators precedence. At the moment scala hasn't an opportunity to set the infix operations presedence manualy. The only way is to change {{\!==}} to something like {{<>}} or {{=\!=}} as written in previous comment but it might become not so intuitive. Moreover that will change the public API which is not a good idea. Shouldn't we close this issue? > Table API fails on composite filter conditions > -- > > Key: FLINK-2118 > URL: https://issues.apache.org/jira/browse/FLINK-2118 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Alexander Shoshin > > Having a composite filter conditions such as > {code} > myTable.filter('name !== "Pete" && 'name !== "Bob") > {code} > fails with the following error message: > {code} > ExpressionException: Non-boolean operand types String and String in Pete && > 'name > {code} > whereas > {code} > myTable.filter( ('name !== "Pete") && ('name !== "Bob") ) > {code} > works. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4882) Cleanup throws exception clause in HighAvailabilityServices
Till Rohrmann created FLINK-4882: Summary: Cleanup throws exception clause in HighAvailabilityServices Key: FLINK-4882 URL: https://issues.apache.org/jira/browse/FLINK-4882 Project: Flink Issue Type: Improvement Reporter: Till Rohrmann Assignee: Till Rohrmann Priority: Minor The {{HighAvailabilityServices}} interfaces defines methods with throws exception clauses which are not really needed. We should remove them to correct the interface. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4878) ReduceTaskExternalITCase.testMultiLevelMergeCombiningReduceTask test instable
[ https://issues.apache.org/jira/browse/FLINK-4878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15594938#comment-15594938 ] Robert Metzger commented on FLINK-4878: --- Fixed logging, so that we can understand the issue better in the future: https://github.com/apache/flink/commit/7398fdbfe024652b4299b582c2e1559da473011d > ReduceTaskExternalITCase.testMultiLevelMergeCombiningReduceTask test instable > - > > Key: FLINK-4878 > URL: https://issues.apache.org/jira/browse/FLINK-4878 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.2.0 >Reporter: Robert Metzger > Labels: test-stability > > The {{ReduceTaskExternalITCase.testMultiLevelMergeCombiningReduceTask}} test > failed with the following error > {code} > testMultiLevelMergeCombiningReduceTask[0](org.apache.flink.runtime.operators.ReduceTaskExternalITCase) > Time elapsed: 0.233 sec <<< FAILURE! > java.lang.AssertionError: Invoke method caused exception. > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.runtime.operators.ReduceTaskExternalITCase.testMultiLevelMergeCombiningReduceTask(ReduceTaskExternalITCase.java:193) > {code} > Here is the log > https://s3.amazonaws.com/archive.travis-ci.org/jobs/169181202/log.txt > Sadly, the test is not logging the exception at INFO level, so we can not > really understand what's going on. > I'll push a commit improving the logging. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4881) Docker: Remove dependency on shared volumes
[ https://issues.apache.org/jira/browse/FLINK-4881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels closed FLINK-4881. - Resolution: Fixed Merged with 58a16c1007c97809bdd46c93632abd90c0ce5c4b. > Docker: Remove dependency on shared volumes > --- > > Key: FLINK-4881 > URL: https://issues.apache.org/jira/browse/FLINK-4881 > Project: Flink > Issue Type: Improvement > Components: Docker >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Fix For: 1.2.0 > > > Our Dockerfile assumes a shared volume configuration to access the config. > Instead, we can configure the Docker container to directly write the hostname > into {{/etc/hosts}} and use "jobmanager" as the default hostname. > This has been proposed here: https://github.com/apache/flink/pull/2667 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4780) Ability to use UDP protocol in metrics-graphite
[ https://issues.apache.org/jira/browse/FLINK-4780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15594928#comment-15594928 ] ASF GitHub Bot commented on FLINK-4780: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2677 > Ability to use UDP protocol in metrics-graphite > --- > > Key: FLINK-4780 > URL: https://issues.apache.org/jira/browse/FLINK-4780 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.2 >Reporter: Maciej Prochniak >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.2.0 > > > GraphiteSender in dropwizard metrics has ability to send metrics with TCP or > UDP sockets. I'd like to be able to configure that, as UDP is our default > protocol to interact with influxdb and TCP version has some problems when > restarting metrics database -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2677: [FLINK-4780] Make GraphiteReporter protocol config...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2677 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4881) Docker: Remove dependency on shared volumes
Maximilian Michels created FLINK-4881: - Summary: Docker: Remove dependency on shared volumes Key: FLINK-4881 URL: https://issues.apache.org/jira/browse/FLINK-4881 Project: Flink Issue Type: Improvement Components: Docker Reporter: Maximilian Michels Assignee: Maximilian Michels Priority: Minor Fix For: 1.2.0 Our Dockerfile assumes a shared volume configuration to access the config. Instead, we can configure the Docker container to directly write the hostname into {{/etc/hosts}} and use "jobmanager" as the default hostname. This has been proposed here: https://github.com/apache/flink/pull/2667 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4623) Create Physical Execution Plan of a DataStream
[ https://issues.apache.org/jira/browse/FLINK-4623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Solovev reassigned FLINK-4623: Assignee: Anton Solovev > Create Physical Execution Plan of a DataStream > -- > > Key: FLINK-4623 > URL: https://issues.apache.org/jira/browse/FLINK-4623 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Anton Solovev > Labels: starter > > The {{StreamTableEnvironment#explain(Table)}} command for tables of a > {{StreamTableEnvironment}} only outputs the abstract syntax tree. It would be > helpful if the {{explain}} method could also generate a string from the > {{DataStream}} containing a physical execution plan. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4880) FlinkML - Implement Feature hashing (Data pre-processing)
[ https://issues.apache.org/jira/browse/FLINK-4880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas FOURNIER updated FLINK-4880: --- Description: I'm going to work on feature hashing, as it is a point mentioned on the FlinkML roadmap: https://cwiki.apache.org/confluence/display/FLINK/FlinkML%3A+Vision+and+Roadmap A good starting point: https://github.com/apache/spark/blob/v2.0.1/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala was: I'm going to work on feature hashing, as it is a point mentioned on the FlinkML roadmap: https://cwiki.apache.org/confluence/display/FLINK/FlinkML%3A+Vision+and+Roadmap > FlinkML - Implement Feature hashing (Data pre-processing) > - > > Key: FLINK-4880 > URL: https://issues.apache.org/jira/browse/FLINK-4880 > Project: Flink > Issue Type: New Feature >Reporter: Thomas FOURNIER >Priority: Minor > > I'm going to work on feature hashing, as it is a point mentioned on the > FlinkML roadmap: > https://cwiki.apache.org/confluence/display/FLINK/FlinkML%3A+Vision+and+Roadmap > A good starting point: > https://github.com/apache/spark/blob/v2.0.1/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala -- This message was sent by Atlassian JIRA (v6.3.4#6332)