[jira] [Commented] (FLINK-4222) Allow Kinesis configuration to get credentials from AWS Metadata
[ https://issues.apache.org/jira/browse/FLINK-4222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388967#comment-15388967 ] ASF GitHub Bot commented on FLINK-4222: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2260 Hi @chadnickbok, I've given the new config a test on AWS EC2 instances with appropriate IAM roles attached, and it works as expected. I think the changes are good to merge once the remaining comments are addressed :) > Allow Kinesis configuration to get credentials from AWS Metadata > > > Key: FLINK-4222 > URL: https://issues.apache.org/jira/browse/FLINK-4222 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors > Affects Versions: 1.0.3 > Reporter: Nick Chadwick > Priority: Minor > Labels: easyfix > Original Estimate: 1h > Remaining Estimate: 1h > > When deploying Flink TaskManagers in an EC2 environment, it would be nice to > be able to use the EC2 IAM Role credentials provided by the EC2 Metadata > service. > This allows for credentials to be automatically discovered by services > running on EC2 instances at runtime, and removes the need to explicitly > create and assign credentials to TaskManagers. > This should be a fairly small change to the configuration of the > flink-connector-kinesis, which will greatly improve the ease of deployment to > Amazon EC2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2260: [FLINK-4222] Allow Kinesis configuration to get credentia...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2260 Hi @chadnickbok, I've given the new config a test on AWS EC2 instances with appropriate IAM roles attached, and it works as expected. I think the changes are good to merge once the remaining comments are addressed :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2282: Support order by with offset and fetch.
GitHub user gallenvara opened a pull request: https://github.com/apache/flink/pull/2282 Support order by with offset and fetch. Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed Support orderBy with offset and fetch. @twalthr @fhueske Can you help with the review and give me some feedback? :) You can merge this pull request into a Git repository by running: $ git pull https://github.com/gallenvara/flink flink-3940 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2282.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2282 commit e6f2da3ef8e566942e160b628521ee0f50049fa4 Author: gallenvara Date: 2016-07-22T03:39:46Z Support order by with offset and fetch.
[jira] [Updated] (FLINK-4252) Table program cannot be compiled
[ https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Renkai Ge updated FLINK-4252: - Description: I'm trying the Table APIs and got some errors like the following. My code is in the attachments. The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
    at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672)
    at TestMain$.main(TestMain.scala:31)
    at TestMain.main(TestMain.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open(Configuration)' method in class org.apache.flink.api.table.runtime.FlatMapRunner caused an exception: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
    at org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.api.table.runtime.FunctionCompiler$class.compile(FunctionCompiler.scala:37)
    at org.apache.flink.api.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:28)
    at org.apache.flink.api.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:42)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
[jira] [Created] (FLINK-4252) Table program cannot be compiled
Renkai Ge created FLINK-4252: Summary: Table program cannot be compiled Key: FLINK-4252 URL: https://issues.apache.org/jira/browse/FLINK-4252 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.1.0 Environment: OS X El Capitan, Scala 2.11.7, JDK 8 Reporter: Renkai Ge Attachments: TestMain.scala

I'm trying the Table APIs and got some errors like the following. The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
    at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672)
    at TestMain$.main(TestMain.scala:31)
    at TestMain.main(TestMain.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open(Configuration)' method in class org.apache.flink.api.table.runtime.FlatMapRunner caused an exception: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
    at org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.api.table.runtime.FunctionCompiler$class.compile(FunctionCompiler.scala:37)
    at
[jira] [Updated] (FLINK-4252) Table program cannot be compiled
[ https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Renkai Ge updated FLINK-4252: - Attachment: TestMain.scala > Table program cannot be compiled > > > Key: FLINK-4252 > URL: https://issues.apache.org/jira/browse/FLINK-4252 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 > Environment: OS X EI Captain > scala 2.11.7 > jdk 8 >Reporter: Renkai Ge > Attachments: TestMain.scala > > > I'm trying the table apis. > I got some errors like this > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:410) > at org.apache.flink.api.java.DataSet.print(DataSet.java:1605) > at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672) > at TestMain$.main(TestMain.scala:31) > at TestMain.main(TestMain.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) > at > 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.Exception: The user defined 'open(Configuration)' method > in class org.apache.flink.api.table.runtime.FlatMapRunner caused an > exception: Table program cannot be compiled. This is a bug. Please file an > issue. 
> at > org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337) > at > org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47) > at > org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > Caused by:
[jira] [Comment Edited] (FLINK-4250) Cannot select other than first column from Table
[ https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388780#comment-15388780 ] Jark Wu edited comment on FLINK-4250 at 7/22/16 2:55 AM: - I think the second exception is similar to the first, because "VALUE" is also a reserved SQL keyword as a part of "[NEXT VALUE FOR|https://msdn.microsoft.com/en-us//library/ff878370.aspx]; which generates a sequence. was (Author: jark): I think the second exception is similar to the first, because "VALUE" is also a reserved SQL keyword as a part of "NEXT VALUE FOR" which generates a sequence. > Cannot select other than first column from Table > > > Key: FLINK-4250 > URL: https://issues.apache.org/jira/browse/FLINK-4250 > Project: Flink > Issue Type: Bug > Components: Scala API, Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Critical > > Using the Scala Table API and the {{CsvTableSource}} I cannot select a column > from the csv source. The following code: > {code} > package com.dataartisans.batch > import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} > import org.apache.flink.api.scala._ > import org.apache.flink.api.table.sources.CsvTableSource > import org.apache.flink.api.table.{Row, TableEnvironment, Table} > object CsvTableAPIJob { > def main(args: Array[String]): Unit = { > val env = ExecutionEnvironment.getExecutionEnvironment > val csvFilePath = "table-jobs/src/main/resources/input.csv" > val tblEnv = TableEnvironment.getTableEnvironment(env) > val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", > "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO)) > tblEnv.registerTableSource("foobar", csvTS) > val input = tblEnv.sql("SELECT user FROM foobar") > tblEnv.toDataSet[Row](input).print() > } > } > {code} > fails with > {code} > Exception in thread "main" > 
org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782) > at scala.Option.getOrElse(Option.scala:120) > at > org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782) > at > org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:108) > at > org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286) > at > org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108) > at > org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271) > at > org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) > at 
com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21) > at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4250) Cannot select other than first column from Table
[ https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388780#comment-15388780 ] Jark Wu commented on FLINK-4250: I think the second exception is similar to the first, because "VALUE" is also a reserved SQL keyword as a part of "NEXT VALUE FOR" which generates a sequence. > Cannot select other than first column from Table > > > Key: FLINK-4250 > URL: https://issues.apache.org/jira/browse/FLINK-4250 > Project: Flink > Issue Type: Bug > Components: Scala API, Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Critical > > Using the Scala Table API and the {{CsvTableSource}} I cannot select a column > from the csv source. The following code: > {code} > package com.dataartisans.batch > import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} > import org.apache.flink.api.scala._ > import org.apache.flink.api.table.sources.CsvTableSource > import org.apache.flink.api.table.{Row, TableEnvironment, Table} > object CsvTableAPIJob { > def main(args: Array[String]): Unit = { > val env = ExecutionEnvironment.getExecutionEnvironment > val csvFilePath = "table-jobs/src/main/resources/input.csv" > val tblEnv = TableEnvironment.getTableEnvironment(env) > val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", > "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO)) > tblEnv.registerTableSource("foobar", csvTS) > val input = tblEnv.sql("SELECT user FROM foobar") > tblEnv.toDataSet[Row](input).print() > } > } > {code} > fails with > {code} > Exception in thread "main" > org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782) > 
at scala.Option.getOrElse(Option.scala:120) > at > org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782) > at > org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:108) > at > org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286) > at > org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108) > at > org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271) > at > org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) > at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21) > at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
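The failure above boils down to `user` (and, per the comment, `value`) colliding with reserved words in the SQL parser. As a plain-Java illustration of the general workaround direction — quoting identifiers that collide with reserved words before they appear in a statement — here is a hypothetical helper. The reserved-word set and the backtick quoting character are assumptions for the sketch; the exact reserved words and quoting style depend on the Calcite parser configuration, not on this code.

```java
import java.util.Set;

// Hypothetical sketch: quote column names that collide with SQL reserved
// words. The RESERVED set below is a tiny illustrative subset, not the
// actual Calcite keyword list.
public class IdentifierQuoting {
    static final Set<String> RESERVED = Set.of("USER", "VALUE", "TIMESTAMP");

    // Wrap the identifier in backticks only when it collides. The quoting
    // character is an assumption; some parsers use double quotes instead.
    static String quoteIfReserved(String column) {
        return RESERVED.contains(column.toUpperCase()) ? "`" + column + "`" : column;
    }

    public static void main(String[] args) {
        System.out.println(quoteIfReserved("user")); // prints `user`
        System.out.println(quoteIfReserved("key"));  // prints key
    }
}
```

With such quoting, a query like the one in the report would be rewritten to select the escaped identifier rather than the reserved word.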
[jira] [Commented] (FLINK-3940) Add support for ORDER BY OFFSET FETCH
[ https://issues.apache.org/jira/browse/FLINK-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388752#comment-15388752 ] GaoLun commented on FLINK-3940: --- Hello, I would like to work on this issue. :) > Add support for ORDER BY OFFSET FETCH > - > > Key: FLINK-3940 > URL: https://issues.apache.org/jira/browse/FLINK-3940 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Affects Versions: 1.1.0 > Reporter: Fabian Hueske > Priority: Minor > > Currently only ORDER BY without OFFSET and FETCH is supported. > This issue tracks the effort to add support for OFFSET and FETCH and involves: > - implementing the execution strategy in `DataSetSort` > - adapting the `DataSetSortRule` to support OFFSET and FETCH > - extending the Table API and validation to support OFFSET and FETCH and > generate a corresponding RelNode.
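The semantics the issue asks for can be sketched independently of Flink: after the sort, OFFSET skips a fixed number of leading rows and FETCH caps how many rows are returned. A minimal stand-alone Java sketch (the method and names are illustrative, not the actual `DataSetSort` implementation — in a distributed setting the skip/limit must of course be applied after a global sort, not per partition):

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Illustrates ORDER BY ... OFFSET n ROWS FETCH FIRST m ROWS ONLY on a
// local collection: sort, then skip the offset, then limit to the fetch.
public class OrderByOffsetFetch {
    static List<Integer> orderBy(List<Integer> rows, long offset, long fetch) {
        return rows.stream()
                .sorted(Comparator.naturalOrder()) // ORDER BY x ASC
                .skip(offset)                      // OFFSET n ROWS
                .limit(fetch)                      // FETCH FIRST m ROWS ONLY
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // e.g. SELECT x FROM t ORDER BY x OFFSET 2 ROWS FETCH FIRST 3 ROWS ONLY
        System.out.println(orderBy(List.of(5, 3, 1, 4, 2, 6), 2, 3)); // prints [3, 4, 5]
    }
}
```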
[jira] [Commented] (FLINK-4175) Broadcast data sent increases with # slots per TM
[ https://issues.apache.org/jira/browse/FLINK-4175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388545#comment-15388545 ] Felix Neutatz commented on FLINK-4175: -- I started a design document here: https://docs.google.com/document/d/1odYIvmQt4feonQF9q-btBnGvrzzN3lX0Os6rzHcCOjA/edit?usp=sharing I highly appreciate any ideas or comments and am looking forward to the discussion. > Broadcast data sent increases with # slots per TM > - > > Key: FLINK-4175 > URL: https://issues.apache.org/jira/browse/FLINK-4175 > Project: Flink > Issue Type: Improvement > Components: Core, TaskManager > Affects Versions: 1.0.3 > Reporter: Felix Neutatz > Assignee: Felix Neutatz > Labels: performance > > Problem: we experience an unexpected increase of data sent over the network for broadcasts with an increasing number of slots per TaskManager.
> We provided a benchmark [1]. It not only increases the size of data sent over the network but also hurts performance, as seen in the preliminary results below. In these results cloud-11 has 25 nodes and ibm-power has 8 nodes, with the number of slots per node scaled from 1 to 16.
> +-----------------------+--------------+-------------+
> | suite                 | name         | median_time |
> +=======================+==============+=============+
> | broadcast.cloud-11    | broadcast.01 |        8796 |
> | broadcast.cloud-11    | broadcast.02 |       14802 |
> | broadcast.cloud-11    | broadcast.04 |       30173 |
> | broadcast.cloud-11    | broadcast.08 |       56936 |
> | broadcast.cloud-11    | broadcast.16 |      117507 |
> | broadcast.ibm-power-1 | broadcast.01 |        6807 |
> | broadcast.ibm-power-1 | broadcast.02 |        8443 |
> | broadcast.ibm-power-1 | broadcast.04 |       11823 |
> | broadcast.ibm-power-1 | broadcast.08 |       21655 |
> | broadcast.ibm-power-1 | broadcast.16 |       37426 |
> +-----------------------+--------------+-------------+
> After looking into the code base, it seems that the data is deserialized only once per TM, but the actual data is sent for all slots running the operator with broadcast vars and just gets discarded in case it is already deserialized.
> We do not see a reason the data can't be shared among the slots of a TM and therefore sent just once.
> [1] https://github.com/TU-Berlin-DIMA/flink-broadcast
> This Jira will continue the discussion started here: https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3c1465386300767.94...@tu-berlin.de%3E
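The fix direction suggested in the issue — materialize each broadcast variable once per TaskManager and let all slots share the reference — can be sketched with a per-process cache. Everything below is illustrative (class, field, and method names are invented for the sketch), not the actual Flink broadcast machinery:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: slots ask a TaskManager-wide cache for a broadcast
// variable; only the first request pays the materialization cost, so the
// cost no longer scales with the number of slots.
public class SharedBroadcast {
    static final AtomicInteger materializations = new AtomicInteger();
    static final Map<String, Object> perTaskManagerCache = new ConcurrentHashMap<>();

    // All slots call this concurrently; computeIfAbsent guarantees the
    // expensive branch runs once per key.
    static Object getOrMaterialize(String name) {
        return perTaskManagerCache.computeIfAbsent(name, n -> {
            materializations.incrementAndGet(); // stands in for network transfer + deserialization
            return new int[1_000_000];
        });
    }

    public static void main(String[] args) {
        for (int slot = 0; slot < 16; slot++) {
            getOrMaterialize("model"); // 16 slots, one shared copy
        }
        System.out.println(materializations.get()); // prints 1
    }
}
```

This mirrors the observation in the issue that the data is already deserialized only once per TM; the sketch simply extends the same idea to the transfer itself.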
[GitHub] flink issue #2271: [FLINK-4202] Add restarting time JM metric
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2271 Also, the documentation (list of all exposed metrics) was not updated.
[jira] [Reopened] (FLINK-4202) Add JM metric which shows the restart duration
[ https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reopened FLINK-4202: - Documentation was not properly updated. > Add JM metric which shows the restart duration > -- > > Key: FLINK-4202 > URL: https://issues.apache.org/jira/browse/FLINK-4202 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination > Affects Versions: 1.1.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Fix For: 1.1.0 > > > It is convenient for users to have a metric which tells you how long the > restart of a job has taken. > I propose to introduce a {{Gauge}} which returns the time between > {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not > restarted (initial run), then this metric will return {{-1}}.
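The proposed metric can be sketched as follows. `Gauge` is declared locally here merely to mirror the shape of a metrics gauge interface, and the timestamp fields are hypothetical stand-ins for the JobManager's status-transition bookkeeping — none of this is the actual FLINK-4202 implementation:

```java
// Sketch of the proposed restarting-time gauge: the time between
// JobStatus.RESTARTING and JobStatus.RUNNING, or -1 for the initial run.
public class RestartDurationGauge {
    interface Gauge<T> { T getValue(); } // local stand-in for a metrics Gauge

    static volatile long restartingTimestamp = -1L; // set on JobStatus.RESTARTING
    static volatile long runningTimestamp = -1L;    // set on JobStatus.RUNNING after a restart

    static final Gauge<Long> restartingTime = () -> {
        if (restartingTimestamp < 0) {
            return -1L; // initial run: the job was never restarted
        }
        // While still restarting, report the elapsed time so far.
        long end = runningTimestamp >= 0 ? runningTimestamp : System.currentTimeMillis();
        return end - restartingTimestamp;
    };

    public static void main(String[] args) {
        System.out.println(restartingTime.getValue()); // prints -1 before any restart
        restartingTimestamp = 1_000L;
        runningTimestamp = 4_500L;
        System.out.println(restartingTime.getValue()); // prints 3500
    }
}
```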
[jira] [Commented] (FLINK-4202) Add JM metric which shows the restart duration
[ https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388507#comment-15388507 ] ASF GitHub Bot commented on FLINK-4202: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2271 Also, the documentation (list of all exposed metrics) was not updated. > Add JM metric which shows the restart duration > -- > > Key: FLINK-4202 > URL: https://issues.apache.org/jira/browse/FLINK-4202 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination > Affects Versions: 1.1.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Fix For: 1.1.0 > > > It is convenient for users to have a metric which tells you how long the > restart of a job has taken. > I propose to introduce a {{Gauge}} which returns the time between > {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not > restarted (initial run), then this metric will return {{-1}}.
[jira] [Comment Edited] (FLINK-4245) Metric naming improvements
[ https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387967#comment-15387967 ] Chesnay Schepler edited comment on FLINK-4245 at 7/21/16 9:52 PM: -- 1. As long as we don't have unique operator names you cannot have collision-free operator metrics. Period. I am getting really tired of explaining this. 2. If you only want to change the naming for JMX I suggest changing the title to "JMX naming improvements". 3. Your suggestion regarding the domain goes against JMX best practices. They should always start with "org.apache.flink". 4. Please provide a reasoning for the domain changes. 5. Please provide a comparison of how an operator and a task metric would differ, regarding their domain, tags and ObjectName, based on the current respective default scope format. 6. In general, using what at one point were called "categories" as keys isn't a bad idea. Note however that this becomes inconsistent with user-defined groups, which is the reason we currently only use auto-generated keys. 7. Please provide the use-case regarding [~mdaxini]; I am curious as to what these changes are supposed to allow. was (Author: zentol): 1. As long as we don't have unique operator names you cannot have collision-free operator metrics. Period. I am getting really tired of explaining this. 2. If you only want to change the naming for JMX I suggest changing the title to "JMX naming improvements". 3. Your suggestion regarding the domain goes against JMX best practices. They should always start with "org.apache.flink". 4. Please provide a reasoning for the domain changes. 5. Please provide a comparison of how an operator and a task metric would differ (domain and tags) that includes all tags that are defined in the current default scope formats. 6. In general, using what at one point were called "categories" as keys isn't a bad idea.
Note however that this becomes inconsistent with user-defined groups, which is the reason we currently only use auto-generated keys. 7. Please provide the use-case regarding [~mdaxini]; I am curious as to what these changes are supposed to allow. > Metric naming improvements > -- > > Key: FLINK-4245 > URL: https://issues.apache.org/jira/browse/FLINK-4245 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Stephan Ewen > > A metric currently has two parts to it: > - The name of that particular metric > - The "scope" (or namespace), defined by the group that contains the metric. > A metric group actually always implicitly has a map of naming "tags", like: > - taskmanager_host : > - taskmanager_id : > - task_name : "map() -> filter()" > We derive the scope from that map, following the defined scope formats. > For JMX (and some users that use JMX), it would be natural to expose that map > of tags. Some users reconstruct that map by parsing the metric scope. In JMX, we > can expose a metric like: > - domain: "taskmanager.task.operator.io" > - name: "numRecordsIn" > - tags: { "hostname" -> "localhost", "operator_name" -> "map() at > X.java:123", ... } > For many other reporters, the formatted scope makes a lot of sense, since > they think only in terms of (scope, metric-name). > We may even have the formatted scope in JMX as well (in the domain), if we > want to go that route. > [~jgrier] and [~Zentol] - what do you think about that? > [~mdaxini] Does that match your use of the metrics?
[jira] [Updated] (FLINK-4245) Metric naming improvements
[ https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-4245: Component/s: Metrics
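The domain/name/tags shape discussed in this thread can be illustrated with the JDK's own `javax.management.ObjectName`. The concrete domain and tag values below are taken from the example in the issue text, with the `org.apache.flink` prefix applied as point 3 of the comment demands; the exact key names are assumptions:

```java
import javax.management.ObjectName;
import java.util.Hashtable;

public class MetricNameExample {
    public static void main(String[] args) throws Exception {
        // The implicit "tags" map from the issue becomes ObjectName key properties.
        Hashtable<String, String> tags = new Hashtable<>();
        tags.put("hostname", "localhost");
        // Values containing special characters (':', ',', '=') must be quoted.
        tags.put("operator_name", ObjectName.quote("map() at X.java:123"));
        tags.put("name", "numRecordsIn");

        // Domain prefixed with org.apache.flink per JMX best practice;
        // the suffix carries the scope from the issue's example.
        ObjectName metric =
            new ObjectName("org.apache.flink.taskmanager.task.operator.io", tags);
        System.out.println(metric);
    }
}
```

A JMX client can then filter metrics by any individual tag instead of parsing a formatted scope string.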
[GitHub] flink pull request #2281: RMQ Sink: Possibility to customize queue config [F...
GitHub user PhilippGrulich opened a pull request: https://github.com/apache/flink/pull/2281 RMQ Sink: Possibility to customize queue config [FLINK-4251] This patch adds the possibility for the user of the RabbitMQ Streaming Sink to customize the queue which is used. This adopts the behavior of [FLINK-4025] for the sink. The commit doesn't change the actual behaviour but makes it possible for users to override the `setupQueue` method and customize their implementation. This was only possible for the RMQSource before. The Sink and the Source now both offer the same functionality, so this should increase usability. [FLINK-4025] = https://issues.apache.org/jira/browse/FLINK-4025 Best, Philipp You can merge this pull request into a Git repository by running: $ git pull https://github.com/PhilippGrulich/flink RMQSink_setupQueue Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2281.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2281 commit 3b5ec690c6320840cc0b9f5eafb64e11761e543b Author: philippgrulich Date: 2016-07-21T20:31:24Z RMQ Sink: Possibility to customize queue config [FLINK-4251] --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4251) Add possibility for the RMQ Streaming Sink to customize the queue
[ https://issues.apache.org/jira/browse/FLINK-4251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388395#comment-15388395 ] ASF GitHub Bot commented on FLINK-4251: --- GitHub user PhilippGrulich opened a pull request: https://github.com/apache/flink/pull/2281 RMQ Sink: Possibility to customize queue config [FLINK-4251]
[jira] [Created] (FLINK-4251) Add possibility for the RMQ Streaming Sink to customize the queue
Philipp Grulich created FLINK-4251: -- Summary: Add possibility for the RMQ Streaming Sink to customize the queue Key: FLINK-4251 URL: https://issues.apache.org/jira/browse/FLINK-4251 Project: Flink Issue Type: Improvement Components: Streaming Connectors Reporter: Philipp Grulich Priority: Minor This patch adds the possibility for the user of the RabbitMQ Streaming Sink to customize the queue which is used. This adopts the behavior of [FLINK-4025] for the sink. The commit doesn't change the actual behaviour but makes it possible for users to override the `setupQueue` method and customize their implementation. This was only possible for the RMQSource before. The Sink and the Source now both offer the same functionality, so this should increase usability. [FLINK-4025] = https://issues.apache.org/jira/browse/FLINK-4025
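The override pattern this change enables can be sketched as a stand-alone template method. `RMQSinkBase` and `CustomQueueSink` are made-up stand-ins for the real classes; the actual connector's `setupQueue` works against a RabbitMQ `Channel`, which is omitted here so the sketch stays self-contained:

```java
// Stand-in for the connector: setupQueue is a protected hook that
// subclasses may override, mirroring what RMQSource already allowed.
class RMQSinkBase {
    protected final String queueName;

    RMQSinkBase(String queueName) {
        this.queueName = queueName;
    }

    // Default behaviour: declare a plain queue with no extra arguments
    // (assumption; returns a description instead of touching a Channel).
    protected String setupQueue() {
        return "declare " + queueName;
    }
}

// User code: customize the queue, e.g. make it durable with a message TTL.
class CustomQueueSink extends RMQSinkBase {
    CustomQueueSink(String queueName) {
        super(queueName);
    }

    @Override
    protected String setupQueue() {
        return "declare " + queueName + " durable=true x-message-ttl=60000";
    }
}
```

In the real connector the override would issue a `queueDeclare` with custom flags and arguments rather than return a string; the point is that the sink now exposes the same extension hook as the source.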
[jira] [Commented] (FLINK-3779) Add support for queryable state
[ https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388348#comment-15388348 ] ASF GitHub Bot commented on FLINK-3779: --- Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2051 One more question: is it possible to configure the JobManager Actor path that the client connects to? It looks like it defaults to `akka://flink/user/jobmanager`. That way I can create a much more generic client. Note: I know this is the initial version, just curious whether this is already implemented. :) > Add support for queryable state > --- > > Key: FLINK-3779 > URL: https://issues.apache.org/jira/browse/FLINK-3779 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > > Flink offers state abstractions for user functions in order to guarantee > fault-tolerant processing of streams. Users can work with both > non-partitioned (Checkpointed interface) and partitioned state > (getRuntimeContext().getState(ValueStateDescriptor) and other variants). > The partitioned state interface provides access to different types of state > that are all scoped to the key of the current input element. This type of > state can only be used on a KeyedStream, which is created via stream.keyBy(). > Currently, all of this state is internal to Flink and used in order to > provide processing guarantees in failure cases (e.g. exactly-once processing). > The goal of Queryable State is to expose this state outside of Flink by > supporting queries against the partitioned key value state. > This will help to eliminate the need for distributed operations/transactions > with external systems such as key-value stores which are often the bottleneck > in practice. Exposing the local state to the outside moves a good part of the > database work into the stream processor, allowing both high throughput > queries and immediate access to the computed state.
> This is the initial design doc for the feature: > https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g. > Feel free to comment.
[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2051 Sorry, the compilation error was because the Tuple2 was scala.Tuple2, not Flink's Tuple2. Changing to `org.apache.flink.api.java.tuple.Tuple2` fixed the issue.
[jira] [Commented] (FLINK-4250) Cannot select other than first column from Table
[ https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388165#comment-15388165 ] Timo Walther commented on FLINK-4250: - I think the problem of your first exception is that "user" or "USER" is a reserved SQL function returning the current user. The second exception is definitely a bug. > Cannot select other than first column from Table > > > Key: FLINK-4250 > URL: https://issues.apache.org/jira/browse/FLINK-4250 > Project: Flink > Issue Type: Bug > Components: Scala API, Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Critical > > Using the Scala Table API and the {{CsvTableSource}} I cannot select a column > from the csv source. The following code: > {code} > package com.dataartisans.batch > import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} > import org.apache.flink.api.scala._ > import org.apache.flink.api.table.sources.CsvTableSource > import org.apache.flink.api.table.{Row, TableEnvironment, Table} > object CsvTableAPIJob { > def main(args: Array[String]): Unit = { > val env = ExecutionEnvironment.getExecutionEnvironment > val csvFilePath = "table-jobs/src/main/resources/input.csv" > val tblEnv = TableEnvironment.getTableEnvironment(env) > val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", > "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO)) > tblEnv.registerTableSource("foobar", csvTS) > val input = tblEnv.sql("SELECT user FROM foobar") > tblEnv.toDataSet[Row](input).print() > } > } > {code} > fails with > {code} > Exception in thread "main" > org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782) > at > 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782) > at scala.Option.getOrElse(Option.scala:120) > at > org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782) > at > org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:108) > at > org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286) > at > org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108) > at > org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271) > at > org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) > at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21) > at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
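Assuming the first exception really is caused by the reserved USER function, one common workaround in Calcite-based SQL dialects is to quote the identifier; whether Flink's parser accepts this for the case above is an assumption, not something confirmed in the thread:

```sql
-- Hypothetical workaround: quote the reserved word so it is parsed
-- as a column reference rather than the built-in USER function.
SELECT "user" FROM foobar
```

Renaming the column at the source (e.g. to `username`) would sidestep the collision entirely.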
[jira] [Commented] (FLINK-3779) Add support for queryable state
[ https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388116#comment-15388116 ] ASF GitHub Bot commented on FLINK-3779: --- Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2051 Thanks Ufuk & Stephan for the reply, I tried the serializers suggested by you:

```
val typeHint = new TypeHint[Tuple2[Long,String]](){}
val serializer = TypeInformation.of(typeHint).createSerializer(null)

//also tried this
val fieldSerializers = Array[TypeSerializer[_]](StringSerializer.INSTANCE, LongSerializer.INSTANCE)
val serializer2 = new TupleSerializer(classOf[Tuple2[Long,String]].asInstanceOf[Class[_]].asInstanceOf[Class[Tuple2[String, Long]]], fieldSerializers)
```

But both give me a compilation error at:

```
val serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
  key, serializer2, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE)
```

The compilation error is:

```
Error:(43, 7) type mismatch;
 found   : org.apache.flink.api.common.typeutils.TypeSerializer[org.apache.flink.api.java.tuple.Tuple2[Long,String]]
 required: org.apache.flink.api.common.typeutils.TypeSerializer[java.io.Serializable]
Note: org.apache.flink.api.java.tuple.Tuple2[Long,String] <: java.io.Serializable, but Java-defined class TypeSerializer is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: java.io.Serializable`. (SLS 3.2.10)
    serializer,
    ^
```

I had seen this before when I tried to set the serializer from `queryableState.getKeySerializer`. Note: it works fine when I use the longer version of the serializer that I created.
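The compiler message quoted above is ordinary Java-generics invariance, independent of Flink: a `TypeSerializer[Tuple2[Long,String]]` is not a `TypeSerializer[java.io.Serializable]`, even though `Tuple2` is `Serializable`. A self-contained sketch of the same situation, and of the wildcard fix the compiler suggests (`Box`, `needsExact`, and `needsSome` are made-up names):

```java
import java.io.Serializable;

public class InvarianceDemo {
    // Stand-in for TypeSerializer<T>: invariant in T, like any Java generic.
    static class Box<T> {}

    // A parameter typed Box<Serializable> rejects Box<String>, even though
    // String implements Serializable -- generics are invariant.
    static void needsExact(Box<Serializable> b) {}

    // The fix the compiler hints at: a bounded wildcard.
    static void needsSome(Box<? extends Serializable> b) {}

    public static void main(String[] args) {
        Box<String> b = new Box<>();
        // needsExact(b);  // does not compile: invariance
        needsSome(b);      // compiles: wildcard accepts any Serializable subtype
        System.out.println("ok");
    }
}
```

On the Scala side, one plausible fix is therefore to type the parameter (or the serializer reference) as `TypeSerializer[_ <: java.io.Serializable]` rather than the exact `TypeSerializer[java.io.Serializable]`, as the `SLS 3.2.10` note suggests.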
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388102#comment-15388102 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 Adding some more context to the implementation details, which are based on the design proposal (https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing). The current security implementation works in a subtle way, utilizing the Kerberos cache of the user who starts Flink processes/jobs, and only in the context of supporting secure access to a Hadoop cluster. The underlying UGI implementation of the Hadoop infrastructure is used to harden security using the keytab cache. For the Yarn mode of deployment, delegation tokens are created and populated to the container environment (App Master/JM and TM). There are two areas of improvement that the current implementation lacks: 1) Tokens will expire in due course, which impacts long-running jobs 2) Missing functionality to support secure connections to Kafka and ZK (Kafka 0.9 and the latest ZK versions support Kerberos-based authentication using SASL/JAAS) This PR addresses the above gaps by providing keytab support to securely communicate with Hadoop and Kafka/ZK services. 1) Additional Configurations: The following new security-specific configurations are added to the Flink configuration file. a) security.principal - user principal that Flink processes/connectors should authenticate as b) security.keytab - keytab file location In standalone mode, it is assumed that the configuration pre-exists (manual process) on all cluster nodes from where the JM and TMs will be running. In Yarn mode, the configuration (and keytab file) is expected only on the node from where YarnCLI or FlinkCLI will be invoked. Application code takes care of copying the keytab file to JM/TM Yarn containers as a local resource for lookup.
In the absence of the security configurations, the delegation token mechanism still works, to support backward compatibility (manual kinit before starting JM/TMs). 2) Process-wide in-memory JAAS configuration to enable Kafka/ZK secure authentication. The JAAS configuration plays a critical role in authentication for a Kerberized application. Kafka/ZK login module code is expected to construct a login context based on supplied JAAS configuration file entries and authenticate to produce a subject. The context is constructed with an application name which acts as a lookup key into the configuration, yielding one or more login modules. The login module implements the specific strategy, such as using a configured keytab or using the user's ticket cache. Instead of managing a per-connector JAAS configuration file, a process-wide JAAS configuration object is initialized during the Flink bootstrap phase, thus providing a singular login module to all callers configured to login using the supplied keytab. (https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html#setConfiguration(javax.security.auth.login.Configuration)) To summarize, the following sequence happens when the secure configuration is enabled. Flink bootstrap code (both Yarn and Standalone) initializes the security context by a) Initializing UGI with the supplied keytab and principal, which takes care of handling Kerberos authentication and login renewal for Hadoop services. b) Creating a process-wide JAAS configuration object for Kafka/ZK login modules to support Kerberos/SASL authentication. Login renewals are automatically taken care of by the ZK and Kafka login module implementations. Some additional details are provided in the documentation page as well, which can be referenced from here.
(https://github.com/vijikarthi/flink/blob/FLINK-3929/docs/internals/flink_security.md) > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 Adding some more cotext to the implementation details. which is based on the design proposal (https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing) Current security implementation works in a subtle way utilizing the Keberos cache of the user who starts Flink process/jobs and only in the context of supporting secure access to Hadoop cluster. The underlying UGI implementation of Hadoop infrastructure is used to harden the security using the keytab cache. For Yarn mode of deployment, delegation tokens are created and populated to container environment (App Master/JM and TM). There are two areas of improvement that current implementation lacks: 1) Tokens will be expired in due course and hence it impacts long running jobs 2) Missing functionality to support secure connection to Kafka and ZK (Kafka 0.9 and latest ZK versions are supporting kerberos based authentication using SASL/JAAS) This PR addresses above gaps by providing Keytab support to securely communicate to Hadoop and Kafka/ZK services. 1) Additional Configurations: Below new security specific configurations are added to the Flink configuration file. a) security.principal - user principal that Flink process/connectors should authenticate as b) security.keytab - keytab file location In standlone mode, it is assumed that the configurations pre-exists (manual process) on all cluster nodes from where the JM and TMs will be running. In Yarn mode, the configuration (and keytab file) is expected only on the node from where YarnCLI or FlinkCLI will be invoked. Application code takes care of copying Keytab file to JM/TM Yarn containers as local resource for lookup. In the absence of providing security configurations, the delegation token mechanism still works to support backward compatibility (manual kinit before starting JM/TMs). 
2) Process-wide in-memory JAAS configuration to enable secure Kafka/ZK authentication: the JAAS configuration plays a critical role in authentication for a Kerberized application. Kafka/ZK login module code is expected to construct a login context based on the supplied JAAS configuration file entries and authenticate to produce a subject. The context is constructed with an application name which acts as a lookup key into the configuration, yielding one or more login modules. A login module implements a specific strategy, such as using a configured keytab or the user's ticket cache. Instead of managing a per-connector JAAS configuration file, a process-wide JAAS configuration object is initialized during the Flink bootstrap phase, thus providing a single login module to all callers configured to log in using the supplied keytab (https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html#setConfiguration(javax.security.auth.login.Configuration)).

To summarize, the following sequence happens when the secure configuration is enabled. Flink bootstrap code (both YARN and standalone) initializes the security context by:
a) Initializing UGI with the supplied keytab and principal, which takes care of Kerberos authentication and login renewal for Hadoop services.
b) Creating a process-wide JAAS configuration object for the Kafka/ZK login modules to support Kerberos/SASL authentication. Login renewals are automatically taken care of by the ZK and Kafka login module implementations.

Some additional details are provided in the documentation page that can be referenced here: https://github.com/vijikarthi/flink/blob/FLINK-3929/docs/internals/flink_security.md

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
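The process-wide JAAS setup described in point 2) above can be sketched with the JDK's own `javax.security.auth.login.Configuration`. The keytab path, principal, and option names below are illustrative, not the actual PR code:

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import java.util.HashMap;
import java.util.Map;

public class JaasSketch {
    // A single process-wide JAAS Configuration: every lookup name
    // ("KafkaClient", "Client" for ZooKeeper, ...) resolves to the same
    // keytab-based login entry, instead of per-connector JAAS files.
    static Configuration keytabConfiguration(String keytab, String principal) {
        Map<String, String> options = new HashMap<>();
        options.put("keyTab", keytab);
        options.put("principal", principal);
        options.put("useKeyTab", "true");
        options.put("storeKey", "true");
        AppConfigurationEntry entry = new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                options);
        return new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntries(String name) {
                // Same entry regardless of the application name used as lookup key.
                return new AppConfigurationEntry[] { entry };
            }
        };
    }

    public static void main(String[] args) {
        // Installed once during bootstrap; Kafka's and ZK's login modules
        // would then find these entries through Configuration.getConfiguration().
        Configuration.setConfiguration(
                keytabConfiguration("/etc/security/flink.keytab", "flink@EXAMPLE.COM"));
        AppConfigurationEntry[] entries =
                Configuration.getConfiguration().getAppConfigurationEntries("KafkaClient");
        System.out.println(entries.length);
    }
}
```

Calling `Configuration.setConfiguration(...)` once at bootstrap is exactly the "process-wide" part: any later lookup by any connector sees the same keytab entry.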
[jira] [Updated] (FLINK-4250) Cannot select other than first column from Table
[ https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-4250: - Priority: Critical (was: Major)

> Cannot select other than first column from Table
> Key: FLINK-4250
> URL: https://issues.apache.org/jira/browse/FLINK-4250
> Project: Flink
> Issue Type: Bug
> Components: Scala API, Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Priority: Critical
>
> Using the Scala Table API and the {{CsvTableSource}} I cannot select a column
> from the csv source. The following code:
> {code}
> package com.dataartisans.batch
>
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
> import org.apache.flink.api.scala._
> import org.apache.flink.api.table.sources.CsvTableSource
> import org.apache.flink.api.table.{Row, TableEnvironment, Table}
>
> object CsvTableAPIJob {
>   def main(args: Array[String]): Unit = {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val csvFilePath = "table-jobs/src/main/resources/input.csv"
>     val tblEnv = TableEnvironment.getTableEnvironment(env)
>     val csvTS = new CsvTableSource(csvFilePath,
>       Array("key", "user", "value", "timestamp"),
>       Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
>         BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
>     tblEnv.registerTableSource("foobar", csvTS)
>     val input = tblEnv.sql("SELECT user FROM foobar")
>     tblEnv.toDataSet[Row](input).print()
>   }
> }
> {code}
> fails with
> {code}
> Exception in thread "main" org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER
> 	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
> 	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782)
> 	at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54)
> 	at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
> 	at org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168)
> 	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
> 	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> 	at org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286)
> 	at org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
> 	at org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
> 	at org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
> 	at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271)
> 	at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
> 	at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21)
> 	at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4250) Cannot select other than first column from Table
[ https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-4250: - Summary: Cannot select other than first column from Table (was: Cannot select column from CsvTableSource)

> Cannot select other than first column from Table
> Key: FLINK-4250
> URL: https://issues.apache.org/jira/browse/FLINK-4250
> Project: Flink
> Issue Type: Bug
> Components: Scala API, Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2051

A simpler way to get the serializer may be
```java
TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){}).createSerializer(null);
```
[jira] [Commented] (FLINK-4250) Cannot select column from CsvTableSource
[ https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388074#comment-15388074 ] Till Rohrmann commented on FLINK-4250: -- The problem seems to be more general than the {{CsvTableSource}}. The following code also fails:

{code}
def main(args: Array[String]): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val csvFilePath = "table-jobs/src/main/resources/input.csv"
  val tblEnv = TableEnvironment.getTableEnvironment(env)

  val inputDS = env.fromElements((1, "foo", 1.0, "12"))

  tblEnv.registerDataSet("foobar", inputDS, 'key, 'user, 'value, 'timestamp)

  val input = tblEnv.sql("SELECT value FROM foobar")

  tblEnv.toDataSet[Row](input).print()
}
{code}

But this time it fails with

{code}
Exception in thread "main" org.apache.calcite.sql.parser.SqlParseException: Encountered "value" at line 1, column 8. Was expecting one of: ...
{code}

> Cannot select column from CsvTableSource
> Key: FLINK-4250
> URL: https://issues.apache.org/jira/browse/FLINK-4250
> Project: Flink
> Issue Type: Bug
> Components: Scala API, Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
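A plausible reading of the two failures above, not a confirmed diagnosis, is that the column names collide with SQL keywords and built-ins: USER parses as the niladic built-in function (hence "Unsupported call: USER"), while VALUE is rejected outright by the parser. Such identifiers would need quoting. A minimal sketch of that idea follows; the word list is a tiny illustrative excerpt, not the parser's actual reserved-word table:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ReservedWordSketch {
    // Small illustrative excerpt of SQL reserved words / built-ins that clash
    // with the column names in the example above.
    static final Set<String> RESERVED = new HashSet<>(
            Arrays.asList("USER", "VALUE", "TIMESTAMP", "CURRENT_DATE"));

    // Quote an identifier when it would otherwise be parsed as a keyword.
    static String quoteIfReserved(String identifier) {
        return RESERVED.contains(identifier.toUpperCase())
                ? "`" + identifier + "`"  // backtick quoting as an example convention
                : identifier;
    }

    public static void main(String[] args) {
        System.out.println(quoteIfReserved("user")); // `user`
        System.out.println(quoteIfReserved("key"));  // key
    }
}
```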
[jira] [Created] (FLINK-4250) Cannot select column from CsvTableSource
Till Rohrmann created FLINK-4250: Summary: Cannot select column from CsvTableSource Key: FLINK-4250 URL: https://issues.apache.org/jira/browse/FLINK-4250 Project: Flink Issue Type: Bug Components: Scala API, Table API & SQL Affects Versions: 1.1.0 Reporter: Till Rohrmann

Using the Scala Table API and the {{CsvTableSource}} I cannot select a column from the csv source.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3779) Add support for queryable state
[ https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388069#comment-15388069 ] ASF GitHub Bot commented on FLINK-3779: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2051

A simpler way to get the serializer may be
```java
TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){}).createSerializer(null);
```

> Add support for queryable state
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee
> fault-tolerant processing of streams. Users can work with both
> non-partitioned (Checkpointed interface) and partitioned state
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state
> that are all scoped to the key of the current input element. This type of
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions
> with external systems such as key-value stores which are often the bottleneck
> in practice. Exposing the local state to the outside moves a good part of the
> database work into the stream processor, allowing both high throughput
> queries and immediate access to the computed state.
> This is the initial design doc for the feature:
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
> Feel free to comment.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3779) Add support for queryable state
[ https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388058#comment-15388058 ] ASF GitHub Bot commented on FLINK-3779: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2051

Regarding local vs. cluster mode: that's on purpose, but we can certainly change that behaviour. For now, you would have to run in cluster mode.

Regarding the serializer: assuming that it is a Flink `Tuple2<String, Long>`, you can use the following to get the serializer:
```java
TypeSerializer[] fieldSerializers = new TypeSerializer[] {
	StringSerializer.INSTANCE,
	LongSerializer.INSTANCE
};

TypeSerializer<Tuple2<String, Long>> serializer = new TupleSerializer<>(
	(Class<Tuple2<String, Long>>) (Class<?>) Tuple2.class,
	fieldSerializers);
```

**Just to make sure that we are on the same page: the state of this PR is not the final queryable state API, but only the initial low-level version.** Really looking forward to further feedback. Thank you for trying it out at this stage. :-)

> Add support for queryable state
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state
Github user uce commented on the issue: https://github.com/apache/flink/pull/2051

Regarding local vs. cluster mode: that's on purpose, but we can certainly change that behaviour. For now, you would have to run in cluster mode.

Regarding the serializer: assuming that it is a Flink `Tuple2<String, Long>`, you can use the following to get the serializer:
```java
TypeSerializer[] fieldSerializers = new TypeSerializer[] {
	StringSerializer.INSTANCE,
	LongSerializer.INSTANCE
};

TypeSerializer<Tuple2<String, Long>> serializer = new TupleSerializer<>(
	(Class<Tuple2<String, Long>>) (Class<?>) Tuple2.class,
	fieldSerializers);
```

**Just to make sure that we are on the same page: the state of this PR is not the final queryable state API, but only the initial low-level version.** Really looking forward to further feedback. Thank you for trying it out at this stage. :-)
[jira] [Commented] (FLINK-3779) Add support for queryable state
[ https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388043#comment-15388043 ] ASF GitHub Bot commented on FLINK-3779: --- Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2051 Never mind, I was hitting it with the wrong key; it works now! Cheers.

> Add support for queryable state
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2051 Never mind, I was hitting it with the wrong key; it works now! Cheers.
[jira] [Commented] (FLINK-4243) Change "Scope" to "Namespace" for Metrics
[ https://issues.apache.org/jira/browse/FLINK-4243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387982#comment-15387982 ] Chesnay Schepler commented on FLINK-4243: - I'm not really against this, but I would be curious what you base this on.

> Change "Scope" to "Namespace" for Metrics
> Key: FLINK-4243
> URL: https://issues.apache.org/jira/browse/FLINK-4243
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.1.0
> Reporter: Stephan Ewen
>
> It seems that using **"namespace"** for the names (dotted concatenation) of
> metric groups is more common than **"scope"**.
> I suggest to follow that terminology in Flink.
> [~jgrier] and [~Zentol], I would be interested in what you think there.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4202) Add JM metric which shows the restart duration
[ https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387978#comment-15387978 ] ASF GitHub Bot commented on FLINK-4202: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2271

The Javadoc for the `RestartingTimeGauge` is not consistent with the implementation. The Javadocs say "The time between the RESTARTING and RUNNING". The implementation says either
* the time between RESTARTING and RUNNING
* the time since RESTARTING
* the time between RESTARTING and FAILED/SOME_OTHER_TERMINAL_STATE

> Add JM metric which shows the restart duration
> Key: FLINK-4202
> URL: https://issues.apache.org/jira/browse/FLINK-4202
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.1.0
>
> It is convenient for users to have a metric which tells you how long the
> restarting of a job has taken.
> I propose to introduce a {{Gauge}} which returns the time between
> {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not
> restarted (initial run), then this metric will return {{-1}}.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2271: [FLINK-4202] Add restarting time JM metric
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2271

The Javadoc for the `RestartingTimeGauge` is not consistent with the implementation. The Javadocs say "The time between the RESTARTING and RUNNING". The implementation says either
* the time between RESTARTING and RUNNING
* the time since RESTARTING
* the time between RESTARTING and FAILED/SOME_OTHER_TERMINAL_STATE
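The three behaviours listed above can be written out as a single function to make the intended semantics concrete. This is an illustrative sketch, not the actual `RestartingTimeGauge` implementation; all names and the timestamp convention (negative means "never happened") are assumptions:

```java
public class RestartingTimeGaugeSketch {
    enum JobStatus { CREATED, RUNNING, RESTARTING, FAILED }

    // Returns, in milliseconds:
    //   -1                    if the job was never restarted (initial run),
    //   now - restartingTs    while the restart is still in progress,
    //   endTs - restartingTs  once RUNNING (or a terminal state) was reached.
    static long restartingTime(long restartingTs, long endTs, JobStatus status, long now) {
        if (restartingTs < 0) {
            return -1L; // initial run, never restarted
        }
        if (status == JobStatus.RESTARTING) {
            return now - restartingTs; // restart still in progress
        }
        return endTs - restartingTs; // RUNNING or FAILED/terminal reached
    }

    public static void main(String[] args) {
        System.out.println(restartingTime(-1, -1, JobStatus.RUNNING, 1000));    // -1
        System.out.println(restartingTime(100, 400, JobStatus.RUNNING, 1000));  // 300
        System.out.println(restartingTime(100, -1, JobStatus.RESTARTING, 1000)); // 900
    }
}
```

Writing the cases out this way makes the Javadoc mismatch visible: "the time between RESTARTING and RUNNING" only describes the last branch for the RUNNING case.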
[jira] [Commented] (FLINK-4245) Metric naming improvements
[ https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387967#comment-15387967 ] Chesnay Schepler commented on FLINK-4245: - 1. As long as we don't have unique operator names you cannot have collision free operator metrics. Period. I am getting really tired of explaining this. 2. If you only want to change the naming for JMX I suggest to change the tile to "JMX naming improvements". 3. Your suggestion regarding the domain goes against JMX best practices. They should always start with "org.apache.flink". 4. Please provide a reasoning as to the domain changes. 5. Please provide a comparison as to how a operator and task metric would differ, (domain and tags) that include all tags that are defined in the current default scope formats. 6. In general, using what at one point were called "categories" as keys isn't a bad idea. Note however that this becomes inconsistent with user-defined groups, which is the reason we currently only use auto-generated keys. 7. Please provide the use-case regarding [~mdaxini]; i am curious as to what these changes are supposed to allow. > Metric naming improvements > -- > > Key: FLINK-4245 > URL: https://issues.apache.org/jira/browse/FLINK-4245 > Project: Flink > Issue Type: Improvement >Reporter: Stephan Ewen > > A metric currently has two parts to it: > - The name of that particular metric > - The "scope" (or namespace), defined by the group that contains the metric. > A metric group actually always implicitly has a map of naming "tags", like: > - taskmanager_host : > - taskmanager_id : > - task_name : "map() -> filter()" > We derive the scope from that map, following the defined scope formats. > For JMX (and some users that use JMX), it would be natural to expose that map > of tags. Some users reconstruct that map by parsing the metric scope. 
For JMX, we > can expose a metric like: > - domain: "taskmanager.task.operator.io" > - name: "numRecordsIn" > - tags: { "hostname" -> "localhost", "operator_name" -> "map() at > X.java:123", ... } > For many other reporters, the formatted scope makes a lot of sense, since > they think only in terms of (scope, metric-name). > We may even have the formatted scope in JMX as well (in the domain), if we > want to go that route. > [~jgrier] and [~Zentol] - what do you think about that? > [~mdaxini] Does that match your use of the metrics? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
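The tag-based naming sketched above maps naturally onto JMX key properties. A minimal illustration using only the standard `javax.management` API (the domain string combines the example above with the `org.apache.flink` prefix convention Chesnay mentions; this is not actual Flink reporter code):

```java
import java.util.Hashtable;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

class JmxMetricNameSketch {
    // Build a JMX ObjectName from a domain plus a map of tags,
    // exposing the tag map as JMX key properties instead of a flat scope string.
    static ObjectName metricName(String domain, Hashtable<String, String> tags)
            throws MalformedObjectNameException {
        return new ObjectName(domain, tags);
    }

    // The example metric from the issue description.
    static ObjectName exampleName() throws MalformedObjectNameException {
        Hashtable<String, String> tags = new Hashtable<>();
        tags.put("hostname", "localhost");
        // values containing JMX special characters (':', ',', '=', '"') must be quoted
        tags.put("operator_name", ObjectName.quote("map() at X.java:123"));
        return metricName("org.apache.flink.taskmanager.task.operator.io", tags);
    }
}
```

Tools like JConsole then group beans by domain and let users filter on individual key properties, which is what parsing the formatted scope currently reconstructs by hand.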
[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets
[ https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387961#comment-15387961 ] Josh Forman-Gornall commented on FLINK-4190: Thanks! Oh nice, this looks like a better solution for checking for bucket inactivity... For the tests, is there any reason not to fold all of those tests into the new `BucketingSinkTest`? Currently there are four: (BucketingSinkITCase, BucketingSinkFaultToleranceITCase, BucketingSinkFaultTolerance2ITCase, BucketingSinkMultipleActiveBucketsCase) > Generalise RollingSink to work with arbitrary buckets > - > > Key: FLINK-4190 > URL: https://issues.apache.org/jira/browse/FLINK-4190 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector, Streaming Connectors >Reporter: Josh Forman-Gornall >Assignee: Josh Forman-Gornall >Priority: Minor > > The current RollingSink implementation appears to be intended for writing to > directories that are bucketed by system time (e.g. minutely) and to only be > writing to one file within one bucket at any point in time. When the system > time determines that the current bucket should be changed, the current bucket > and file are closed and a new bucket and file are created. The sink cannot be > used for the more general problem of writing to arbitrary buckets, perhaps > determined by an attribute on the element/tuple being processed. > There are three limitations which prevent the existing sink from being used > for more general problems: > - Only bucketing by the current system time is supported, and not by e.g. an > attribute of the element being processed by the sink. > - Whenever the sink sees a change in the bucket being written to, it flushes > the file and moves on to the new bucket. Therefore the sink cannot have more > than one bucket/file open at a time. Additionally the checkpointing mechanics > only support saving the state of one active bucket and file. 
> - The sink determines that it should 'close' an active bucket and file when > the bucket path changes. We need another way to determine when a bucket has > become inactive and needs to be closed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3779) Add support for queryable state
[ https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387930#comment-15387930 ] ASF GitHub Bot commented on FLINK-3779: --- Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2051 Hi, Continuing the discussion from the mailing list, I was able to get past the NettyConfig problem once I ran Flink in cluster mode (I would still like to know if there is a way to run in local mode so that I can avoid running SBT assembly every time). But now I am stuck at the error message "KvState does not hold any state for key/namespace.", which I believe is because of my KeySerializer. Since I am running the QueryClient as a separate application, I don't have access to my queryableState to call `queryableState.getKeySerializer`. My key is a tuple of (Long,String) and this is the naive serializer that I wrote (which is probably wrong and I have never written a serializer before):
```
class KeySerializer extends TypeSerializerSingleton[(Long,String)] {

  private val EMPTY: (Long,String) = (0,"")

  override def createInstance(): (Long, String) = EMPTY

  override def getLength: Int = 2

  override def canEqual(o: scala.Any): Boolean = o.isInstanceOf[(Long,String)]

  override def copy(t: (Long, String)): (Long, String) = t

  override def copy(t: (Long, String), t1: (Long, String)): (Long, String) = t

  override def copy(dataInputView: DataInputView, dataOutputView: DataOutputView): Unit = {
    dataOutputView.writeLong(dataInputView.readLong())
    StringValue.copyString(dataInputView, dataOutputView)
  }

  override def serialize(t: (Long, String), dataOutputView: DataOutputView): Unit = {
    dataOutputView.writeLong(t._1)
    StringValue.writeString(t._2, dataOutputView)
  }

  override def isImmutableType: Boolean = true

  override def deserialize(dataInputView: DataInputView): (Long, String) = {
    val l = dataInputView.readLong()
    val s = StringValue.readString(dataInputView)
    (l, s)
  }

  override def deserialize(t: (Long, String), dataInputView: DataInputView): (Long, String) =
    deserialize(dataInputView)
}
```
Can you tell me what I am doing wrong here? Thanks! > Add support for queryable state > --- > > Key: FLINK-3779 > URL: https://issues.apache.org/jira/browse/FLINK-3779 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > > Flink offers state abstractions for user functions in order to guarantee > fault-tolerant processing of streams. Users can work with both > non-partitioned (Checkpointed interface) and partitioned state > (getRuntimeContext().getState(ValueStateDescriptor) and other variants). > The partitioned state interface provides access to different types of state > that are all scoped to the key of the current input element. This type of > state can only be used on a KeyedStream, which is created via stream.keyBy(). > Currently, all of this state is internal to Flink and used in order to > provide processing guarantees in failure cases (e.g. exactly-once processing). > The goal of Queryable State is to expose this state outside of Flink by > supporting queries against the partitioned key value state. > This will help to eliminate the need for distributed operations/transactions > with external systems such as key-value stores which are often the bottleneck > in practice. Exposing the local state to the outside moves a good part of the > database work into the stream processor, allowing both high throughput > queries and immediate access to the computed state. > This is the initial design doc for the feature: > https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g. > Feel free to comment. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
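For reference, the write/read symmetry such a `(Long, String)` serializer needs can be sketched in plain Java, with `java.io` streams standing in for Flink's `DataOutputView`/`DataInputView` (an assumption made so the example is self-contained): write the long, then a length-prefixed string, and read them back in exactly the same order. Errors like "KvState does not hold any state for key/namespace" typically point at the query side producing different key bytes than the job side, so both sides must serialize the key identically.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class PairSerializerSketch {
    // Write the long first, then the string, in a fixed order.
    static byte[] serialize(long l, String s) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeLong(l);
        out.writeUTF(s); // length-prefixed, so the reader knows where the string ends
        out.flush();
        return bytes.toByteArray();
    }

    // Read the fields back in exactly the order they were written.
    static Object[] deserialize(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        long l = in.readLong();
        String s = in.readUTF();
        return new Object[] { l, s };
    }
}
```

Note also that a `(Long, String)` pair is variable-length because of the string, so a fixed `getLength` of 2 in the serializer above does not describe the serialized size.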
[jira] [Created] (FLINK-4249) CsvTableSource does not support reading SqlTimeTypeInfo types
Till Rohrmann created FLINK-4249: Summary: CsvTableSource does not support reading SqlTimeTypeInfo types Key: FLINK-4249 URL: https://issues.apache.org/jira/browse/FLINK-4249 Project: Flink Issue Type: Improvement Components: Table API & SQL Affects Versions: 1.1.0 Reporter: Till Rohrmann The Table API's {{CsvTableSource}} does not support reading all data types supported by the Table API. For example, it is not possible to read {{SqlTimeTypeInfo}} types via the {{CsvTableSource}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4248) CsvTableSource does not support reading SqlTimeTypeInfo types
Till Rohrmann created FLINK-4248: Summary: CsvTableSource does not support reading SqlTimeTypeInfo types Key: FLINK-4248 URL: https://issues.apache.org/jira/browse/FLINK-4248 Project: Flink Issue Type: Improvement Components: Table API & SQL Affects Versions: 1.1.0 Reporter: Till Rohrmann The Table API's {{CsvTableSource}} does not support reading all data types supported by the Table API. For example, it is not possible to read {{SqlTimeTypeInfo}} types via the {{CsvTableSource}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4247) CsvTableSource.getDataSet() expects Java ExecutionEnvironment
Till Rohrmann created FLINK-4247: Summary: CsvTableSource.getDataSet() expects Java ExecutionEnvironment Key: FLINK-4247 URL: https://issues.apache.org/jira/browse/FLINK-4247 Project: Flink Issue Type: Improvement Components: Table API & SQL Affects Versions: 1.1.0 Reporter: Till Rohrmann Priority: Minor The Table API offers the {{CsvTableSource}} which can be used with the Java and Scala API. However, if used with the Scala API where one has obtained a {{scala.api.ExecutionEnvironment}}, there is a problem with the {{CsvTableSource.getDataSet}} method. The method expects a {{java.api.ExecutionEnvironment}} to extract the underlying {{DataSet}}. Additionally, it returns a {{java.api.DataSet}} instead of a {{scala.api.DataSet}}. I think we should also offer a Scala API specific CsvTableSource which works with the respective Scala counterparts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API
[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387913#comment-15387913 ] ASF GitHub Bot commented on FLINK-3901: --- Github user fpompermaier commented on the issue: https://github.com/apache/flink/pull/1989 Don't worry @twalthr, it would be nice to have it in 1.1 but I understand that it's not a priority :) Thanks a lot for the update! > Create a RowCsvInputFormat to use as default CSV IF in Table API > > > Key: FLINK-3901 > URL: https://issues.apache.org/jira/browse/FLINK-3901 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.0.2 >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier >Priority: Minor > Labels: csv, null-values, row, tuple > > At the moment the Table API reads CSVs using the TupleCsvInputFormat, which > has big limitations regarding the 25-field maximum and null handling. > A new IF producing Row objects is needed to avoid those limitations -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4244) Field names for union operator do not have to be equal
[ https://issues.apache.org/jira/browse/FLINK-4244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387907#comment-15387907 ] ASF GitHub Bot commented on FLINK-4244: --- GitHub user wuchong opened a pull request: https://github.com/apache/flink/pull/2280 [FLINK-4244] [docs] Field names for union operator do not have to be equal We just merged FLINK-2985, but did not update the document. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wuchong/flink FLINK-4244 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2280.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2280 commit a3174fb89c7e2ab3e7bcb4f88c9ab3dbe7d47473 Author: Jark Wu Date: 2016-07-21T15:25:44Z [FLINK-4244] [docs] Field names for union operator do not have to be equal > Field names for union operator do not have to be equal > -- > > Key: FLINK-4244 > URL: https://issues.apache.org/jira/browse/FLINK-4244 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Jark Wu >Priority: Trivial > > Flink Table API's documentation says that the schemas of unioned tables have > to be identical (wrt types and names). 
However, union works also with tables > where the types are identical but not the names: > {code} > val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, > x.toDouble)} > val input2Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, > x.toDouble)} > val inputDS1 = env.fromCollection(input1Seq) > val inputDS2 = env.fromCollection(input2Seq) > val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c) > tblEnv.registerTable("foobar", input1) > val input2 = tblEnv.fromDataSet(inputDS2, 'd, 'e, 'f) > tblEnv.registerTable("foobar2", input2) > val result = tblEnv.sql("SELECT * FROM foobar UNION ALL SELECT * FROM > foobar2") > tblEnv.toDataSet[Row](result).print() > {code} > We should update the documentation accordingly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4244) Field names for union operator do not have to be equal
[ https://issues.apache.org/jira/browse/FLINK-4244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4244: -- Assignee: Jark Wu > Field names for union operator do not have to be equal > -- > > Key: FLINK-4244 > URL: https://issues.apache.org/jira/browse/FLINK-4244 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Jark Wu >Priority: Trivial > > Flink Table API's documentation says that the schemas of unioned tables have > to be identical (wrt types and names). However, union works also with tables > where the types are identical but not the names: > {code} > val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, > x.toDouble)} > val input2Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, > x.toDouble)} > val inputDS1 = env.fromCollection(input1Seq) > val inputDS2 = env.fromCollection(input2Seq) > val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c) > tblEnv.registerTable("foobar", input1) > val input2 = tblEnv.fromDataSet(inputDS2, 'd, 'e, 'f) > tblEnv.registerTable("foobar2", input2) > val result = tblEnv.sql("SELECT * FROM foobar UNION ALL SELECT * FROM > foobar2") > tblEnv.toDataSet[Row](result).print() > {code} > We should update the documentation accordingly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API
[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387892#comment-15387892 ] ASF GitHub Bot commented on FLINK-3901: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/1989 @fpompermaier I started working on this, but I cannot guarantee that it will be part of the release. > Create a RowCsvInputFormat to use as default CSV IF in Table API > > > Key: FLINK-3901 > URL: https://issues.apache.org/jira/browse/FLINK-3901 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.0.2 >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier >Priority: Minor > Labels: csv, null-values, row, tuple > > At the moment the Table API reads CSVs using the TupleCsvInputFormat, which > has big limitations regarding the 25-field maximum and null handling. > A new IF producing Row objects is needed to avoid those limitations -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4246) Allow Specifying Multiple Metrics Reporters
[ https://issues.apache.org/jira/browse/FLINK-4246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387867#comment-15387867 ] Stephan Ewen commented on FLINK-4246: - +1, I like this idea > Allow Specifying Multiple Metrics Reporters > --- > > Key: FLINK-4246 > URL: https://issues.apache.org/jira/browse/FLINK-4246 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek > Fix For: 1.1.0 > > > We should allow specifying multiple reporters. A rough sketch of how the > configuration should look is this: > {code} > metrics.reporters = foo,bar > metrics.reporter.foo.class = JMXReporter.class > metrics.reporter.foo.port = 42-117 > metrics.reporter.bar.class = GangliaReporter.class > metrics.reporter.bar.port = 512 > metrics.reporter.bar.whatever = 2 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4192) Move Metrics API to separate module
[ https://issues.apache.org/jira/browse/FLINK-4192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387865#comment-15387865 ] ASF GitHub Bot commented on FLINK-4192: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2226 Will merge this, leaving the config as is. > Move Metrics API to separate module > --- > > Key: FLINK-4192 > URL: https://issues.apache.org/jira/browse/FLINK-4192 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.1.0 > > > All metrics code currently resides in flink-core. If a user implements a > reporter and wants a fat jar it will now have to include the entire > flink-core module. > Instead, we could move several interfaces into a separate module. > These interfaces to move include: > * Counter, Gauge, Histogram(Statistics) > * MetricGroup > * MetricReporter, Scheduled, AbstractReporter > In addition a new MetricRegistry interface will be required as well as a > replacement for the Configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4152) TaskManager registration exponential backoff doesn't work
[ https://issues.apache.org/jira/browse/FLINK-4152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387859#comment-15387859 ] ASF GitHub Bot commented on FLINK-4152: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2257 Hi @tillrohrmann! Your changes look good. I would say this is good to merge once we have fixed the issue with the `YarnFlinkResourceManagerTest` on Travis. > TaskManager registration exponential backoff doesn't work > - > > Key: FLINK-4152 > URL: https://issues.apache.org/jira/browse/FLINK-4152 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, TaskManager, YARN Client >Reporter: Robert Metzger >Assignee: Till Rohrmann > Attachments: logs.tgz > > > While testing Flink 1.1 I've found that the TaskManagers are logging many > messages when registering at the JobManager. > This is the log file: > https://gist.github.com/rmetzger/0cebe0419cdef4507b1e8a42e33ef294 > It's logging more than 3000 messages in less than a minute. I don't think that > this is the expected behavior. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
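The exponential backoff the ticket title refers to is the classic scheme: double the pause after every failed registration attempt, capped at a maximum, so the attempt (and log-message) rate stays bounded. A minimal sketch of the pause sequence (illustrative only; the parameter names are made up and this is not Flink's registration code):

```java
import java.util.ArrayList;
import java.util.List;

class RegistrationBackoffSketch {
    // Compute the pause before each registration attempt: start small,
    // double on every failure, never exceed the configured maximum.
    static List<Long> pauses(long initialMillis, long maxMillis, int attempts) {
        List<Long> result = new ArrayList<>();
        long pause = initialMillis;
        for (int i = 0; i < attempts; i++) {
            result.add(pause);
            pause = Math.min(pause * 2, maxMillis);
        }
        return result;
    }
}
```

With this behavior, thousands of registration messages per minute (as in the attached logs) should not occur, since the pause quickly grows toward the cap.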
[jira] [Created] (FLINK-4246) Allow Specifying Multiple Metrics Reporters
Aljoscha Krettek created FLINK-4246: --- Summary: Allow Specifying Multiple Metrics Reporters Key: FLINK-4246 URL: https://issues.apache.org/jira/browse/FLINK-4246 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.1.0 Reporter: Aljoscha Krettek Fix For: 1.1.0 We should allow specifying multiple reporters. A rough sketch of how the configuration should look is this: {code} metrics.reporters = foo,bar metrics.reporter.foo.class = JMXReporter.class metrics.reporter.foo.port = 42-117 metrics.reporter.bar.class = GangliaReporter.class metrics.reporter.bar.port = 512 metrics.reporter.bar.whatever = 2 {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
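The proposed keys split into a reporter list (`metrics.reporters`) plus per-name prefixes (`metrics.reporter.<name>.*`). Resolving them into one sub-configuration per reporter could look roughly like this (a sketch over a plain `Map`, not Flink's actual `Configuration` or registry code):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class ReporterConfigSketch {
    // Split "metrics.reporter.<name>.<key>" entries into one map per reporter
    // named in the comma-separated "metrics.reporters" list.
    static Map<String, Map<String, String>> perReporter(Map<String, String> config) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        String names = config.getOrDefault("metrics.reporters", "");
        for (String name : names.split(",")) {
            name = name.trim();
            if (name.isEmpty()) continue;
            String prefix = "metrics.reporter." + name + ".";
            Map<String, String> sub = new HashMap<>();
            for (Map.Entry<String, String> e : config.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    // strip the per-reporter prefix so each reporter sees plain keys
                    sub.put(e.getKey().substring(prefix.length()), e.getValue());
                }
            }
            result.put(name, sub);
        }
        return result;
    }
}
```

Each reporter is then instantiated from its `class` entry and handed only its own key/value pairs, so `foo` and `bar` can both define a `port` without clashing.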
[jira] [Commented] (FLINK-4229) Do not start Metrics Reporter by default
[ https://issues.apache.org/jira/browse/FLINK-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387848#comment-15387848 ] ASF GitHub Bot commented on FLINK-4229: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2279#discussion_r71722866 --- Diff: flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java --- @@ -88,7 +88,7 @@ public void testPortConflictHandling() throws Exception { JMXReporter rep2 = new JMXReporter(); Configuration cfg1 = new Configuration(); - cfg1.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "-port 9020-9035"); + cfg1.setString("port", "9020-9035"); --- End diff -- Got it, it is the already parsed sub-config. > Do not start Metrics Reporter by default > > > Key: FLINK-4229 > URL: https://issues.apache.org/jira/browse/FLINK-4229 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.1.0 > > > By default, we start a JMX reporter that binds to a port and comes with extra > threads. > We should not start any reporter by default to keep the overhead to a minimum. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4244) Field names for union operator do not have to be equal
[ https://issues.apache.org/jira/browse/FLINK-4244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387851#comment-15387851 ] Jark Wu commented on FLINK-4244: Yes, we just merged [FLINK-2985], but did not update the document. I will update it. [FLINK-2985] Allow different field names for unionAll() in Table API. > Field names for union operator do not have to be equal > -- > > Key: FLINK-4244 > URL: https://issues.apache.org/jira/browse/FLINK-4244 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Trivial > > Flink Table API's documentation says that the schemas of unioned tables have > to be identical (wrt types and names). However, union works also with tables > where the types are identical but not the names: > {code} > val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, > x.toDouble)} > val input2Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, > x.toDouble)} > val inputDS1 = env.fromCollection(input1Seq) > val inputDS2 = env.fromCollection(input2Seq) > val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c) > tblEnv.registerTable("foobar", input1) > val input2 = tblEnv.fromDataSet(inputDS2, 'd, 'e, 'f) > tblEnv.registerTable("foobar2", input2) > val result = tblEnv.sql("SELECT * FROM foobar UNION ALL SELECT * FROM > foobar2") > tblEnv.toDataSet[Row](result).print() > {code} > We should update the documentation accordingly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4229) Do not start Metrics Reporter by default
[ https://issues.apache.org/jira/browse/FLINK-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387849#comment-15387849 ] ASF GitHub Bot commented on FLINK-4229: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2279 Looks good, will merge this... > Do not start Metrics Reporter by default > > > Key: FLINK-4229 > URL: https://issues.apache.org/jira/browse/FLINK-4229 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.1.0 > > > By default, we start a JMX reporter that binds to a port and comes with extra > threads. > We should not start any reporter by default to keep the overhead to a minimum. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2276: [FLINK-4201] [runtime] Forward suspend to checkpoint coor...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2276 I think as it is, it is fine. Let's extend the option for HA in a separate issue. We could have:
- high-availability: *none* (what is currently *standalone recovery*)
- high-availability: *filesystem* (no protection against split brain partitioning)
- high-availability: *zookeeper*
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4240) Cannot use expressions in Scala Table API's groupBy method
[ https://issues.apache.org/jira/browse/FLINK-4240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387824#comment-15387824 ] Jark Wu commented on FLINK-4240: It's not a bug I think as we cannot select a column not in Group By. In this case we can rename the grouping field and it will work {code} val tblResult = input1.groupBy('a % 4 as 'd).select('d); {code} > Cannot use expressions in Scala Table API's groupBy method > -- > > Key: FLINK-4240 > URL: https://issues.apache.org/jira/browse/FLINK-4240 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann > > The following code fails even though it should be supported according to the > documentation: > {code} > package com.dataartisans.batch > import org.apache.flink.api.scala._ > import org.apache.flink.api.scala.table._ > import org.apache.flink.api.table.{Row, TableConfig, TableEnvironment} > object ScalaSimpleTableAPIJob { > def main(args: Array[String]): Unit = { > val env = ExecutionEnvironment.getExecutionEnvironment > val tblConfig = new TableConfig > val tblEnv = TableEnvironment.getTableEnvironment(env, tblConfig) > val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, > x.toDouble)} > val inputDS1 = env.fromCollection(input1Seq) > val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c) > // fails with org.apache.flink.api.table.ValidationException: cannot > resolve [a] given input [('a % 4)] > val tblResult = input1.groupBy('a % 4).select('a); > val result = tblEnv.toDataSet[Row](tblResult) > result.print() > } > } > {code} > {code} > Exception in thread "main" org.apache.flink.api.table.ValidationException: > cannot resolve [a] given input [('a % 4)] > at > org.apache.flink.api.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143) > at > org.apache.flink.api.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:87) > at > 
org.apache.flink.api.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:84) > at > org.apache.flink.api.table.trees.TreeNode.postOrderTransform(TreeNode.scala:72) > at > org.apache.flink.api.table.plan.logical.LogicalNode.org$apache$flink$api$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:120) > at > org.apache.flink.api.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:133) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.flink.api.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:132) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > 
org.apache.flink.api.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137) > at > org.apache.flink.api.table.plan.logical.LogicalNode.validate(LogicalNode.scala:84) > at > org.apache.flink.api.table.plan.logical.Project.validate(operators.scala:57) > at org.apache.flink.api.table.GroupedTable.select(table.scala:631) > at > com.dataartisans.batch.ScalaSimpleTableAPIJob$.main(ScalaSimpleTableAPIJob.scala:26) > at > com.dataartisans.batch.ScalaSimpleTableAPIJob.main(ScalaSimpleTableAPIJob.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at >
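Jark Wu's workaround above can be illustrated with a small, self-contained sketch (Python rather than Flink code; the names `rows`, `groups`, and `selected_d` are mine): after `groupBy`, each output row stands for a whole group, so the raw column `'a` is ambiguous, while the grouping expression itself, renamed to `'d`, has exactly one value per group.

```python
# Illustrative sketch (not Flink code) of why select('a) fails after
# groupBy('a % 4) but select('d) on the renamed grouping key works.
from collections import defaultdict

rows = [x for x in range(10)]          # column 'a of the input table

# groupBy('a % 4): many values of 'a fall into the same group ...
groups = defaultdict(list)
for a in rows:
    groups[a % 4].append(a)

# ... so selecting 'a per group is ambiguous, but the grouping
# expression itself ('a % 4, renamed to 'd) is one value per group.
selected_d = sorted(groups.keys())     # the 'd column of the result
```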
[jira] [Commented] (FLINK-4229) Do not start Metrics Reporter by default
[ https://issues.apache.org/jira/browse/FLINK-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387820#comment-15387820 ] ASF GitHub Bot commented on FLINK-4229: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2279#discussion_r71720105 --- Diff: flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java --- @@ -88,7 +88,7 @@ public void testPortConflictHandling() throws Exception { JMXReporter rep2 = new JMXReporter(); Configuration cfg1 = new Configuration(); - cfg1.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "-port 9020-9035"); + cfg1.setString("port", "9020-9035"); --- End diff -- The change of only starting a server when the ports are present should not change the config keys, correct? > Do not start Metrics Reporter by default > > > Key: FLINK-4229 > URL: https://issues.apache.org/jira/browse/FLINK-4229 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.1.0 > > > By default, we start a JMX reporter that binds to a port and comes with extra > threads. > We should not start any reporter by default to keep the overhead to a minimum. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2279: [FLINK-4229] Only start JMX server when port is sp...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2279#discussion_r71720105 --- Diff: flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java --- @@ -88,7 +88,7 @@ public void testPortConflictHandling() throws Exception { JMXReporter rep2 = new JMXReporter(); Configuration cfg1 = new Configuration(); - cfg1.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "-port 9020-9035"); + cfg1.setString("port", "9020-9035"); --- End diff -- The change of only starting a server when the ports are present should not change the config keys, correct? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
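The port-range value in the diff above ("9020-9035") implies two steps: parsing the range and binding the first free port in it. A minimal sketch of that behaviour, assuming the parsing format shown in the test; the binding loop is my illustration, not Flink's actual JMXReporter implementation:

```python
# Sketch: parse a "port" config value like "9020-9035" (or a single
# port "9020") and bind the first free port in the range. Names and
# binding logic are illustrative assumptions.
import socket

def parse_port_range(value: str) -> range:
    """Accept a single port ("9020") or an inclusive range ("9020-9035")."""
    if "-" in value:
        lo, hi = value.split("-", 1)
        return range(int(lo), int(hi) + 1)
    return range(int(value), int(value) + 1)

def bind_first_free(ports: range) -> socket.socket:
    """Try each port in order; return a socket bound to the first free one."""
    for port in ports:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind(("localhost", port))
            return sock
        except OSError:
            sock.close()
    raise OSError("no free port in range %s" % ports)
```

With this shape, omitting the "port" key entirely would mean no server is started at all, which is the point of the change under review.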
[jira] [Commented] (FLINK-4229) Do not start Metrics Reporter by default
[ https://issues.apache.org/jira/browse/FLINK-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387809#comment-15387809 ] ASF GitHub Bot commented on FLINK-4229: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2279 [FLINK-4229] Only start JMX server when port is specified The JMX reporter now only starts an extra server if a port (or port range) was specified. Otherwise, it will just have the default local JMX capabilities. R: @StephanEwen R: @zentol You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jmx-no-server Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2279.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2279 commit 89fc415ee8b882ac67dc34be41c98811e35dae98 Author: Aljoscha Krettek Date: 2016-07-21T14:44:06Z [FLINK-4229] Only start JMX server when port is specified
[GitHub] flink pull request #2279: [FLINK-4229] Only start JMX server when port is sp...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2279 [FLINK-4229] Only start JMX server when port is specified The JMX reporter now only starts an extra server if a port (or port range) was specified. Otherwise, it will just have the default local JMX capabilities. R: @StephanEwen R: @zentol You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jmx-no-server Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2279.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2279 commit 89fc415ee8b882ac67dc34be41c98811e35dae98 Author: Aljoscha Krettek Date: 2016-07-21T14:44:06Z [FLINK-4229] Only start JMX server when port is specified
[jira] [Commented] (FLINK-4201) Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted
[ https://issues.apache.org/jira/browse/FLINK-4201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387800#comment-15387800 ] ASF GitHub Bot commented on FLINK-4201: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2276 I think as it is, it is fine. Let's extend the option for HA in a separate issue. We could have: - high-availability: *none* (what is currently *standalone recovery*) - high-availability: *filesystem* (no protection against split brain partitioning) - high-availability: *zookeeper* > Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted > --- > > Key: FLINK-4201 > URL: https://issues.apache.org/jira/browse/FLINK-4201 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Ufuk Celebi >Priority: Blocker > > For example, when shutting down a Yarn session, according to the logs > checkpoints for jobs that did not terminate are deleted. In the shutdown > hook, removeAllCheckpoints is called and removes checkpoints that should > still be kept. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2276: [FLINK-4201] [runtime] Forward suspend to checkpoint coor...
Github user uce commented on the issue: https://github.com/apache/flink/pull/2276 Thanks for taking a look, Stephan. Regarding your question: *Would we interfere with such a setup when removing checkpoints on "suspend" in "standalone" mode?*: Yes, we would interfere, but what you describe is currently **not** possible with Flink (that is no one can run it like that). The problem is that recovery on the master is tightly coupled to ZooKeeper (configured via `recovery.mode: ZOOKEEPER`). I really like your idea and agree that it should be possible to run an HA setup like that. I will open an issue for it. Do you think it's important to fix this for 1.1 already? Regarding the name *standalone*: I fully agree. We have a standalone cluster mode and standalone recovery mode. Our standalone recovery mode (`recovery.mode: STANDALONE`) actually means `NO_RECOVERY`. I think that's what also made you assume that what you describe is possible, right?
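The rule being discussed above can be sketched as follows. All names are illustrative (this is not Flink's CheckpointCoordinator API), and the `recovery_mode` values mirror Stephan's proposed `high-availability` options: checkpoints are discarded only on globally terminal states, while SUSPENDED keeps them whenever some recovery store exists for a new leader to read.

```python
# Illustrative sketch: which checkpoints survive a job status change.
# SUSPENDED is not globally terminal, so checkpoints must be retained
# unless there is no recovery mode at all (standalone / "none").
GLOBALLY_TERMINAL = {"FINISHED", "CANCELED", "FAILED"}

def retained_checkpoints(status: str, recovery_mode: str, checkpoints: list) -> list:
    """Return the checkpoints that should remain after a status change."""
    if status in GLOBALLY_TERMINAL:
        return []                    # the job can never be resumed
    if status == "SUSPENDED" and recovery_mode == "none":
        return []                    # nothing could ever recover them
    return list(checkpoints)         # keep them for the next leader
```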
[jira] [Commented] (FLINK-4201) Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted
[ https://issues.apache.org/jira/browse/FLINK-4201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387789#comment-15387789 ] ASF GitHub Bot commented on FLINK-4201: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2276 Thanks for taking a look, Stephan. Regarding your question: *Would we interfere with such a setup when removing checkpoints on "suspend" in "standalone" mode?*: Yes, we would interfere, but what you describe is currently **not** possible with Flink (that is no one can run it like that). The problem is that recovery on the master is tightly coupled to ZooKeeper (configured via `recovery.mode: ZOOKEEPER`). I really like your idea and agree that it should be possible to run an HA setup like that. I will open an issue for it. Do you think it's important to fix this for 1.1 already? Regarding the name *standalone*: I fully agree. We have a standalone cluster mode and standalone recovery mode. Our standalone recovery mode (`recovery.mode: STANDALONE`) actually means `NO_RECOVERY`. I think that's what also made you assume that what you describe is possible, right?
[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets
[ https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387767#comment-15387767 ] ASF GitHub Bot commented on FLINK-4190: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2269 Very good work! I know we discussed before whether to check for inactivity in a different thread or in `invoke()`. There's actually a third option that I'm showcasing in the PR I did against your PR. The StreamTask already has a TimerService that can be set for testing. If we use the appropriate methods in the bucketing sink then we get testability with a settable clock for free. I also added a `BucketSinkTest`. I think it would be good if the `BucketSinkITCase` and `BucketingSinkMultipleActiveBucketsCase` could be folded into this one because having the ITCases means having a lot of overhead and our build is already taking quite long. What do you think? > Generalise RollingSink to work with arbitrary buckets > - > > Key: FLINK-4190 > URL: https://issues.apache.org/jira/browse/FLINK-4190 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector, Streaming Connectors >Reporter: Josh Forman-Gornall >Assignee: Josh Forman-Gornall >Priority: Minor > > The current RollingSink implementation appears to be intended for writing to > directories that are bucketed by system time (e.g. minutely) and to only be > writing to one file within one bucket at any point in time. When the system > time determines that the current bucket should be changed, the current bucket > and file are closed and a new bucket and file are created. The sink cannot be > used for the more general problem of writing to arbitrary buckets, perhaps > determined by an attribute on the element/tuple being processed. > There are three limitations which prevent the existing sink from being used > for more general problems: > - Only bucketing by the current system time is supported, and not by e.g. 
an > attribute of the element being processed by the sink. > Whenever the sink sees a change in the bucket being written to, it flushes > the file and moves on to the new bucket. Therefore the sink cannot have more > than one bucket/file open at a time. Additionally the checkpointing mechanics > only support saving the state of one active bucket and file. > - The sink determines that it should 'close' an active bucket and file when > the bucket path changes. We need another way to determine when a bucket has > become inactive and needs to be closed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
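The "settable clock" idea from aljoscha's review above can be sketched like this (Python, with illustrative names — not the actual BucketingSink API): injecting the clock lets a test advance time deterministically and verify that inactive buckets are flushed and closed while active ones stay open.

```python
# Illustrative sketch of inactivity tracking with an injectable clock,
# so tests do not have to sleep or spawn a separate checker thread.
import time

class BucketingSink:
    def __init__(self, inactivity_threshold: float, clock=time.monotonic):
        self.clock = clock                  # settable for tests
        self.threshold = inactivity_threshold
        self.last_write = {}                # bucket id -> last write time
        self.closed = []                    # buckets flushed and closed

    def invoke(self, bucket: str, value) -> None:
        """Write a value; here we only record the write time per bucket."""
        self.last_write[bucket] = self.clock()

    def check_inactive_buckets(self) -> None:
        """Close every bucket that has seen no writes for the threshold."""
        now = self.clock()
        for bucket, ts in list(self.last_write.items()):
            if now - ts >= self.threshold:
                self.closed.append(bucket)  # stands in for flush + close
                del self.last_write[bucket]
```

A test can then drive `check_inactive_buckets()` with a fake clock instead of waiting in real time.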
[jira] [Comment Edited] (FLINK-4245) Metric naming improvements
[ https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387772#comment-15387772 ] Stephan Ewen edited comment on FLINK-4245 at 7/21/16 2:26 PM: -- An added benefit of that would be that, regardless of the defined scope format, JMX names never have collisions (because the tags will always be a unique set, since they include all unique IDs). So, we would never have collision warnings using the JMX reporter. was (Author: stephanewen): An added benefit of that would be that, regardless of the defined scope format, JMX names never have collisions. So, we would never have collision warnings using the JMX reporter. > Metric naming improvements > -- > > Key: FLINK-4245 > URL: https://issues.apache.org/jira/browse/FLINK-4245 > Project: Flink > Issue Type: Improvement >Reporter: Stephan Ewen > > A metric currently has two parts to it: > - The name of that particular metric > - The "scope" (or namespace), defined by the group that contains the metric. > A metric group actually always implicitly has a map of naming "tags", like: > - taskmanager_host : > - taskmanager_id : > - task_name : "map() -> filter()" > We derive the scope from that map, following the defined scope formats. > For JMX (and some users that use JMX), it would be natural to expose that map > of tags. Some users reconstruct that map by parsing the metric scope. In JMX, we > can expose a metric like: > - domain: "taskmanager.task.operator.io" > - name: "numRecordsIn" > - tags: { "hostname" -> "localhost", "operator_name" -> "map() at > X.java:123", ... } > For many other reporters, the formatted scope makes a lot of sense, since > they think only in terms of (scope, metric-name). > We may even have the formatted scope in JMX as well (in the domain), if we > want to go that route. > [~jgrier] and [~Zentol] - what do you think about that? > [~mdaxini] Does that match your use of the metrics?
[jira] [Commented] (FLINK-4245) Metric naming improvements
[ https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387772#comment-15387772 ] Stephan Ewen commented on FLINK-4245: - An added benefit of that would be that, regardless of the defined scope format, JMX names never have collisions. So, we would never have collision warnings using the JMX reporter.
[jira] [Created] (FLINK-4245) Metric naming improvements
Stephan Ewen created FLINK-4245: --- Summary: Metric naming improvements Key: FLINK-4245 URL: https://issues.apache.org/jira/browse/FLINK-4245 Project: Flink Issue Type: Improvement Reporter: Stephan Ewen A metric currently has two parts to it: - The name of that particular metric - The "scope" (or namespace), defined by the group that contains the metric. A metric group actually always implicitly has a map of naming "tags", like: - taskmanager_host : - taskmanager_id : - task_name : "map() -> filter()" We derive the scope from that map, following the defined scope formats. For JMX (and some users that use JMX), it would be natural to expose that map of tags. Some users reconstruct that map by parsing the metric scope. In JMX, we can expose a metric like: - domain: "taskmanager.task.operator.io" - name: "numRecordsIn" - tags: { "hostname" -> "localhost", "operator_name" -> "map() at X.java:123", ... } For many other reporters, the formatted scope makes a lot of sense, since they think only in terms of (scope, metric-name). We may even have the formatted scope in JMX as well (in the domain), if we want to go that route. [~jgrier] and [~Zentol] - what do you think about that? [~mdaxini] Does that match your use of the metrics?
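The tag-map idea in the issue above can be sketched in a few lines (Python; the `<key>`-style placeholders follow Flink's scope-format convention, everything else is illustrative): scope-oriented reporters consume the formatted scope derived from the tag map, while a JMX-style reporter could publish the tag map itself, so the full set of unique IDs keeps names collision-free regardless of the scope format.

```python
# Illustrative sketch: derive a formatted scope from the implicit tag
# map, while keeping the raw tags available for JMX-style reporters.
def format_scope(scope_format: str, tags: dict) -> str:
    """Substitute <key> placeholders in the scope format with tag values."""
    out = scope_format
    for key, value in tags.items():
        out = out.replace("<" + key + ">", value)
    return out

tags = {
    "host": "localhost",
    "taskmanager_id": "tm-42",
    "task_name": "map() -> filter()",
}
# Scope-oriented reporters see only (scope, metric-name) ...
scope = format_scope("<host>.taskmanager.<taskmanager_id>", tags)
# ... while a JMX reporter could publish (domain, name, tags) directly,
# so uniqueness comes from the tag set, not from the chosen format.
```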
[jira] [Commented] (FLINK-4202) Add JM metric which shows the restart duration
[ https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387768#comment-15387768 ] ASF GitHub Bot commented on FLINK-4202: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2271 > Add JM metric which shows the restart duration > -- > > Key: FLINK-4202 > URL: https://issues.apache.org/jira/browse/FLINK-4202 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0 > > > It is convenient for users to have a metric which tells you how long the > restarting of a job has taken. > I propose to introduce a {{Gauge}} which returns the time between > {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not > restarted (initial run), then this metric will return {{-1}}.
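The gauge proposed in the issue above can be sketched as follows (Python, illustrative names — not the merged Flink implementation): record the timestamps of the RESTARTING and subsequent RUNNING transitions and report their difference, or -1 for an initial run that was never restarted.

```python
# Illustrative sketch of a restart-duration gauge: time between
# RESTARTING and RUNNING, -1 if the job was never restarted.
import time

class RestartingTimeGauge:
    def __init__(self, clock=lambda: time.time() * 1000):
        self.clock = clock              # milliseconds; settable for tests
        self.restarting_at = None
        self.running_at = None

    def on_status(self, status: str) -> None:
        if status == "RESTARTING":
            self.restarting_at = self.clock()
            self.running_at = None
        elif status == "RUNNING" and self.restarting_at is not None:
            self.running_at = self.clock()

    def get_value(self) -> float:
        if self.restarting_at is None:
            return -1                   # initial run, never restarted
        if self.running_at is None:
            return self.clock() - self.restarting_at  # still restarting
        return self.running_at - self.restarting_at
```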
[GitHub] flink pull request #2271: [FLINK-4202] Add restarting time JM metric
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2271
[jira] [Closed] (FLINK-4202) Add JM metric which shows the restart duration
[ https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-4202. Resolution: Done Added via 5dd85e2867a639e10891209dd59be0136a19ecfa
[jira] [Commented] (FLINK-4202) Add JM metric which shows the restart duration
[ https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387754#comment-15387754 ] ASF GitHub Bot commented on FLINK-4202: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2271 Thanks for the review @uce. Will merge this PR.
[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2269 Very good work! I know we discussed before whether to check for inactivity in a different thread or in `invoke()`. There's actually a third option that I'm showcasing in the PR I did against your PR. The StreamTask already has a TimerService that can be set for testing. If we use the appropriate methods in the bucketing sink then we get testability with a settable clock for free. I also added a `BucketSinkTest`. I think it would be good if the `BucketSinkITCase` and `BucketingSinkMultipleActiveBucketsCase` could be folded into this one because having the ITCases means having a lot of overhead and our build is already taking quite long. What do you think?
[GitHub] flink issue #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on BlobSe...
Github user uce commented on the issue: https://github.com/apache/flink/pull/2256 Thank you for your review. I've addressed your comment and now parent directories are deleted if empty, resulting in an empty storage folder after regular cleanup. If there are no objections, I would like to merge this later today.
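The cleanup uce describes above — deleting parent directories once they are empty so the storage folder ends up clean — can be sketched like this (Python; names are illustrative, not Flink's BlobStore code):

```python
# Illustrative sketch: after removing a blob file, prune now-empty
# parent directories, stopping at the storage root itself.
import os

def delete_blob_and_prune(blob_path: str, storage_root: str) -> None:
    os.remove(blob_path)
    parent = os.path.dirname(blob_path)
    root = os.path.abspath(storage_root)
    # Walk upward while the directory is empty and we are below the root.
    while os.path.abspath(parent) != root and not os.listdir(parent):
        os.rmdir(parent)
        parent = os.path.dirname(parent)
```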
[GitHub] flink issue #2271: [FLINK-4202] Add restarting time JM metric
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2271 Thanks for the review @uce. Will merge this PR.
[GitHub] flink pull request #:
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/commit/19dae21b00cfbf68ee64af80672d25974a3cd346#commitcomment-18339788 Let's leave the remainder as it is for now, at least as long as we do not have the name collisions fully resolved. I am filing a followup issue about JMX and names.
[jira] [Commented] (FLINK-4150) Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown
[ https://issues.apache.org/jira/browse/FLINK-4150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387753#comment-15387753 ] ASF GitHub Bot commented on FLINK-4150: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2256 Thank you for your review. I've addressed your comment and now parent directories are deleted if empty, resulting in an empty storage folder after regular cleanup. If there are no objections, I would like to merge this later today. > Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown > > > Key: FLINK-4150 > URL: https://issues.apache.org/jira/browse/FLINK-4150 > Project: Flink > Issue Type: Bug > Components: Job-Submission >Reporter: Stefan Richter >Assignee: Ufuk Celebi >Priority: Blocker > Fix For: 1.1.0 > > > Submitting a job in Yarn with HA can lead to the following exception: > {code} > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load > user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 > ClassLoader info: URL ClassLoader: > file: > '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0' > (invalid JAR: zip file is empty) > Class not resolvable through given classloader. > at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) > at java.lang.Thread.run(Thread.java:745) > {code} > Some job information, including the Blob ids, are stored in Zookeeper. The > actual Blobs are stored in a dedicated BlobStore, if the recovery mode is set > to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the > cluster is shut down, the path for the BlobStore is deleted. 
When the cluster > is then restarted, recovering jobs cannot restore because its Blob ids > stored in Zookeeper now point to deleted files.
[jira] [Commented] (FLINK-4150) Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown
[ https://issues.apache.org/jira/browse/FLINK-4150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387744#comment-15387744 ] ASF GitHub Bot commented on FLINK-4150: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2256#discussion_r71711908 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java --- @@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception { } } + /** +* Tests that the persisted job is not removed from the job graph store +* after the postStop method of the JobManager. Furthermore, it checks +* that BLOBs of the JobGraph are recovered properly and cleaned up after +* the job finishes. +*/ + @Test + public void testBlobRecoveryAfterLostJobManager() throws Exception { + FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS); + FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS); + Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); + Configuration flinkConfiguration = new Configuration(); + UUID leaderSessionID = UUID.randomUUID(); + UUID newLeaderSessionID = UUID.randomUUID(); + int slots = 2; + ActorRef archiveRef = null; + ActorRef jobManagerRef = null; + ActorRef taskManagerRef = null; + + String haStoragePath = temporaryFolder.newFolder().toString(); + + flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, haStoragePath); + flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots); + + try { + MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore(); + TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService(); + TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService(); + + archiveRef = system.actorOf(Props.create( + 
MemoryArchivist.class, + 10), "archive"); + + jobManagerRef = createJobManagerActor( + "jobmanager-0", + flinkConfiguration, + myLeaderElectionService, + mySubmittedJobGraphStore, + 360, + timeout, + jobRecoveryTimeout, archiveRef); + + ActorGateway jobManager = new AkkaActorGateway(jobManagerRef, leaderSessionID); + + taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( + flinkConfiguration, + ResourceID.generate(), + system, + "localhost", + Option.apply("taskmanager"), + Option.apply((LeaderRetrievalService) myLeaderRetrievalService), + true, + TestingTaskManager.class); + + ActorGateway tmGateway = new AkkaActorGateway(taskManagerRef, leaderSessionID); + + Future tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft()); + + Await.ready(tmAlive, deadline.timeLeft()); + + JobVertex sourceJobVertex = new JobVertex("Source"); + sourceJobVertex.setInvokableClass(BlockingInvokable.class); + sourceJobVertex.setParallelism(slots); + + JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex); + + // Upload fake JAR file to first JobManager + File jarFile = temporaryFolder.newFile(); + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile)); + out.close(); + + jobGraph.addJar(new Path(jarFile.toURI())); + JobClient.uploadJarFiles(jobGraph, jobManager, deadline.timeLeft()); + + Future isLeader = jobManager.ask( +
[GitHub] flink pull request #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2256#discussion_r71711908 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java --- @@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception { } } + /** +* Tests that the persisted job is not removed from the job graph store +* after the postStop method of the JobManager. Furthermore, it checks +* that BLOBs of the JobGraph are recovered properly and cleaned up after +* the job finishes. +*/ + @Test + public void testBlobRecoveryAfterLostJobManager() throws Exception { + FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS); + FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS); + Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); + Configuration flinkConfiguration = new Configuration(); + UUID leaderSessionID = UUID.randomUUID(); + UUID newLeaderSessionID = UUID.randomUUID(); + int slots = 2; + ActorRef archiveRef = null; + ActorRef jobManagerRef = null; + ActorRef taskManagerRef = null; + + String haStoragePath = temporaryFolder.newFolder().toString(); + + flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, haStoragePath); + flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots); + + try { + MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore(); + TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService(); + TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService(); + + archiveRef = system.actorOf(Props.create( + MemoryArchivist.class, + 10), "archive"); + + jobManagerRef = createJobManagerActor( + "jobmanager-0", + flinkConfiguration, + myLeaderElectionService, + mySubmittedJobGraphStore, + 360, + timeout, + 
jobRecoveryTimeout, archiveRef); + + ActorGateway jobManager = new AkkaActorGateway(jobManagerRef, leaderSessionID); + + taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( + flinkConfiguration, + ResourceID.generate(), + system, + "localhost", + Option.apply("taskmanager"), + Option.apply((LeaderRetrievalService) myLeaderRetrievalService), + true, + TestingTaskManager.class); + + ActorGateway tmGateway = new AkkaActorGateway(taskManagerRef, leaderSessionID); + + Future tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft()); + + Await.ready(tmAlive, deadline.timeLeft()); + + JobVertex sourceJobVertex = new JobVertex("Source"); + sourceJobVertex.setInvokableClass(BlockingInvokable.class); + sourceJobVertex.setParallelism(slots); + + JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex); + + // Upload fake JAR file to first JobManager + File jarFile = temporaryFolder.newFile(); + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile)); + out.close(); + + jobGraph.addJar(new Path(jarFile.toURI())); + JobClient.uploadJarFiles(jobGraph, jobManager, deadline.timeLeft()); + + Future isLeader = jobManager.ask( + TestingJobManagerMessages.getNotifyWhenLeader(), + deadline.timeLeft()); + + Future isConnectedToJobManager = tmGateway.ask( + new
[jira] [Created] (FLINK-4244) Field names for union operator do not have to be equal
Till Rohrmann created FLINK-4244: Summary: Field names for union operator do not have to be equal Key: FLINK-4244 URL: https://issues.apache.org/jira/browse/FLINK-4244 Project: Flink Issue Type: Improvement Components: Table API & SQL Affects Versions: 1.1.0 Reporter: Till Rohrmann Priority: Trivial Flink Table API's documentation says that the schemas of unioned tables have to be identical (w.r.t. types and names). However, union also works with tables where the types are identical but not the names: {code} val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, x.toDouble)} val input2Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, x.toDouble)} val inputDS1 = env.fromCollection(input1Seq) val inputDS2 = env.fromCollection(input2Seq) val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c) tblEnv.registerTable("foobar", input1) val input2 = tblEnv.fromDataSet(inputDS2, 'd, 'e, 'f) tblEnv.registerTable("foobar2", input2) val result = tblEnv.sql("SELECT * FROM foobar UNION ALL SELECT * FROM foobar2") tblEnv.toDataSet[Row](result).print() {code} We should update the documentation accordingly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics
[ https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-3962. - Resolution: Fixed > JMXReporter doesn't properly register/deregister metrics > > > Key: FLINK-3962 > URL: https://issues.apache.org/jira/browse/FLINK-3962 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Stephan Ewen > Fix For: 1.1.0 > > > The following fails our Yarn tests because it checks for errors in the > jobmanager/taskmanager logs: > {noformat} > 2016-05-23 19:20:02,349 ERROR org.apache.flink.metrics.reporter.JMXReporter > - A metric with the name > org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn > was already registered. > javax.management.InstanceAlreadyExistsException: > org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn > at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) > at > org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76) > at > org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177) 
> at > org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191) > at > org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144) > at > org.apache.flink.metrics.groups.IOMetricGroup.(IOMetricGroup.java:40) > at > org.apache.flink.metrics.groups.TaskMetricGroup.(TaskMetricGroup.java:68) > at > org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74) > at > org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86) > at > org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1092) > at > org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:441) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:283) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > 
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >
[jira] [Created] (FLINK-4243) Change "Scope" to "Namespace" for Metrics
Stephan Ewen created FLINK-4243: --- Summary: Change "Scope" to "Namespace" for Metrics Key: FLINK-4243 URL: https://issues.apache.org/jira/browse/FLINK-4243 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.1.0 Reporter: Stephan Ewen It seems that using **"namespace"** for the names (dotted concatenation) of metric groups is more common than **"scope"**. I suggest following that terminology in Flink. [~jgrier] and [~Zentol], I would be interested in what you think.
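For context on the terminology above: metric groups form a hierarchy, and a metric's reported name is the dotted concatenation of the group names plus the metric name. A minimal illustrative sketch follows; the class and method names here are hypothetical stand-ins, not Flink's actual MetricGroup API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the dotted "scope"/"namespace" concatenation
// discussed in FLINK-4243; not Flink's actual metric group API.
public class MetricNamespace {

    // Join the hierarchy of group names and the metric name with dots.
    static String fullName(List<String> groups, String metricName) {
        return String.join(".", groups) + "." + metricName;
    }

    public static void main(String[] args) {
        String name = fullName(
                Arrays.asList("taskmanager", "myJob", "mySource"),
                "numBytesIn");
        System.out.println(name); // taskmanager.myJob.mySource.numBytesIn
    }
}
```

Whether that dotted string is called a "scope" or a "namespace" is purely a naming question; the structure is the same either way.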
[jira] [Commented] (FLINK-4201) Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted
[ https://issues.apache.org/jira/browse/FLINK-4201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387721#comment-15387721 ] ASF GitHub Bot commented on FLINK-4201: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2276 I think that change is good in "our" philosophy, just wondering about the following implication: Some people want to run HA setups without ZooKeeper, simply using an external service to make sure that the JobManager is restarted. The latest completed checkpoint is found via a well-defined path in the checkpoint storage (rather than a well-defined path in ZooKeeper). That works as an HA setup (with the exception of being susceptible to "split brain" behavior in the presence of network partitions). Would we interfere with such a setup when removing checkpoints on "suspend" in "standalone" mode? BTW: We should really find a new name for "standalone" mode ;-) The term is overloaded. > Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted > --- > > Key: FLINK-4201 > URL: https://issues.apache.org/jira/browse/FLINK-4201 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Ufuk Celebi >Priority: Blocker > > For example, when shutting down a Yarn session, according to the logs > checkpoints for jobs that did not terminate are deleted. In the shutdown > hook, removeAllCheckpoints is called and removes checkpoints that should > still be kept.
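The distinction under discussion (keep checkpoints when a job is merely suspended, discard them only in globally terminal states) can be sketched roughly as below; the enum and method are simplified stand-ins, not Flink's actual JobStatus or checkpoint store classes.

```java
// Simplified stand-in for the job states relevant to FLINK-4201;
// not Flink's actual JobStatus enum or CompletedCheckpointStore.
public class CheckpointCleanupSketch {

    enum JobStatus { RUNNING, SUSPENDED, FINISHED, CANCELED, FAILED }

    // Checkpoints may only be discarded once the job has reached a globally
    // terminal state. A suspended job is expected to be recovered by a new
    // leader, so its checkpoints must be kept.
    static boolean discardCheckpointsOnShutdown(JobStatus status) {
        switch (status) {
            case FINISHED:
            case CANCELED:
            case FAILED:
                return true;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(discardCheckpointsOnShutdown(JobStatus.SUSPENDED)); // false
        System.out.println(discardCheckpointsOnShutdown(JobStatus.FINISHED));  // true
    }
}
```

StephanEwen's concern above is exactly about the `SUSPENDED` branch: an external-supervisor HA setup relies on the suspended job's checkpoints surviving the shutdown.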
[jira] [Closed] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics
[ https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-3962. --- > JMXReporter doesn't properly register/deregister metrics > > > Key: FLINK-3962 > URL: https://issues.apache.org/jira/browse/FLINK-3962 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Stephan Ewen > Fix For: 1.1.0 > > > The following fails our Yarn tests because it checks for errors in the > jobmanager/taskmanager logs: > {noformat} > 2016-05-23 19:20:02,349 ERROR org.apache.flink.metrics.reporter.JMXReporter > - A metric with the name > org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn > was already registered. > javax.management.InstanceAlreadyExistsException: > org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn > at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) > at > org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76) > at > org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177) > at > 
org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191) > at > org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144) > at > org.apache.flink.metrics.groups.IOMetricGroup.(IOMetricGroup.java:40) > at > org.apache.flink.metrics.groups.TaskMetricGroup.(TaskMetricGroup.java:68) > at > org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74) > at > org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86) > at > org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1092) > at > org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:441) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:283) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > 
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at >
[jira] [Commented] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics
[ https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387726#comment-15387726 ] Stephan Ewen commented on FLINK-3962: - This issue has been resolved by not throwing exceptions, but merely logging a simple warning. > JMXReporter doesn't properly register/deregister metrics > > > Key: FLINK-3962 > URL: https://issues.apache.org/jira/browse/FLINK-3962 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Stephan Ewen > Fix For: 1.1.0 > > > The following fails our Yarn tests because it checks for errors in the > jobmanager/taskmanager logs: > {noformat} > 2016-05-23 19:20:02,349 ERROR org.apache.flink.metrics.reporter.JMXReporter > - A metric with the name > org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn > was already registered. 
> javax.management.InstanceAlreadyExistsException: > org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn > at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) > at > org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76) > at > org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177) > at > org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191) > at > org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144) > at > org.apache.flink.metrics.groups.IOMetricGroup.(IOMetricGroup.java:40) > at > org.apache.flink.metrics.groups.TaskMetricGroup.(TaskMetricGroup.java:68) > at > org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74) > at > org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86) > at > org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1092) > at > org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:441) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:283) > at > 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at
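The resolution described above (log a warning instead of failing when a metric name collides in JMX) can be sketched with plain JMX as follows. This is a hypothetical miniature of the behavior, not Flink's actual JMXReporter code.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical miniature of the FLINK-3962 fix; not Flink's JMXReporter.
public class JmxWarningSketch {

    // JMX standard MBeans require a public ClassName/ClassNameMBean pair.
    public interface CounterMBean { long getCount(); }
    public static class Counter implements CounterMBean {
        @Override public long getCount() { return 42L; }
    }

    // Register the MBean; on a name collision, log a warning instead of
    // letting InstanceAlreadyExistsException propagate as an ERROR.
    static boolean registerOrWarn(MBeanServer server, Object bean, ObjectName name) {
        try {
            server.registerMBean(bean, name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            System.out.println("WARN: a metric named " + name + " was already registered");
            return false;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Registers the same name twice and reports both outcomes.
    static boolean[] demo() {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("sketch:name=numBytesIn");
            return new boolean[] {
                registerOrWarn(server, new Counter(), name),
                registerOrWarn(server, new Counter(), name)
            };
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println("first registration succeeded: " + results[0]);
        System.out.println("second registration succeeded: " + results[1]);
    }
}
```

The second registration is the situation from the quoted Yarn-test logs; downgrading it to a warning keeps the log-scanning tests green without changing metric behavior.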
[jira] [Commented] (FLINK-4202) Add JM metric which shows the restart duration
[ https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387718#comment-15387718 ] ASF GitHub Bot commented on FLINK-4202: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2271 +1 to merge. Tests and comments are very good. I've tested this locally with a job with restarts and everything works as expected: ![screen shot 2016-07-21 at 15 47 34](https://cloud.githubusercontent.com/assets/1756620/17025085/a38131ae-4f5a-11e6-8260-3d55f2025760.png) > Add JM metric which shows the restart duration > -- > > Key: FLINK-4202 > URL: https://issues.apache.org/jira/browse/FLINK-4202 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0 > > > It is convenient for users to have a metric which tells you how long the > restarting of a job has taken. > I propose to introduce a {{Gauge}} which returns the time between > {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not > restarted (initial run), then this metric will return {{-1}}.
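The proposed metric can be sketched as follows. `Gauge` here is a one-method stand-in for Flink's metric interface, and the state-transition bookkeeping is a simplified assumption about the implementation, not the actual PR code.

```java
// Simplified sketch of the restarting-time gauge proposed in FLINK-4202.
public class RestartingTimeSketch {

    // One-method stand-in for Flink's Gauge interface.
    interface Gauge<T> { T getValue(); }

    enum JobStatus { RUNNING, RESTARTING, FINISHED }

    static class RestartTimeGauge implements Gauge<Long> {
        private long restartingTimestamp = -1L;
        private long runningTimestamp = -1L;

        // Assumed to be called on every job status transition.
        void onStatusChange(JobStatus status, long timestampMillis) {
            if (status == JobStatus.RESTARTING) {
                restartingTimestamp = timestampMillis;
                runningTimestamp = -1L;
            } else if (status == JobStatus.RUNNING && restartingTimestamp >= 0) {
                runningTimestamp = timestampMillis;
            }
        }

        // -1 while the job has never restarted; otherwise the (possibly
        // still growing) time between RESTARTING and RUNNING.
        @Override
        public Long getValue() {
            if (restartingTimestamp < 0) {
                return -1L; // initial run, never restarted
            } else if (runningTimestamp < 0) {
                return System.currentTimeMillis() - restartingTimestamp;
            } else {
                return runningTimestamp - restartingTimestamp;
            }
        }
    }

    public static void main(String[] args) {
        RestartTimeGauge gauge = new RestartTimeGauge();
        System.out.println(gauge.getValue()); // -1 (initial run)
        gauge.onStatusChange(JobStatus.RESTARTING, 1_000L);
        gauge.onStatusChange(JobStatus.RUNNING, 4_500L);
        System.out.println(gauge.getValue()); // 3500
    }
}
```

This matches the contract stated in the issue: `-1` on the initial run, and the RESTARTING-to-RUNNING duration once a restart has completed.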
[GitHub] flink issue #2276: [FLINK-4201] [runtime] Forward suspend to checkpoint coor...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2276 I think that change is good in "our" philosophy, just wondering about the following implication: Some people want to run HA setups without ZooKeeper, simply using an external service to make sure that the JobManager is restarted. The latest completed checkpoint is found via a well-defined path in the checkpoint storage (rather than a well-defined path in ZooKeeper). That works as an HA setup (with the exception of being susceptible to "split brain" behavior in the presence of network partitions). Would we interfere with such a setup when removing checkpoints on "suspend" in "standalone" mode? BTW: We should really find a new name for "standalone" mode ;-) The term is overloaded. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2271: [FLINK-4202] Add restarting time JM metric
Github user uce commented on the issue: https://github.com/apache/flink/pull/2271 +1 to merge. Tests and comments are very good. I've tested this locally with a job with restarts and everything works as expected: ![screen shot 2016-07-21 at 15 47 34](https://cloud.githubusercontent.com/assets/1756620/17025085/a38131ae-4f5a-11e6-8260-3d55f2025760.png)
[GitHub] flink pull request #:
Github user aljoscha commented on the pull request: https://github.com/apache/flink/commit/19dae21b00cfbf68ee64af80672d25974a3cd346#commitcomment-18339338 That seems like a good idea, yes. Should we still not have the JMXReporter active by default, or now have it active by default again?
[jira] [Created] (FLINK-4242) Improve validation exception messages
Till Rohrmann created FLINK-4242: Summary: Improve validation exception messages Key: FLINK-4242 URL: https://issues.apache.org/jira/browse/FLINK-4242 Project: Flink Issue Type: Improvement Components: Table API & SQL Affects Versions: 1.1.0 Reporter: Till Rohrmann Priority: Minor The Table API's validation exceptions could be improved to be more meaningful for users. For example, the following code snippet: {code} Table inputTable = tableEnv.fromDataStream(env.fromElements( Tuple3.of(1, "a", 1.0), Tuple3.of(2, "b", 2.0), Tuple3.of(3, "c", 3.0)), "a, b, c"); inputTable.select("a").where("!a"); {code} fails correctly. However, the validation exception message says "Expression !('a) failed on input check: Not only accepts child of Boolean Type, get Integer". I think it could be changed such that it says: "The not operator requires a boolean input but "a" is of type integer." or something similar.
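The kind of message formatting the issue asks for could look like the sketch below; the helper method and its name are hypothetical, not the Table API's actual validation code.

```java
// Hypothetical helper producing the friendlier message proposed in FLINK-4242;
// not the Table API's actual validation code.
public class ValidationMessageSketch {

    static String notOperatorError(String fieldName, String actualType) {
        return String.format(
                "The NOT operator requires a boolean input, but '%s' is of type %s.",
                fieldName, actualType);
    }

    public static void main(String[] args) {
        System.out.println(notOperatorError("a", "Integer"));
        // The NOT operator requires a boolean input, but 'a' is of type Integer.
    }
}
```

The point is simply to name the operator, the offending field, and its actual type, instead of echoing the internal expression tree.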
[GitHub] flink pull request #2271: [FLINK-4202] Add restarting time JM metric
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2271#discussion_r71705985 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.AbstractMetricGroup; +import org.apache.flink.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.Tasks; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import org.mockito.Matchers; +import scala.concurrent.ExecutionContext$; +import scala.concurrent.Future$; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; 
+import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ExecutionGraphMetricsTest extends TestLogger { + + /** +* This test tests that the restarting time metric correctly displays restarting times. +*/ + @Test + public void testExecutionGraphRestartTimeMetric() throws JobException, IOException, InterruptedException { --- End diff -- Very good test