[jira] [Commented] (FLINK-4222) Allow Kinesis configuration to get credentials from AWS Metadata

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388967#comment-15388967
 ] 

ASF GitHub Bot commented on FLINK-4222:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2260
  
Hi @chadnickbok,
I've given the new config a test on AWS EC2 instances with appropriate IAM 
roles attached, and it works nicely, as expected. I think the changes are 
good to merge once the remaining comments are addressed :)


> Allow Kinesis configuration to get credentials from AWS Metadata
> 
>
> Key: FLINK-4222
> URL: https://issues.apache.org/jira/browse/FLINK-4222
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.3
>Reporter: Nick Chadwick
>Priority: Minor
>  Labels: easyfix
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> When deploying Flink TaskManagers in an EC2 environment, it would be nice to 
> be able to use the EC2 IAM Role credentials provided by the EC2 Metadata 
> service.
> This allows for credentials to be automatically discovered by services 
> running on EC2 instances at runtime, and removes the need to explicitly 
> create and assign credentials to TaskManagers.
> This should be a fairly small change to the configuration of the 
> flink-connector-kinesis, which will greatly improve the ease of deployment to 
> Amazon EC2.
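
For illustration, a minimal sketch of what such a configuration could look like 
(in Scala; the property keys and the "AUTO" provider value are assumptions based 
on this ticket's discussion, not verified constants of the connector):

{code}
// Hedged sketch: let the Kinesis consumer discover credentials via the
// EC2 instance metadata service (IAM role) instead of explicit keys.
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment

val consumerConfig = new Properties()
consumerConfig.setProperty("aws.region", "us-east-1")
// No access/secret key is set: "AUTO" defers to the default AWS credential
// chain, which falls back to the instance profile at runtime.
consumerConfig.setProperty("aws.credentials.provider", "AUTO")

val kinesis = env.addSource(new FlinkKinesisConsumer[String](
  "my-stream", new SimpleStringSchema(), consumerConfig))
{code}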



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2260: [FLINK-4222] Allow Kinesis configuration to get credentia...

2016-07-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2260
  
Hi @chadnickbok,
I've given the new config a test on AWS EC2 instances with appropriate IAM 
roles attached, and it works nicely, as expected. I think the changes are 
good to merge once the remaining comments are addressed :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2282: Support order by with offset and fetch.

2016-07-21 Thread gallenvara
GitHub user gallenvara opened a pull request:

https://github.com/apache/flink/pull/2282

Support order by with offset and fetch.

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

Support orderBy with offset and fetch.
@twalthr @fhueske Can you help with the review and give me some feedback? :)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gallenvara/flink flink-3940

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2282


commit e6f2da3ef8e566942e160b628521ee0f50049fa4
Author: gallenvara 
Date:   2016-07-22T03:39:46Z

Support order by with offset and fetch.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4252) Table program cannot be compiled

2016-07-21 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge updated FLINK-4252:
-
Description: 
I'm trying the Table APIs.
I got some errors like the following.
My code is in the attachments.

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job execution failed.
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672)
at TestMain$.main(TestMain.scala:31)
at TestMain.main(TestMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open(Configuration)' method 
in class org.apache.flink.api.table.runtime.FlatMapRunner caused an exception: 
Table program cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337)
at 
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
at 
org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.api.table.runtime.FunctionCompiler$class.compile(FunctionCompiler.scala:37)
at 
org.apache.flink.api.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:28)
at 
org.apache.flink.api.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:42)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
 

[jira] [Created] (FLINK-4252) Table program cannot be compiled

2016-07-21 Thread Renkai Ge (JIRA)
Renkai Ge created FLINK-4252:


 Summary: Table program cannot be compiled
 Key: FLINK-4252
 URL: https://issues.apache.org/jira/browse/FLINK-4252
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.1.0
 Environment: OS X El Capitan
scala 2.11.7
jdk 8
Reporter: Renkai Ge
 Attachments: TestMain.scala

I'm trying the Table APIs.
I got some errors like the following.


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job execution failed.
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672)
at TestMain$.main(TestMain.scala:31)
at TestMain.main(TestMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open(Configuration)' method 
in class org.apache.flink.api.table.runtime.FlatMapRunner caused an exception: 
Table program cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337)
at 
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
at 
org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.api.table.runtime.FunctionCompiler$class.compile(FunctionCompiler.scala:37)
at 

[jira] [Updated] (FLINK-4252) Table program cannot be compiled

2016-07-21 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge updated FLINK-4252:
-
Attachment: TestMain.scala

> Table program cannot be compiled
> 
>
> Key: FLINK-4252
> URL: https://issues.apache.org/jira/browse/FLINK-4252
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.1.0
> Environment: OS X El Capitan
> scala 2.11.7
> jdk 8
>Reporter: Renkai Ge
> Attachments: TestMain.scala
>
>
> I'm trying the Table APIs.
> I got some errors like the following.
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672)
>   at TestMain$.main(TestMain.scala:31)
>   at TestMain.main(TestMain.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The user defined 'open(Configuration)' method 
> in class org.apache.flink.api.table.runtime.FlatMapRunner caused an 
> exception: Table program cannot be compiled. This is a bug. Please file an 
> issue.
>   at 
> org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337)
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
>   at 
> org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: 

[jira] [Comment Edited] (FLINK-4250) Cannot select other than first column from Table

2016-07-21 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388780#comment-15388780
 ] 

Jark Wu edited comment on FLINK-4250 at 7/22/16 2:55 AM:
-

I think the second exception is similar to the first, because "VALUE" is also a 
reserved SQL keyword as a part of "[NEXT VALUE 
FOR|https://msdn.microsoft.com/en-us/library/ff878370.aspx]" which generates a 
sequence.


was (Author: jark):
I think the second exception is similar to the first, because "VALUE" is also a 
reserved SQL keyword as a part of "NEXT VALUE FOR" which generates a sequence.

> Cannot select other than first column from Table
> 
>
> Key: FLINK-4250
> URL: https://issues.apache.org/jira/browse/FLINK-4250
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API, Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Priority: Critical
>
> Using the Scala Table API and the {{CsvTableSource}} I cannot select a column 
> from the csv source. The following code:
> {code}
> package com.dataartisans.batch
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
> import org.apache.flink.api.scala._
> import org.apache.flink.api.table.sources.CsvTableSource
> import org.apache.flink.api.table.{Row, TableEnvironment, Table}
> object CsvTableAPIJob {
>   def main(args: Array[String]): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val csvFilePath = "table-jobs/src/main/resources/input.csv"
> val tblEnv = TableEnvironment.getTableEnvironment(env)
> val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", 
> "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO))
> tblEnv.registerTableSource("foobar", csvTS)
> val input = tblEnv.sql("SELECT user FROM foobar")
> tblEnv.toDataSet[Row](input).print()
>   }
> }
> {code}
> fails with 
> {code}
> Exception in thread "main" 
> org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
>   at scala.Option.getOrElse(Option.scala:120)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286)
>   at 
> org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
>   at 
> org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271)
>   at 
> org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
>   at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21)
>   at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4250) Cannot select other than first column from Table

2016-07-21 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388780#comment-15388780
 ] 

Jark Wu commented on FLINK-4250:


I think the second exception is similar to the first, because "VALUE" is also a 
reserved SQL keyword as a part of "NEXT VALUE FOR" which generates a sequence.

> Cannot select other than first column from Table
> 
>
> Key: FLINK-4250
> URL: https://issues.apache.org/jira/browse/FLINK-4250
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API, Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Priority: Critical
>
> Using the Scala Table API and the {{CsvTableSource}} I cannot select a column 
> from the csv source. The following code:
> {code}
> package com.dataartisans.batch
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
> import org.apache.flink.api.scala._
> import org.apache.flink.api.table.sources.CsvTableSource
> import org.apache.flink.api.table.{Row, TableEnvironment, Table}
> object CsvTableAPIJob {
>   def main(args: Array[String]): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val csvFilePath = "table-jobs/src/main/resources/input.csv"
> val tblEnv = TableEnvironment.getTableEnvironment(env)
> val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", 
> "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO))
> tblEnv.registerTableSource("foobar", csvTS)
> val input = tblEnv.sql("SELECT user FROM foobar")
> tblEnv.toDataSet[Row](input).print()
>   }
> }
> {code}
> fails with 
> {code}
> Exception in thread "main" 
> org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
>   at scala.Option.getOrElse(Option.scala:120)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286)
>   at 
> org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
>   at 
> org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271)
>   at 
> org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
>   at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21)
>   at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3940) Add support for ORDER BY OFFSET FETCH

2016-07-21 Thread GaoLun (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388752#comment-15388752
 ] 

GaoLun commented on FLINK-3940:
---

Hello, I would like to work on this issue. :)

> Add support for ORDER BY OFFSET FETCH
> -
>
> Key: FLINK-3940
> URL: https://issues.apache.org/jira/browse/FLINK-3940
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Priority: Minor
>
> Currently, only ORDER BY without OFFSET and FETCH is supported.
> This issue tracks the effort to add support for OFFSET and FETCH and involves:
> - implementing the execution strategy in `DataSetSort`
> - adapting the `DataSetSortRule` to support OFFSET and FETCH
> - extending the Table API and validation to support OFFSET and FETCH and 
> generating a corresponding RelNode (the target syntax is sketched below).
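
For illustration, the query shape this targets, issued through the Table API's 
{{sql()}} entry point (a sketch only; "Orders", "name", and "amount" are made-up 
identifiers):

{code}
// Hedged sketch of ORDER BY with OFFSET/FETCH once FLINK-3940 is in place.
// "Orders" is assumed to be a registered table with columns name and amount.
import org.apache.flink.api.scala._
import org.apache.flink.api.table.TableEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
val tblEnv = TableEnvironment.getTableEnvironment(env)

val result = tblEnv.sql(
  "SELECT name, amount FROM Orders " +
  "ORDER BY amount DESC OFFSET 10 ROWS FETCH NEXT 5 ROWS ONLY")
{code}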



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4175) Broadcast data sent increases with # slots per TM

2016-07-21 Thread Felix Neutatz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388545#comment-15388545
 ] 

Felix Neutatz commented on FLINK-4175:
--

I started a design document here: 
https://docs.google.com/document/d/1odYIvmQt4feonQF9q-btBnGvrzzN3lX0Os6rzHcCOjA/edit?usp=sharing

I highly appreciate any ideas or comments, and I am looking forward to the 
discussion.

> Broadcast data sent increases with # slots per TM
> -
>
> Key: FLINK-4175
> URL: https://issues.apache.org/jira/browse/FLINK-4175
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, TaskManager
>Affects Versions: 1.0.3
>Reporter: Felix Neutatz
>Assignee: Felix Neutatz
>  Labels: performance
>
> Problem:
> We experience an unexpected increase in the data sent over the network for 
> broadcasts as the number of slots per TaskManager grows.
> We provided a benchmark [1]. It not only increases the size of data sent over 
> the network but also hurts performance, as seen in the preliminary results 
> below. In these results, cloud-11 has 25 nodes and ibm-power has 8 nodes, with 
> the number of slots per node scaling from 1 to 16.
> +---+--+-+
> | suite | name | median_time |
> +===+==+=+
> | broadcast.cloud-11| broadcast.01 |8796 |
> | broadcast.cloud-11| broadcast.02 |   14802 |
> | broadcast.cloud-11| broadcast.04 |   30173 |
> | broadcast.cloud-11| broadcast.08 |   56936 |
> | broadcast.cloud-11| broadcast.16 |  117507 |
> | broadcast.ibm-power-1 | broadcast.01 |6807 |
> | broadcast.ibm-power-1 | broadcast.02 |8443 |
> | broadcast.ibm-power-1 | broadcast.04 |   11823 |
> | broadcast.ibm-power-1 | broadcast.08 |   21655 |
> | broadcast.ibm-power-1 | broadcast.16 |   37426 |
> +---+--+-+
> After looking into the code base, it seems that the data is deserialized 
> only once per TM, but the actual data is sent for all slots running the 
> operator with broadcast vars and just gets discarded if it is already 
> deserialized.
> We do not see a reason the data can't be shared among the slots of a TM and 
> therefore sent just once.
> [1] https://github.com/TU-Berlin-DIMA/flink-broadcast
> This Jira will continue the discussion started here: 
> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3c1465386300767.94...@tu-berlin.de%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2271: [FLINK-4202] Add restarting time JM metric

2016-07-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2271
  
Also, the documentation (list of all exposed metrics) was not updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Reopened] (FLINK-4202) Add JM metric which shows the restart duration

2016-07-21 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-4202:
-

Documentation was not properly updated.

> Add JM metric which shows the restart duration
> --
>
> Key: FLINK-4202
> URL: https://issues.apache.org/jira/browse/FLINK-4202
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> It is convenient for users to have a metric which tells them how long the 
> restarting of a job has taken.
> I propose to introduce a {{Gauge}} which returns the time between 
> {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not 
> restarted (initial run), then this metric will return {{-1}}.
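
A minimal sketch of such a gauge (class and field names here are hypothetical; 
the actual wiring into the JobManager is not shown):

{code}
import org.apache.flink.metrics.Gauge

// Hedged sketch: the JobManager would record a timestamp when the job
// enters RESTARTING and another when it reaches RUNNING again.
class RestartingTimeGauge extends Gauge[java.lang.Long] {
  @volatile var restartingTimestamp: Long = -1L
  @volatile var runningTimestamp: Long = -1L

  // Returns -1 on the initial run, as proposed above; while the job is
  // still restarting, it reports the time elapsed so far.
  override def getValue: java.lang.Long =
    if (restartingTimestamp < 0) -1L
    else if (runningTimestamp < restartingTimestamp)
      System.currentTimeMillis() - restartingTimestamp
    else runningTimestamp - restartingTimestamp
}
{code}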



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4202) Add JM metric which shows the restart duration

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388507#comment-15388507
 ] 

ASF GitHub Bot commented on FLINK-4202:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2271
  
Also, the documentation (list of all exposed metrics) was not updated.


> Add JM metric which shows the restart duration
> --
>
> Key: FLINK-4202
> URL: https://issues.apache.org/jira/browse/FLINK-4202
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> It is convenient for users to have a metric which tells them how long the 
> restarting of a job has taken.
> I propose to introduce a {{Gauge}} which returns the time between 
> {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not 
> restarted (initial run), then this metric will return {{-1}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4245) Metric naming improvements

2016-07-21 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387967#comment-15387967
 ] 

Chesnay Schepler edited comment on FLINK-4245 at 7/21/16 9:52 PM:
--

1. As long as we don't have unique operator names you cannot have 
collision-free operator metrics. Period. I am getting really tired of 
explaining this.
2. If you only want to change the naming for JMX I suggest changing the title 
to "JMX naming improvements".
3. Your suggestion regarding the domain goes against JMX best practices. They 
should always start with "org.apache.flink".
4. Please provide a reasoning for the domain changes.
5. Please provide a comparison as to how an operator and a task metric would 
differ, regarding their domain, tags and ObjectName, based on the current 
respective default scope format.
6. In general, using what at one point were called "categories" as keys isn't a 
bad idea. Note however that this becomes inconsistent with user-defined groups, 
which is the reason we currently only use auto-generated keys.
7. Please provide the use case regarding [~mdaxini]; I am curious as to what 
these changes are supposed to allow.




was (Author: zentol):
1. As long as we don't have unique operator names you cannot have collision 
free operator metrics. Period. I am getting really tired of explaining this.
2. If you only want to change the naming for JMX I suggest to change the tile 
to "JMX naming improvements".
3. Your suggestion regarding the domain goes against JMX best practices. They 
should always start with "org.apache.flink".
4. Please provide a reasoning as to the domain changes.
5. Please provide a comparison as to how a operator and task metric would 
differ, (domain and tags) that include all tags that are defined in the current 
default scope formats.
6. In general, using what at one point were called "categories" as keys isn't a 
bad idea. Note however that this becomes inconsistent with user-defined groups, 
which is the reason we currently only use auto-generated keys.
7. Please provide the use-case regarding [~mdaxini]; i am curious as to what 
these changes are supposed to allow.



> Metric naming improvements
> --
>
> Key: FLINK-4245
> URL: https://issues.apache.org/jira/browse/FLINK-4245
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Stephan Ewen
>
> A metric currently has two parts to it:
>   - The name of that particular metric
>   - The "scope" (or namespace), defined by the group that contains the metric.
> A metric group actually always implicitly has a map of naming "tags", like:
>   - taskmanager_host : 
>   - taskmanager_id : 
>   - task_name : "map() -> filter()"
> We derive the scope from that map, following the defined scope formats.
> For JMX (and some users that use JMX), it would be natural to expose that map 
> of tags. Some users reconstruct that map by parsing the metric scope. In JMX, 
> we can expose a metric like:
>   - domain: "taskmanager.task.operator.io"
>   - name: "numRecordsIn"
>   - tags: { "hostname" -> "localhost", "operator_name" -> "map() at 
> X.java:123", ... }
> For many other reporters, the formatted scope makes a lot of sense, since 
> they think only in terms of (scope, metric-name).
> We may even have the formatted scope in JMX as well (in the domain), if we 
> want to go that route. 
> [~jgrier] and [~Zentol] - what do you think about that?
> [~mdaxini] Does that match your use of the metrics?
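
As a concrete illustration of the domain/name/tags split (all values below are 
examples, not actual reporter code):

{code}
import java.util.Hashtable
import javax.management.ObjectName

// Hedged sketch: the tag map above rendered as a JMX ObjectName, with the
// tags as key properties. ObjectName.quote handles characters such as
// "()" that are illegal in raw property values.
val tags = new Hashtable[String, String]()
tags.put("name", "numRecordsIn")
tags.put("hostname", "localhost")
tags.put("operator_name", ObjectName.quote("map() at X.java:123"))

val objectName =
  new ObjectName("org.apache.flink.taskmanager.task.operator.io", tags)
{code}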



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4245) Metric naming improvements

2016-07-21 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-4245:

Component/s: Metrics

> Metric naming improvements
> --
>
> Key: FLINK-4245
> URL: https://issues.apache.org/jira/browse/FLINK-4245
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Stephan Ewen
>
> A metric currently has two parts to it:
>   - The name of that particular metric
>   - The "scope" (or namespace), defined by the group that contains the metric.
> A metric group actually always implicitly has a map of naming "tags", like:
>   - taskmanager_host : 
>   - taskmanager_id : 
>   - task_name : "map() -> filter()"
> We derive the scope from that map, following the defined scope formats.
> For JMX (and some users that use JMX), it would be natural to expose that map 
> of tags. Some users reconstruct that map by parsing the metric scope. In JMX, 
> we can expose a metric like:
>   - domain: "taskmanager.task.operator.io"
>   - name: "numRecordsIn"
>   - tags: { "hostname" -> "localhost", "operator_name" -> "map() at 
> X.java:123", ... }
> For many other reporters, the formatted scope makes a lot of sense, since 
> they think only in terms of (scope, metric-name).
> We may even have the formatted scope in JMX as well (in the domain), if we 
> want to go that route. 
> [~jgrier] and [~Zentol] - what do you think about that?
> [~mdaxini] Does that match your use of the metrics?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2281: RMQ Sink: Possibility to customize queue config [F...

2016-07-21 Thread PhilippGrulich
GitHub user PhilippGrulich opened a pull request:

https://github.com/apache/flink/pull/2281

RMQ Sink: Possibility to customize queue config [FLINK-4251]

This patch adds the possibility for the user of the RabbitMQ
Streaming Sink to customize the queue which is used. 
This adopts the behavior of [FLINK-4025] for the sink.
The commit doesn't change the actual behaviour but makes it possible for 
users to override the `setupQueue` method and customize their implementation. 
This was only possible for the RMQSource before. The sink and the source now 
both offer the same functionality, so this should increase usability. 

[FLINK-4025] = https://issues.apache.org/jira/browse/FLINK-4025

Best,
Philipp

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/PhilippGrulich/flink RMQSink_setupQueue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2281.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2281


commit 3b5ec690c6320840cc0b9f5eafb64e11761e543b
Author: philippgrulich 
Date:   2016-07-21T20:31:24Z

RMQ Sink: Possibility to customize queue config [FLINK-4251]




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4251) Add possibility for the RMQ Streaming Sink to customize the queue

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388395#comment-15388395
 ] 

ASF GitHub Bot commented on FLINK-4251:
---

GitHub user PhilippGrulich opened a pull request:

https://github.com/apache/flink/pull/2281

RMQ Sink: Possibility to customize queue config [FLINK-4251]

This patch adds the possibility for the user of the RabbitMQ
Streaming Sink to customize the queue which is used. 
This adopts the behavior of [FLINK-4025] for the sink.
The commit doesn't change the actual behaviour but makes it possible for 
users to override the `setupQueue` method and customize their implementation. 
This was only possible for the RMQSource before. The sink and the source now 
both offer the same functionality, so this should increase usability. 

[FLINK-4025] = https://issues.apache.org/jira/browse/FLINK-4025

Best,
Philipp

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/PhilippGrulich/flink RMQSink_setupQueue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2281.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2281


commit 3b5ec690c6320840cc0b9f5eafb64e11761e543b
Author: philippgrulich 
Date:   2016-07-21T20:31:24Z

RMQ Sink: Possibility to customize queue config [FLINK-4251]




> Add possibility for the RMQ Streaming Sink to customize the queue
> 
>
> Key: FLINK-4251
> URL: https://issues.apache.org/jira/browse/FLINK-4251
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Philipp Grulich
>Priority: Minor
>
> This patch adds the possibility for the user of the RabbitMQ
> Streaming Sink to customize the queue which is used. 
> This adopts the behavior of [FLINK-4025] for the sink.
> The commit doesn't change the actual behaviour but makes it
> possible for users to override the `setupQueue`
> method and customize their implementation. This was only possible for the 
> RMQSource before. The sink and the source now both offer the same 
> functionality, so this should increase usability. 
> [FLINK-4025] = https://issues.apache.org/jira/browse/FLINK-4025



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4251) Add possibility for the RMQ Streaming Sink to customize the queue

2016-07-21 Thread Philipp Grulich (JIRA)
Philipp Grulich created FLINK-4251:
--

 Summary: Add possibility for the RMQ Streaming Sink to customize 
the queue
 Key: FLINK-4251
 URL: https://issues.apache.org/jira/browse/FLINK-4251
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Reporter: Philipp Grulich
Priority: Minor


This patch adds the possibility for the user of the RabbitMQ
Streaming Sink to customize the queue which is used. 
This adopts the behavior of [FLINK-4025] for the sink.
The commit doesn't change the actual behaviour but makes it
possible for users to override the `setupQueue`
method and customize their implementation. This was only possible for the 
RMQSource before. The sink and the source now both offer the same 
functionality, so this should increase usability. 


[FLINK-4025] = https://issues.apache.org/jira/browse/FLINK-4025
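
A sketch of the kind of override this enables (a hedged example: the constructor 
shape and the `channel` field follow that era's RMQSink, and the queueDeclare 
arguments are purely illustrative):

{code}
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
import org.apache.flink.streaming.util.serialization.SerializationSchema

// Hedged sketch: declare the queue as durable instead of relying on the
// default declaration, mirroring what RMQSource already allowed.
class DurableQueueSink(
    config: RMQConnectionConfig,
    queue: String,
    schema: SerializationSchema[String])
  extends RMQSink[String](config, queue, schema) {

  override protected def setupQueue(): Unit = {
    // durable = true, exclusive = false, autoDelete = false, no extra args
    channel.queueDeclare(queue, true, false, false, null)
  }
}
{code}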



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3779) Add support for queryable state

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388348#comment-15388348
 ] 

ASF GitHub Bot commented on FLINK-3779:
---

Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
One more question: is it possible to configure the JobManager actor path 
that the client connects to? It looks like it defaults to 
`akka://flink/user/jobmanager`.
That way I can create a much more generic client. 

Note: I know this is an initial version; just curious whether this is already 
implemented. :)




> Add support for queryable state
> ---
>
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee 
> fault-tolerant processing of streams. Users can work with both 
> non-partitioned (Checkpointed interface) and partitioned state 
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state 
> that are all scoped to the key of the current input element. This type of 
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to 
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by 
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions 
> with external systems such as key-value stores which are often the bottleneck 
> in practice. Exposing the local state to the outside moves a good part of the 
> database work into the stream processor, allowing both high throughput 
> queries and immediate access to the computed state.
> This is the initial design doc for the feature: 
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
>  Feel free to comment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-07-21 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
One more question: is it possible to configure the JobManager actor path 
that the client connects to? It looks like it defaults to 
`akka://flink/user/jobmanager`.
That way I can create a much more generic client. 

Note: I know this is an initial version; just curious whether this is already 
implemented. :)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-07-21 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
Sorry, the compilation error was because the Tuple2 was scala.Tuple2, not 
Flink's Tuple2. Changing it to `org.apache.flink.api.java.tuple.Tuple2` fixed 
the issue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3779) Add support for queryable state

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388178#comment-15388178
 ] 

ASF GitHub Bot commented on FLINK-3779:
---

Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
Sorry, the compilation error was because the Tuple2 was scala.Tuple2 not 
flink Tuple2. Changing to `org.apache.flink.api.java.tuple.Tuple2` fixed the 
issue.


> Add support for queryable state
> ---
>
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee 
> fault-tolerant processing of streams. Users can work with both 
> non-partitioned (Checkpointed interface) and partitioned state 
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state 
> that are all scoped to the key of the current input element. This type of 
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to 
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by 
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions 
> with external systems such as key-value stores which are often the bottleneck 
> in practice. Exposing the local state to the outside moves a good part of the 
> database work into the stream processor, allowing both high throughput 
> queries and immediate access to the computed state.
> This is the initial design doc for the feature: 
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
>  Feel free to comment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4250) Cannot select other than first column from Table

2016-07-21 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388165#comment-15388165
 ] 

Timo Walther commented on FLINK-4250:
-

I think the problem with your first exception is that "user" or "USER" is a 
reserved SQL function returning the current user. The second exception is 
definitely a bug.
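
One possible workaround, assuming Calcite's default double-quote identifier 
handling applies to Flink's SQL parser here (unverified), is to quote the 
column so it is parsed as an identifier rather than the USER function, reusing 
the {{tblEnv}} and registered {{foobar}} table from the example below:

{code}
// Hedged sketch: quote "user" so the parser treats it as a column name.
val input = tblEnv.sql("SELECT \"user\" FROM foobar")
{code}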

> Cannot select other than first column from Table
> 
>
> Key: FLINK-4250
> URL: https://issues.apache.org/jira/browse/FLINK-4250
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API, Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Priority: Critical
>
> Using the Scala Table API and the {{CsvTableSource}} I cannot select a column 
> from the csv source. The following code:
> {code}
> package com.dataartisans.batch
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
> import org.apache.flink.api.scala._
> import org.apache.flink.api.table.sources.CsvTableSource
> import org.apache.flink.api.table.{Row, TableEnvironment, Table}
> object CsvTableAPIJob {
>   def main(args: Array[String]): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val csvFilePath = "table-jobs/src/main/resources/input.csv"
> val tblEnv = TableEnvironment.getTableEnvironment(env)
> val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", 
> "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO))
> tblEnv.registerTableSource("foobar", csvTS)
> val input = tblEnv.sql("SELECT user FROM foobar")
> tblEnv.toDataSet[Row](input).print()
>   }
> }
> {code}
> fails with 
> {code}
> Exception in thread "main" 
> org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
>   at scala.Option.getOrElse(Option.scala:120)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286)
>   at 
> org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
>   at 
> org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271)
>   at 
> org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
>   at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21)
>   at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3779) Add support for queryable state

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388116#comment-15388116
 ] 

ASF GitHub Bot commented on FLINK-3779:
---

Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
Thanks Ufuk & Stephan for the reply,

I tried the serializers you suggested:
```
val typeHint = new TypeHint[Tuple2[Long,String]](){}
val serializer = TypeInformation.of(typeHint).createSerializer(null)

//also tried this
val fieldSerializers = Array[TypeSerializer[_]](StringSerializer.INSTANCE, 
LongSerializer.INSTANCE)
val serializer2 = new 
TupleSerializer(classOf[Tuple2[Long,String]].asInstanceOf[Class[_]].asInstanceOf[Class[Tuple2[String,
 Long]]], fieldSerializers)
```

But both give me a compilation error at
```
val serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
  key,
  serializer2,
  VoidNamespace.INSTANCE,
  VoidNamespaceSerializer.INSTANCE)
```
the compilation error is:
```
Error:(43, 7) type mismatch;
found   : 
org.apache.flink.api.common.typeutils.TypeSerializer[org.apache.flink.api.java.tuple.Tuple2[Long,String]]
 required: 
org.apache.flink.api.common.typeutils.TypeSerializer[java.io.Serializable]
Note: org.apache.flink.api.java.tuple.Tuple2[Long,String] <: 
java.io.Serializable, but Java-defined class TypeSerializer is invariant in 
type T.
You may wish to investigate a wildcard type such as `_ <: 
java.io.Serializable`. (SLS 3.2.10)
  serializer,
  ^
```

I had seen this before when I tried to set the serializer from 
`queryableState.getKeySerializer` 

Note: It works fine when I use the longer version of the serializer that I 
created.
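
A minimal Scala sketch of one way around this invariance mismatch (a guess, 
assuming the key type is Flink's `Tuple2[java.lang.Long, String]` and that 
`serializeKeyAndNamespace` is generic in the key type; import paths follow 
this PR and may differ): pinning both the key and the serializer to the 
concrete tuple type keeps Scala from widening the type parameter to 
`java.io.Serializable`.

```scala
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer
import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}

// Declare the concrete types explicitly so inference does not widen
// them to java.io.Serializable (TypeSerializer is invariant in T).
val key: Tuple2[java.lang.Long, String] = new Tuple2[java.lang.Long, String](1L, "foo")
val serializer: TypeSerializer[Tuple2[java.lang.Long, String]] =
  TypeInformation
    .of(new TypeHint[Tuple2[java.lang.Long, String]]() {})
    .createSerializer(null) // null ExecutionConfig, as in the snippets above

val serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
  key,
  serializer,
  VoidNamespace.INSTANCE,
  VoidNamespaceSerializer.INSTANCE)
```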


> Add support for queryable state
> ---
>
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee 
> fault-tolerant processing of streams. Users can work with both 
> non-partitioned (Checkpointed interface) and partitioned state 
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state 
> that are all scoped to the key of the current input element. This type of 
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to 
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by 
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions 
> with external systems such as key-value stores which are often the bottleneck 
> in practice. Exposing the local state to the outside moves a good part of the 
> database work into the stream processor, allowing both high throughput 
> queries and immediate access to the computed state.
> This is the initial design doc for the feature: 
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
>  Feel free to comment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-07-21 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
Thanks Ufuk & Stephan for the reply,

I tried the serializers suggested by you
```
val typeHint = new TypeHint[Tuple2[Long,String]](){}
val serializer = TypeInformation.of(typeHint).createSerializer(null)

//also tried this
val fieldSerializers = Array[TypeSerializer[_]](
  StringSerializer.INSTANCE,
  LongSerializer.INSTANCE)
val serializer2 = new TupleSerializer(
  classOf[Tuple2[Long, String]]
    .asInstanceOf[Class[_]]
    .asInstanceOf[Class[Tuple2[String, Long]]],
  fieldSerializers)
```

But both give me a compilation error at
```
val serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
  key,
  serializer2,
  VoidNamespace.INSTANCE,
  VoidNamespaceSerializer.INSTANCE)
```
the compilation error is:
```
Error:(43, 7) type mismatch;
 found   : org.apache.flink.api.common.typeutils.TypeSerializer[org.apache.flink.api.java.tuple.Tuple2[Long,String]]
 required: org.apache.flink.api.common.typeutils.TypeSerializer[java.io.Serializable]
Note: org.apache.flink.api.java.tuple.Tuple2[Long,String] <: java.io.Serializable, but Java-defined class TypeSerializer is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: java.io.Serializable`. (SLS 3.2.10)
  serializer,
  ^
```

I had seen this before when I tried to set the serializer from 
`queryableState.getKeySerializer` 

Note: It works fine when I use the longer version of the serializer that I 
created.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388102#comment-15388102
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
Adding some more context to the implementation details, which are based on 
the design proposal 
(https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing)

The current security implementation works in a subtle way, utilizing the 
Kerberos cache of the user who starts the Flink process/jobs, and only in the 
context of supporting secure access to a Hadoop cluster. The underlying UGI 
implementation of the Hadoop infrastructure is used to harden security using 
the keytab cache. For the Yarn mode of deployment, delegation tokens are 
created and populated into the container environment (App Master/JM and TM). 

There are two areas of improvement that the current implementation lacks:
1) Tokens expire in due course, which impacts long-running jobs
2) Missing functionality to support secure connections to Kafka and ZK 
(Kafka 0.9 and the latest ZK versions support Kerberos-based authentication 
using SASL/JAAS)

This PR addresses the above gaps by providing keytab support to securely 
communicate with Hadoop and Kafka/ZK services.

1) Additional Configurations: 

The following new security-specific configurations are added to the Flink 
configuration file.
a) security.principal - user principal that Flink process/connectors should 
authenticate as 
b) security.keytab - keytab file location

In standalone mode, it is assumed that the configuration pre-exists (a manual 
process) on all cluster nodes from which the JM and TMs will be running. 

In Yarn mode, the configuration (and keytab file) is expected only on the 
node from which the YarnCLI or FlinkCLI will be invoked. Application code 
takes care of copying the keytab file to the JM/TM Yarn containers as a local 
resource for lookup.

In the absence of security configurations, the delegation token mechanism 
still works to support backward compatibility (manual kinit before starting 
the JM/TMs).
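
For illustration only (the path and principal are made up), the corresponding 
flink-conf.yaml entries with the two keys named above would look like:
```
security.keytab: /etc/security/keytabs/flink.keytab
security.principal: flink@EXAMPLE.COM
```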

2) Process-wide in-memory JAAS configuration to enable Kafka/ZK secure 
authentication.
 
The JAAS configuration plays a critical role in authentication for 
Kerberized applications. Kafka/ZK login module code is expected to construct a 
login context based on the supplied JAAS configuration file entries and 
authenticate to produce a subject. The context is constructed with an 
application name which acts as a lookup key into the configuration, yielding 
one or more login modules. The login module implements the specific strategy, 
such as using a configured keytab or using the user’s ticket cache.

Instead of managing per-connector JAAS configuration files, a process-wide 
JAAS configuration object is initialized during the Flink bootstrap phase, 
thus providing a single login module, configured to log in using the supplied 
keytab, to all callers.

(https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html#setConfiguration(javax.security.auth.login.Configuration))

To summarize, the following sequence happens when the secure configuration is 
enabled. Flink bootstrap code (both Yarn and standalone) initializes the 
security context by:
a) Initializing UGI with the supplied keytab and principal, which takes care 
of handling Kerberos authentication and login renewal for Hadoop services. 
b) Creating a process-wide JAAS configuration object for the Kafka/ZK login 
modules to support Kerberos/SASL authentication. Login renewals are 
automatically taken care of by the ZK and Kafka login module implementations.

Some additional details are provided in the documentation page as well, which 
can be referenced here:

(https://github.com/vijikarthi/flink/blob/FLINK-3929/docs/internals/flink_security.md)
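
A rough Scala sketch of the process-wide JAAS configuration idea described 
above (an illustration, not the PR's actual code; the login module and option 
names are the standard JDK Krb5LoginModule ones):

```scala
import javax.security.auth.login.AppConfigurationEntry
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag
import javax.security.auth.login.Configuration

// Install a process-wide JAAS Configuration that answers every lookup
// (e.g. Kafka's "KafkaClient", ZooKeeper's "Client") with a keytab-based
// Krb5LoginModule entry, instead of reading a jaas.conf file.
def installKeytabJaasConfig(keytab: String, principal: String): Unit = {
  val options = new java.util.HashMap[String, String]()
  options.put("useKeyTab", "true")
  options.put("storeKey", "true")
  options.put("doNotPrompt", "true")
  options.put("keyTab", keytab)
  options.put("principal", principal)

  val entry = new AppConfigurationEntry(
    "com.sun.security.auth.module.Krb5LoginModule",
    LoginModuleControlFlag.REQUIRED,
    options)

  Configuration.setConfiguration(new Configuration {
    // The same keytab login entry is returned regardless of the
    // application name the connector's login code looks up.
    override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] =
      Array(entry)
  })
}
```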


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone 

[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-07-21 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
Adding some more context to the implementation details, which are based on 
the design proposal 
(https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing)

The current security implementation works in a subtle way, utilizing the 
Kerberos cache of the user who starts the Flink process/jobs, and only in the 
context of supporting secure access to a Hadoop cluster. The underlying UGI 
implementation of the Hadoop infrastructure is used to harden security using 
the keytab cache. For the Yarn mode of deployment, delegation tokens are 
created and populated into the container environment (App Master/JM and TM). 

There are two areas of improvement that the current implementation lacks:
1) Tokens expire in due course, which impacts long-running jobs
2) Missing functionality to support secure connections to Kafka and ZK 
(Kafka 0.9 and the latest ZK versions support Kerberos-based authentication 
using SASL/JAAS)

This PR addresses the above gaps by providing keytab support to securely 
communicate with Hadoop and Kafka/ZK services.

1) Additional Configurations: 

The following new security-specific configurations are added to the Flink 
configuration file.
a) security.principal - user principal that Flink process/connectors should 
authenticate as 
b) security.keytab - keytab file location

In standalone mode, it is assumed that the configuration pre-exists (a manual 
process) on all cluster nodes from which the JM and TMs will be running. 

In Yarn mode, the configuration (and keytab file) is expected only on the 
node from which the YarnCLI or FlinkCLI will be invoked. Application code 
takes care of copying the keytab file to the JM/TM Yarn containers as a local 
resource for lookup.

In the absence of security configurations, the delegation token mechanism 
still works to support backward compatibility (manual kinit before starting 
the JM/TMs).

2) Process-wide in-memory JAAS configuration to enable Kafka/ZK secure 
authentication.
 
The JAAS configuration plays a critical role in authentication for 
Kerberized applications. Kafka/ZK login module code is expected to construct a 
login context based on the supplied JAAS configuration file entries and 
authenticate to produce a subject. The context is constructed with an 
application name which acts as a lookup key into the configuration, yielding 
one or more login modules. The login module implements the specific strategy, 
such as using a configured keytab or using the user’s ticket cache.

Instead of managing per-connector JAAS configuration files, a process-wide 
JAAS configuration object is initialized during the Flink bootstrap phase, 
thus providing a single login module, configured to log in using the supplied 
keytab, to all callers.

(https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html#setConfiguration(javax.security.auth.login.Configuration))

To summarize, the following sequence happens when the secure configuration is 
enabled. Flink bootstrap code (both Yarn and standalone) initializes the 
security context by:
a) Initializing UGI with the supplied keytab and principal, which takes care 
of handling Kerberos authentication and login renewal for Hadoop services. 
b) Creating a process-wide JAAS configuration object for the Kafka/ZK login 
modules to support Kerberos/SASL authentication. Login renewals are 
automatically taken care of by the ZK and Kafka login module implementations.

Some additional details are provided in the documentation page as well, which 
can be referenced here:

(https://github.com/vijikarthi/flink/blob/FLINK-3929/docs/internals/flink_security.md)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4250) Cannot select other than first column from Table

2016-07-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-4250:
-
Priority: Critical  (was: Major)

> Cannot select other than first column from Table
> 
>
> Key: FLINK-4250
> URL: https://issues.apache.org/jira/browse/FLINK-4250
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API, Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Priority: Critical
>
> Using the Scala Table API and the {{CsvTableSource}} I cannot select a column 
> from the csv source. The following code:
> {code}
> package com.dataartisans.batch
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
> import org.apache.flink.api.scala._
> import org.apache.flink.api.table.sources.CsvTableSource
> import org.apache.flink.api.table.{Row, TableEnvironment, Table}
> object CsvTableAPIJob {
>   def main(args: Array[String]): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val csvFilePath = "table-jobs/src/main/resources/input.csv"
> val tblEnv = TableEnvironment.getTableEnvironment(env)
> val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", 
> "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO))
> tblEnv.registerTableSource("foobar", csvTS)
> val input = tblEnv.sql("SELECT user FROM foobar")
> tblEnv.toDataSet[Row](input).print()
>   }
> }
> {code}
> fails with 
> {code}
> Exception in thread "main" 
> org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
>   at scala.Option.getOrElse(Option.scala:120)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286)
>   at 
> org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
>   at 
> org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271)
>   at 
> org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
>   at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21)
>   at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4250) Cannot select other than first column from Table

2016-07-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-4250:
-
Summary: Cannot select other than first column from Table  (was: Cannot 
select column from CsvTableSource)

> Cannot select other than first column from Table
> 
>
> Key: FLINK-4250
> URL: https://issues.apache.org/jira/browse/FLINK-4250
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API, Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>
> Using the Scala Table API and the {{CsvTableSource}} I cannot select a column 
> from the csv source. The following code:
> {code}
> package com.dataartisans.batch
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
> import org.apache.flink.api.scala._
> import org.apache.flink.api.table.sources.CsvTableSource
> import org.apache.flink.api.table.{Row, TableEnvironment, Table}
> object CsvTableAPIJob {
>   def main(args: Array[String]): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val csvFilePath = "table-jobs/src/main/resources/input.csv"
> val tblEnv = TableEnvironment.getTableEnvironment(env)
> val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", 
> "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO))
> tblEnv.registerTableSource("foobar", csvTS)
> val input = tblEnv.sql("SELECT user FROM foobar")
> tblEnv.toDataSet[Row](input).print()
>   }
> }
> {code}
> fails with 
> {code}
> Exception in thread "main" 
> org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
>   at scala.Option.getOrElse(Option.scala:120)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286)
>   at 
> org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
>   at 
> org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271)
>   at 
> org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
>   at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21)
>   at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-07-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2051
  
A simpler way to get the serializer may be
```java
TypeInformation.of(new TypeHint<Tuple2<Long, String>>(){}).createSerializer(null);
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4250) Cannot select column from CsvTableSource

2016-07-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388074#comment-15388074
 ] 

Till Rohrmann commented on FLINK-4250:
--

The problem seems to be more general than the {{CsvTableSource}}. The following 
code also fails:

{code}
def main(args: Array[String]): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment

  val csvFilePath = "table-jobs/src/main/resources/input.csv"

  val tblEnv = TableEnvironment.getTableEnvironment(env)

  val inputDS = env.fromElements((1, "foo", 1.0, "12"))

  tblEnv.registerDataSet("foobar", inputDS, 'key, 'user, 'value, 'timestamp)

  val input = tblEnv.sql("SELECT value FROM foobar")

  tblEnv.toDataSet[Row](input).print()
}
{code}

But this time it fails with

{code}
Exception in thread "main" org.apache.calcite.sql.parser.SqlParseException: 
Encountered "value" at line 1, column 8.
Was expecting one of:
...
{code}
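
Both failures look like reserved-word issues in the SQL parser: {{VALUE}} is a 
reserved word in Calcite's dialect, and {{USER}} parses as the niladic 
{{CURRENT_USER}} function, which would explain the "Unsupported call: USER" in 
the original report. A hedged guess at a workaround, assuming the parser's 
default double-quote identifier quoting applies:

{code}
// quote the identifiers so the parser treats them as column names
// rather than reserved words (assumption: default Calcite quoting)
val input = tblEnv.sql("SELECT \"user\", \"value\" FROM foobar")
{code}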

> Cannot select column from CsvTableSource
> 
>
> Key: FLINK-4250
> URL: https://issues.apache.org/jira/browse/FLINK-4250
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API, Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>
> Using the Scala Table API and the {{CsvTableSource}} I cannot select a column 
> from the csv source. The following code:
> {code}
> package com.dataartisans.batch
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
> import org.apache.flink.api.scala._
> import org.apache.flink.api.table.sources.CsvTableSource
> import org.apache.flink.api.table.{Row, TableEnvironment, Table}
> object CsvTableAPIJob {
>   def main(args: Array[String]): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val csvFilePath = "table-jobs/src/main/resources/input.csv"
> val tblEnv = TableEnvironment.getTableEnvironment(env)
> val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", 
> "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO))
> tblEnv.registerTableSource("foobar", csvTS)
> val input = tblEnv.sql("SELECT user FROM foobar")
> tblEnv.toDataSet[Row](input).print()
>   }
> }
> {code}
> fails with 
> {code}
> Exception in thread "main" 
> org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
>   at scala.Option.getOrElse(Option.scala:120)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286)
>   at 
> org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
>   at 
> org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271)
>   at 
> org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
>   at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21)
>   at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> 

[jira] [Created] (FLINK-4250) Cannot select column from CsvTableSource

2016-07-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4250:


 Summary: Cannot select column from CsvTableSource
 Key: FLINK-4250
 URL: https://issues.apache.org/jira/browse/FLINK-4250
 Project: Flink
  Issue Type: Bug
  Components: Scala API, Table API & SQL
Affects Versions: 1.1.0
Reporter: Till Rohrmann


Using the Scala Table API and the {{CsvTableSource}} I cannot select a column 
from the csv source. The following code:

{code}
package com.dataartisans.batch

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
import org.apache.flink.api.scala._
import org.apache.flink.api.table.sources.CsvTableSource
import org.apache.flink.api.table.{Row, TableEnvironment, Table}

object CsvTableAPIJob {
  def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment

val csvFilePath = "table-jobs/src/main/resources/input.csv"

val tblEnv = TableEnvironment.getTableEnvironment(env)
val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", 
"timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO))

tblEnv.registerTableSource("foobar", csvTS)

val input = tblEnv.sql("SELECT user FROM foobar")

tblEnv.toDataSet[Row](input).print()
  }
}
{code}

fails with 

{code}
Exception in thread "main" org.apache.flink.api.table.codegen.CodeGenException: 
Unsupported call: USER
at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
at scala.Option.getOrElse(Option.scala:120)
at 
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782)
at 
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
at 
org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168)
at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286)
at 
org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
at 
org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
at 
org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
at 
org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271)
at 
org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21)
at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3779) Add support for queryable state

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388069#comment-15388069
 ] 

ASF GitHub Bot commented on FLINK-3779:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2051
  
A simpler way to get the serializer may be
```java
TypeInformation.of(new TypeHint<Tuple2<Long, String>>(){}).createSerializer(null);
```


> Add support for queryable state
> ---
>
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee 
> fault-tolerant processing of streams. Users can work with both 
> non-partitioned (Checkpointed interface) and partitioned state 
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state 
> that are all scoped to the key of the current input element. This type of 
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to 
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by 
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions 
> with external systems such as key-value stores which are often the bottleneck 
> in practice. Exposing the local state to the outside moves a good part of the 
> database work into the stream processor, allowing both high throughput 
> queries and immediate access to the computed state.
> This is the initial design doc for the feature: 
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
>  Feel free to comment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3779) Add support for queryable state

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388058#comment-15388058
 ] 

ASF GitHub Bot commented on FLINK-3779:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2051
  
Regarding local vs. cluster mode: that's on purpose, but we can certainly 
change that behaviour. For now, you would have to run in cluster mode. 

Regarding the serializer: assuming that it is a Flink `Tuple2<String, Long>` you can use the following to get the serializer:

```java
TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[] {
    StringSerializer.INSTANCE,
    LongSerializer.INSTANCE
};

TypeSerializer<Tuple2<String, Long>> serializer = new TupleSerializer<>(
    (Class<Tuple2<String, Long>>) (Class<?>) Tuple2.class,
    fieldSerializers);
```

**Just to make sure that we are on the same page: the state of this PR is 
not the final queryable state API, but only the initial low-level version.** 
Really looking forward to further feedback. Thank you for trying it out at this 
stage. :-)


> Add support for queryable state
> ---
>
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee 
> fault-tolerant processing of streams. Users can work with both 
> non-partitioned (Checkpointed interface) and partitioned state 
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state 
> that are all scoped to the key of the current input element. This type of 
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to 
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by 
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions 
> with external systems such as key-value stores which are often the bottleneck 
> in practice. Exposing the local state to the outside moves a good part of the 
> database work into the stream processor, allowing both high throughput 
> queries and immediate access to the computed state.
> This is the initial design doc for the feature: 
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
>  Feel free to comment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-07-21 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2051
  
Regarding local vs. cluster mode: that's on purpose, but we can certainly 
change that behaviour. For now, you would have to run in cluster mode. 

Regarding the serializer: assuming that it is a Flink `Tuple2<String, Long>` you can use the following to get the serializer:

```java
TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[] {
    StringSerializer.INSTANCE,
    LongSerializer.INSTANCE
};

TypeSerializer<Tuple2<String, Long>> serializer = new TupleSerializer<>(
    (Class<Tuple2<String, Long>>) (Class<?>) Tuple2.class,
    fieldSerializers);
```

**Just to make sure that we are on the same page: the state of this PR is 
not the final queryable state API, but only the initial low-level version.** 
Really looking forward to further feedback. Thank you for trying it out at this 
stage. :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3779) Add support for queryable state

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388043#comment-15388043
 ] 

ASF GitHub Bot commented on FLINK-3779:
---

Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
Never mind, I was hitting it with the wrong key, it works now! Cheers.


> Add support for queryable state
> ---
>
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee 
> fault-tolerant processing of streams. Users can work with both 
> non-partitioned (Checkpointed interface) and partitioned state 
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state 
> that are all scoped to the key of the current input element. This type of 
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to 
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by 
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions 
> with external systems such as key-value stores which are often the bottleneck 
> in practice. Exposing the local state to the outside moves a good part of the 
> database work into the stream processor, allowing both high throughput 
> queries and immediate access to the computed state.
> This is the initial design doc for the feature: 
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
>  Feel free to comment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-07-21 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
Never mind, I was hitting it with the wrong key, it works now! Cheers.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4243) Change "Scope" to "Namespace" for Metrics

2016-07-21 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387982#comment-15387982
 ] 

Chesnay Schepler commented on FLINK-4243:
-

I'm not really against this, but I would be curious what you base this on.

> Change "Scope" to "Namespace" for Metrics
> -
>
> Key: FLINK-4243
> URL: https://issues.apache.org/jira/browse/FLINK-4243
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
>
> It seems that using **"namespace"** for the names (dotted concatenation) of 
> metric groups is more common than **"scope"**.
> I suggest following that terminology in Flink.
> [~jgrier] and [~Zentol], I would be interested in what you think there.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4202) Add JM metric which shows the restart duration

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387978#comment-15387978
 ] 

ASF GitHub Bot commented on FLINK-4202:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2271
  
The Javadoc for the `RestartingTimeGauge` is not consistent with the 
implementation.

The Javadocs say "The time between the RESTARTING and RUNNING".

The implementation says either
* the time between RESTARTING and RUNNING
* the time since RESTARTING
* the time between RESTARTING and FAILED/SOME_OTHER_TERMINAL_STATE
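
For illustration, a sketch of a gauge that makes all three cases explicit 
(the accessor functions are invented for the sketch; this is not the PR's 
code):

```scala
import org.apache.flink.metrics.Gauge

// Hypothetical accessors returning millisecond timestamps, or -1 if the
// corresponding state was never entered.
class RestartingTimeGauge(
    restartingTs: () => Long, // when RESTARTING was entered
    runningTs: () => Long,    // when RUNNING was re-entered after the restart
    terminalTs: () => Long)   // when FAILED or another terminal state was reached
  extends Gauge[java.lang.Long] {

  override def getValue: java.lang.Long = {
    val restarting = restartingTs()
    val millis =
      if (restarting < 0) -1L // never restarted (initial run)
      else if (runningTs() >= 0) runningTs() - restarting // RESTARTING -> RUNNING
      else if (terminalTs() >= 0) terminalTs() - restarting // RESTARTING -> terminal
      else System.currentTimeMillis() - restarting // still restarting
    java.lang.Long.valueOf(millis)
  }
}
```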


> Add JM metric which shows the restart duration
> --
>
> Key: FLINK-4202
> URL: https://issues.apache.org/jira/browse/FLINK-4202
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> It is convenient for users to have a metric which tells you how long the 
> restarting of a job has taken.
> I propose to introduce a {{Gauge}} which returns the time between 
> {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not 
> restarted (initial run), then this metric will return {{-1}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2271: [FLINK-4202] Add restarting time JM metric

2016-07-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2271
  
The Javadoc for the `RestartingTimeGauge` is not consistent with the 
implementation.

The Javadocs say "The time between the RESTARTING and RUNNING".

The implementation says either
* the time between RESTARTING and RUNNING
* the time since RESTARTING
* the time between RESTARTING and FAILED/SOME_OTHER_TERMINAL_STATE


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4245) Metric naming improvements

2016-07-21 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387967#comment-15387967
 ] 

Chesnay Schepler commented on FLINK-4245:
-

1. As long as we don't have unique operator names you cannot have 
collision-free operator metrics. Period. I am getting really tired of 
explaining this.
2. If you only want to change the naming for JMX I suggest changing the title 
to "JMX naming improvements".
3. Your suggestion regarding the domain goes against JMX best practices. They 
should always start with "org.apache.flink".
4. Please provide a reasoning for the domain changes.
5. Please provide a comparison of how an operator and a task metric would 
differ (domain and tags) that includes all tags defined in the current 
default scope formats.
6. In general, using what at one point were called "categories" as keys isn't a 
bad idea. Note however that this becomes inconsistent with user-defined groups, 
which is the reason we currently only use auto-generated keys.
7. Please provide the use-case regarding [~mdaxini]; I am curious as to what 
these changes are supposed to allow.



> Metric naming improvements
> --
>
> Key: FLINK-4245
> URL: https://issues.apache.org/jira/browse/FLINK-4245
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stephan Ewen
>
> A metric currently has two parts to it:
>   - The name of that particular metric
>   - The "scope" (or namespace), defined by the group that contains the metric.
> A metric group actually always implicitly has a map of naming "tags", like:
>   - taskmanager_host : 
>   - taskmanager_id : 
>   - task_name : "map() -> filter()"
> We derive the scope from that map, following the defined scope formats.
> For JMX (and some users that use JMX), it would be natural to expose that map 
> of tags. Some users reconstruct that map by parsing the metric scope. In JMX, 
> we can expose a metric like:
>   - domain: "taskmanager.task.operator.io"
>   - name: "numRecordsIn"
>   - tags: { "hostname" -> "localhost", "operator_name" -> "map() at 
> X.java:123", ... }
> For many other reporters, the formatted scope makes a lot of sense, since 
> they think only in terms of (scope, metric-name).
> We may even have the formatted scope in JMX as well (in the domain), if we 
> want to go that route. 
> [~jgrier] and [~Zentol] - what do you think about that?
> [~mdaxini] Does that match your use of the metrics?
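
For illustration, one way such a tagged metric could be registered as a JMX 
ObjectName (a sketch following the example above, not an agreed design; note 
that operator names contain characters that are illegal in unquoted ObjectName 
values and therefore need quoting):

{code}
import java.util.Hashtable
import javax.management.ObjectName

val tags = new Hashtable[String, String]()
tags.put("hostname", "localhost")
// '(', ')' and ',' must be quoted in ObjectName values
tags.put("operator_name", ObjectName.quote("map() at X.java:123"))

val name = new ObjectName("taskmanager.task.operator.io", tags)
// the metric itself would then be exposed under this name,
// e.g. as an attribute "numRecordsIn"
{code}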



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-21 Thread Josh Forman-Gornall (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387961#comment-15387961
 ] 

Josh Forman-Gornall commented on FLINK-4190:


Thanks! Oh nice, this looks like a better solution for checking for bucket 
inactivity...

For the tests, is there any reason not to fold all of those tests into the new 
`BucketingSinkTest`? Currently there are 4: (BucketingSinkITCase, 
BucketingSinkFaultToleranceITCase, BucketingSinkFaultTolerance2ITCase, 
BucketingSinkMultipleActiveBucketsCase)

> Generalise RollingSink to work with arbitrary buckets
> -
>
> Key: FLINK-4190
> URL: https://issues.apache.org/jira/browse/FLINK-4190
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector, Streaming Connectors
>Reporter: Josh Forman-Gornall
>Assignee: Josh Forman-Gornall
>Priority: Minor
>
> The current RollingSink implementation appears to be intended for writing to 
> directories that are bucketed by system time (e.g. minutely) and to only be 
> writing to one file within one bucket at any point in time. When the system 
> time determines that the current bucket should be changed, the current bucket 
> and file are closed and a new bucket and file are created. The sink cannot be 
> used for the more general problem of writing to arbitrary buckets, perhaps 
> determined by an attribute on the element/tuple being processed.
> There are three limitations which prevent the existing sink from being used 
> for more general problems:
> - Only bucketing by the current system time is supported, and not by e.g. an 
> attribute of the element being processed by the sink.
> - Whenever the sink sees a change in the bucket being written to, it flushes 
> the file and moves on to the new bucket. Therefore the sink cannot have more 
> than one bucket/file open at a time. Additionally the checkpointing mechanics 
> only support saving the state of one active bucket and file.
> - The sink determines that it should 'close' an active bucket and file when 
> the bucket path changes. We need another way to determine when a bucket has 
> become inactive and needs to be closed.
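
A hypothetical sketch of the element-driven bucketing this asks for (the 
interface and names are invented for illustration and may not match the 
actual PR):

{code}
import org.apache.hadoop.fs.Path

// invented interface: decide the bucket directory per element
trait Bucketer[T] extends Serializable {
  def getBucketPath(basePath: Path, element: T): Path
}

// example: bucket (userId, payload) records by user id, so several
// buckets can be active at the same time
class UserIdBucketer extends Bucketer[(String, String)] {
  override def getBucketPath(basePath: Path, element: (String, String)): Path =
    new Path(basePath, s"user=${element._1}")
}
{code}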



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3779) Add support for queryable state

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387930#comment-15387930
 ] 

ASF GitHub Bot commented on FLINK-3779:
---

Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
Hi,

Continuing the discussion from the mailing list, I was able to go past the 
NettyConfig problem once I ran Flink in cluster mode (I would still like to 
know if there is a way to run in local mode, so that I can avoid running SBT 
assembly every time).

But now I am stuck at the error message "KvState does not hold any state for 
key/namespace.", which I believe is because of my KeySerializer. Since I am 
running the QueryClient as a separate application, I don't have access to my 
queryableState to call `queryableState.getKeySerializer`.

My key is a tuple of (Long, String) and this is the naive serializer that I 
wrote (which is probably wrong; I have never written a serializer before):

```
class KeySerializer extends TypeSerializerSingleton[(Long, String)] {

  private val EMPTY: (Long, String) = (0, "")

  override def createInstance(): (Long, String) = EMPTY

  override def getLength: Int = 2

  override def canEqual(o: scala.Any): Boolean = o.isInstanceOf[(Long, String)]

  override def copy(t: (Long, String)): (Long, String) = t

  override def copy(t: (Long, String), t1: (Long, String)): (Long, String) = t

  override def copy(dataInputView: DataInputView, dataOutputView: DataOutputView): Unit = {
    dataOutputView.writeLong(dataInputView.readLong())
    StringValue.copyString(dataInputView, dataOutputView)
  }

  override def serialize(t: (Long, String), dataOutputView: DataOutputView): Unit = {
    dataOutputView.writeLong(t._1)
    StringValue.writeString(t._2, dataOutputView)
  }

  override def isImmutableType: Boolean = true

  override def deserialize(dataInputView: DataInputView): (Long, String) = {
    val l = dataInputView.readLong()
    val s = StringValue.readString(dataInputView)
    (l, s)
  }

  override def deserialize(t: (Long, String), dataInputView: DataInputView): (Long, String) =
    deserialize(dataInputView)
}
```

Can you tell me what I am doing wrong here? Thanks!


> Add support for queryable state
> ---
>
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee 
> fault-tolerant processing of streams. Users can work with both 
> non-partitioned (Checkpointed interface) and partitioned state 
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state 
> that are all scoped to the key of the current input element. This type of 
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to 
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by 
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions 
> with external systems such as key-value stores which are often the bottleneck 
> in practice. Exposing the local state to the outside moves a good part of the 
> database work into the stream processor, allowing both high throughput 
> queries and immediate access to the computed state.
> This is the initial design doc for the feature: 
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
>  Feel free to comment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-07-21 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
Hi,

Continuing the discussion from the mailing list, I was able to go past the 
NettyConfig problem once I ran Flink in cluster mode (I would still like to 
know if there is a way to run in local mode, so that I can avoid running SBT 
assembly every time).

But now I am stuck at the error message "KvState does not hold any state for 
key/namespace.", which I believe is because of my KeySerializer. Since I am 
running the QueryClient as a separate application, I don't have access to my 
queryableState to call `queryableState.getKeySerializer`.

My key is a tuple of (Long, String) and this is the naive serializer that I 
wrote (which is probably wrong; I have never written a serializer before):

```
class KeySerializer extends TypeSerializerSingleton[(Long, String)] {

  private val EMPTY: (Long, String) = (0, "")

  override def createInstance(): (Long, String) = EMPTY

  override def getLength: Int = 2

  override def canEqual(o: scala.Any): Boolean = o.isInstanceOf[(Long, String)]

  override def copy(t: (Long, String)): (Long, String) = t

  override def copy(t: (Long, String), t1: (Long, String)): (Long, String) = t

  override def copy(dataInputView: DataInputView, dataOutputView: DataOutputView): Unit = {
    dataOutputView.writeLong(dataInputView.readLong())
    StringValue.copyString(dataInputView, dataOutputView)
  }

  override def serialize(t: (Long, String), dataOutputView: DataOutputView): Unit = {
    dataOutputView.writeLong(t._1)
    StringValue.writeString(t._2, dataOutputView)
  }

  override def isImmutableType: Boolean = true

  override def deserialize(dataInputView: DataInputView): (Long, String) = {
    val l = dataInputView.readLong()
    val s = StringValue.readString(dataInputView)
    (l, s)
  }

  override def deserialize(t: (Long, String), dataInputView: DataInputView): (Long, String) =
    deserialize(dataInputView)
}
```

Can you tell me what I am doing wrong here? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4249) CsvTableSource does not support reading SqlTimeTypeInfo types

2016-07-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4249:


 Summary: CsvTableSource does not support reading SqlTimeTypeInfo 
types
 Key: FLINK-4249
 URL: https://issues.apache.org/jira/browse/FLINK-4249
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.1.0
Reporter: Till Rohrmann


The Table API's {{CsvTableSource}} does not support reading all Table API 
supported data types. For example, it is not possible to read 
{{SqlTimeTypeInfo}} types via the {{CsvTableSource}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4248) CsvTableSource does not support reading SqlTimeTypeInfo types

2016-07-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4248:


 Summary: CsvTableSource does not support reading SqlTimeTypeInfo 
types
 Key: FLINK-4248
 URL: https://issues.apache.org/jira/browse/FLINK-4248
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.1.0
Reporter: Till Rohrmann


The Table API's {{CsvTableSource}} does not support reading all Table API 
supported data types. For example, it is not possible to read 
{{SqlTimeTypeInfo}} types via the {{CsvTableSource}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4247) CsvTableSource.getDataSet() expects Java ExecutionEnvironment

2016-07-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4247:


 Summary: CsvTableSource.getDataSet() expects Java 
ExecutionEnvironment
 Key: FLINK-4247
 URL: https://issues.apache.org/jira/browse/FLINK-4247
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.1.0
Reporter: Till Rohrmann
Priority: Minor


The Table API offers the {{CsvTableSource}} which can be used with the Java and 
Scala API. However, if used with the Scala API, where one has obtained a 
{{scala.api.ExecutionEnvironment}}, there is a problem with the 
{{CsvTableSource.getDataSet}} method. The method expects a 
{{java.api.ExecutionEnvironment}} to extract the underlying {{DataSet}}. 
Additionally, it returns a {{java.api.DataSet}} instead of a 
{{scala.api.DataSet}}. I think we should also offer a Scala API specific 
{{CsvTableSource}} which works with the respective Scala counterparts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1989: FLINK-3901 - Added CsvRowInputFormat

2016-07-21 Thread fpompermaier
Github user fpompermaier commented on the issue:

https://github.com/apache/flink/pull/1989
  
Don't worry @twalthr, it would be nice to have it in 1.1 but I 
understand that it's not a priority :)
Thanks a lot for the update!




[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387913#comment-15387913
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fpompermaier commented on the issue:

https://github.com/apache/flink/pull/1989
  
Don't worry @twalthr, it would be nice to have it in 1.1, but I 
understand that it's not a priority :)
Thanks a lot for the update!


> Create a RowCsvInputFormat to use as default CSV IF in Table API
> 
>
> Key: FLINK-3901
> URL: https://issues.apache.org/jira/browse/FLINK-3901
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>Priority: Minor
>  Labels: csv, null-values, row, tuple
>
> At the moment the Table API reads CSVs using the TupleCsvInputFormat, which 
> has two hard limitations: at most 25 fields and no null handling.
> A new IF producing Row objects is necessary to avoid those limitations.
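
For context, a hedged illustration of why {{Row}} avoids both limits; the {{Row}} API details are assumed from the 1.x table module:

{code}
import org.apache.flink.api.table.Row

// Row is arity-agnostic (no Tuple25 ceiling) and accepts nulls.
val row = new Row(3)
row.setField(0, 42)
row.setField(1, null) // nulls are fine, unlike tuple fields
row.setField(2, "foo")
{code}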





[jira] [Commented] (FLINK-4244) Field names for union operator do not have to be equal

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387907#comment-15387907
 ] 

ASF GitHub Bot commented on FLINK-4244:
---

GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/2280

[FLINK-4244] [docs] Field names for union operator do not have to be equal

We just merged FLINK-2985, but did not update the document. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink FLINK-4244

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2280.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2280


commit a3174fb89c7e2ab3e7bcb4f88c9ab3dbe7d47473
Author: Jark Wu 
Date:   2016-07-21T15:25:44Z

[FLINK-4244] [docs] Field names for union operator do not have to be equal




> Field names for union operator do not have to be equal
> --
>
> Key: FLINK-4244
> URL: https://issues.apache.org/jira/browse/FLINK-4244
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Jark Wu
>Priority: Trivial
>
> Flink Table API's documentation says that the schemas of unioned tables have 
> to be identical (wrt types and names). However, union also works with tables 
> where the types are identical but not the names:
> {code}
> val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, 
> x.toDouble)}
> val input2Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, 
> x.toDouble)}
> val inputDS1 = env.fromCollection(input1Seq)
> val inputDS2 = env.fromCollection(input2Seq)
> val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c)
> tblEnv.registerTable("foobar", input1)
> val input2 = tblEnv.fromDataSet(inputDS2, 'd, 'e, 'f)
> tblEnv.registerTable("foobar2", input2)
> val result = tblEnv.sql("SELECT * FROM foobar UNION ALL SELECT * FROM 
> foobar2")
> tblEnv.toDataSet[Row](result).print()
> {code}
> We should update the documentation accordingly.





[GitHub] flink pull request #2280: [FLINK-4244] [docs] Field names for union operator...

2016-07-21 Thread wuchong
GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/2280

[FLINK-4244] [docs] Field names for union operator do not have to be equal

We just merged FLINK-2985, but did not update the document. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink FLINK-4244

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2280.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2280


commit a3174fb89c7e2ab3e7bcb4f88c9ab3dbe7d47473
Author: Jark Wu 
Date:   2016-07-21T15:25:44Z

[FLINK-4244] [docs] Field names for union operator do not have to be equal






[jira] [Assigned] (FLINK-4244) Field names for union operator do not have to be equal

2016-07-21 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-4244:
--

Assignee: Jark Wu

> Field names for union operator do not have to be equal
> --
>
> Key: FLINK-4244
> URL: https://issues.apache.org/jira/browse/FLINK-4244
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Jark Wu
>Priority: Trivial
>
> Flink Table API's documentation says that the schemas of unioned tables have 
> to be identical (wrt types and names). However, union also works with tables 
> where the types are identical but not the names:
> {code}
> val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, 
> x.toDouble)}
> val input2Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, 
> x.toDouble)}
> val inputDS1 = env.fromCollection(input1Seq)
> val inputDS2 = env.fromCollection(input2Seq)
> val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c)
> tblEnv.registerTable("foobar", input1)
> val input2 = tblEnv.fromDataSet(inputDS2, 'd, 'e, 'f)
> tblEnv.registerTable("foobar2", input2)
> val result = tblEnv.sql("SELECT * FROM foobar UNION ALL SELECT * FROM 
> foobar2")
> tblEnv.toDataSet[Row](result).print()
> {code}
> We should update the documentation accordingly.





[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387892#comment-15387892
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/1989
  
@fpompermaier I started working on this, but I cannot guarantee that it 
will be part of the release.


> Create a RowCsvInputFormat to use as default CSV IF in Table API
> 
>
> Key: FLINK-3901
> URL: https://issues.apache.org/jira/browse/FLINK-3901
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>Priority: Minor
>  Labels: csv, null-values, row, tuple
>
> At the moment the Table APIs reads CSVs using the TupleCsvInputFormat, that 
> has the big limitation of 25 fields and null handling.
> A new IF producing Row object is indeed necessary to avoid those limitations





[GitHub] flink issue #1989: FLINK-3901 - Added CsvRowInputFormat

2016-07-21 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/1989
  
@fpompermaier I started working on this, but I cannot guarantee that it 
will be part of the release.




[jira] [Commented] (FLINK-4246) Allow Specifying Multiple Metrics Reporters

2016-07-21 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387867#comment-15387867
 ] 

Stephan Ewen commented on FLINK-4246:
-

+1, I like this idea

> Allow Specifying Multiple Metrics Reporters
> ---
>
> Key: FLINK-4246
> URL: https://issues.apache.org/jira/browse/FLINK-4246
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
> Fix For: 1.1.0
>
>
> We should allow specifying multiple reporters. A rough sketch of how the 
> configuration could look is this:
> {code}
> metrics.reporters = foo,bar
> metrics.reporter.foo.class = JMXReporter.class
> metrics.reporter.foo.port = 42-117
> metrics.reporter.bar.class = GangliaReporter.class
> metrics.reporter.bar.port = 512
> metrics.reporter.bar.whatever = 2
> {code}





[jira] [Commented] (FLINK-4192) Move Metrics API to separate module

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387865#comment-15387865
 ] 

ASF GitHub Bot commented on FLINK-4192:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2226
  
Will merge this, leaving the config as is.


> Move Metrics API to separate module
> ---
>
> Key: FLINK-4192
> URL: https://issues.apache.org/jira/browse/FLINK-4192
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.1.0
>
>
> All metrics code currently resides in flink-core. If a user implements a 
> reporter and wants a fat jar it will now have to include the entire 
> flink-core module.
> Instead, we could move several interfaces into a separate module.
> These interfaces to move include:
> * Counter, Gauge, Histogram(Statistics)
> * MetricGroup
> * MetricReporter, Scheduled, AbstractReporter
> In addition a new MetricRegistry interface will be required as well as a 
> replacement for the Configuration.





[GitHub] flink issue #2226: [FLINK-4192] - Move Metrics API to separate module

2016-07-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2226
  
Will merge this, leaving the config as is.




[jira] [Commented] (FLINK-4152) TaskManager registration exponential backoff doesn't work

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387859#comment-15387859
 ] 

ASF GitHub Bot commented on FLINK-4152:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2257
  
Hi @tillrohrmann! Your changes look good. I would say this is good to merge 
once we have fixed the issue with the `YarnFlinkResourceManagerTest` on Travis.


> TaskManager registration exponential backoff doesn't work
> -
>
> Key: FLINK-4152
> URL: https://issues.apache.org/jira/browse/FLINK-4152
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, TaskManager, YARN Client
>Reporter: Robert Metzger
>Assignee: Till Rohrmann
> Attachments: logs.tgz
>
>
> While testing Flink 1.1 I've found that the TaskManagers are logging many 
> messages when registering at the JobManager.
> This is the log file: 
> https://gist.github.com/rmetzger/0cebe0419cdef4507b1e8a42e33ef294
> It's logging more than 3000 messages in less than a minute. I don't think that 
> this is the expected behavior.
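
For reference, a minimal sketch of the capped exponential backoff the registration is supposed to follow; the initial delay and cap are assumed values, not Flink's actual defaults:

{code}
// attempt = 0, 1, 2, ...  yields 500ms, 1s, 2s, ... capped at 30s
def registrationDelay(attempt: Int, initial: Long = 500L, max: Long = 30000L): Long =
  math.min(max, initial << math.min(attempt, 16))
{code}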





[jira] [Created] (FLINK-4246) Allow Specifying Multiple Metrics Reporters

2016-07-21 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-4246:
---

 Summary: Allow Specifying Multiple Metrics Reporters
 Key: FLINK-4246
 URL: https://issues.apache.org/jira/browse/FLINK-4246
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.1.0
Reporter: Aljoscha Krettek
 Fix For: 1.1.0


We should allow specifying multiple reporters. A rough sketch of how the 
configuration could look is this:

{code}
metrics.reporters = foo,bar
metrics.reporter.foo.class = JMXReporter.class
metrics.reporter.foo.port = 42-117
metrics.reporter.bar.class = GangliaReporter.class
metrics.reporter.bar.port = 512
metrics.reporter.bar.whatever = 2
{code}
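
A hedged sketch of how these keys could be resolved; the helper below is illustrative, not an actual Flink API:

{code}
import org.apache.flink.configuration.Configuration

def reporterClasses(config: Configuration): Map[String, String] = {
  val names = config.getString("metrics.reporters", "")
    .split(",").map(_.trim).filter(_.nonEmpty)
  names.map { name =>
    name -> config.getString(s"metrics.reporter.$name.class", null)
  }.toMap
}
{code}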





[jira] [Commented] (FLINK-4229) Do not start Metrics Reporter by default

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387848#comment-15387848
 ] 

ASF GitHub Bot commented on FLINK-4229:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2279#discussion_r71722866
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java 
---
@@ -88,7 +88,7 @@ public void testPortConflictHandling() throws Exception {
JMXReporter rep2 = new JMXReporter();
 
Configuration cfg1 = new Configuration();
-   cfg1.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, 
"-port 9020-9035");
+   cfg1.setString("port", "9020-9035");
--- End diff --

Got it, it is the already parsed sub-config.
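
A hedged sketch of that sub-config idea, i.e. stripping the per-reporter prefix before handing the configuration to the reporter (illustrative helper, not the actual implementation):

{code}
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

def reporterSubConfig(config: Configuration, reporterName: String): Configuration = {
  val prefix = s"metrics.reporter.$reporterName."
  val sub = new Configuration()
  for (key <- config.keySet().asScala if key.startsWith(prefix)) {
    // "metrics.reporter.foo.port" is handed to the reporter as plain "port"
    sub.setString(key.substring(prefix.length), config.getString(key, null))
  }
  sub
}
{code}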


> Do not start Metrics Reporter by default
> 
>
> Key: FLINK-4229
> URL: https://issues.apache.org/jira/browse/FLINK-4229
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.1.0
>
>
> By default, we start a JMX reporter that binds to a port and comes with extra 
> threads. 
> We should not start any reporter by default, to keep the overhead to a minimum.





[jira] [Commented] (FLINK-4244) Field names for union operator do not have to be equal

2016-07-21 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387851#comment-15387851
 ] 

Jark Wu commented on FLINK-4244:


Yes, we just merged [FLINK-2985], but did not update the document. I will update 
it. 


 [FLINK-2985] Allow different field names for unionAll() in Table API. 

> Field names for union operator do not have to be equal
> --
>
> Key: FLINK-4244
> URL: https://issues.apache.org/jira/browse/FLINK-4244
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Priority: Trivial
>
> Flink Table API's documentation says that the schemas of unioned tables have 
> to be identical (wrt types and names). However, union also works with tables 
> where the types are identical but not the names:
> {code}
> val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, 
> x.toDouble)}
> val input2Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, 
> x.toDouble)}
> val inputDS1 = env.fromCollection(input1Seq)
> val inputDS2 = env.fromCollection(input2Seq)
> val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c)
> tblEnv.registerTable("foobar", input1)
> val input2 = tblEnv.fromDataSet(inputDS2, 'd, 'e, 'f)
> tblEnv.registerTable("foobar2", input2)
> val result = tblEnv.sql("SELECT * FROM foobar UNION ALL SELECT * FROM 
> foobar2")
> tblEnv.toDataSet[Row](result).print()
> {code}
> We should update the documentation accordingly.





[jira] [Commented] (FLINK-4229) Do not start Metrics Reporter by default

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387849#comment-15387849
 ] 

ASF GitHub Bot commented on FLINK-4229:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2279
  
Looks good, will merge this...


> Do not start Metrics Reporter by default
> 
>
> Key: FLINK-4229
> URL: https://issues.apache.org/jira/browse/FLINK-4229
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.1.0
>
>
> By default, we start a JMX reporter that binds to a port and comes with extra 
> threads. 
> We should not start any reporter by default, to keep the overhead to a minimum.





[GitHub] flink pull request #2279: [FLINK-4229] Only start JMX server when port is sp...

2016-07-21 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2279#discussion_r71722866
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java 
---
@@ -88,7 +88,7 @@ public void testPortConflictHandling() throws Exception {
JMXReporter rep2 = new JMXReporter();
 
Configuration cfg1 = new Configuration();
-   cfg1.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, 
"-port 9020-9035");
+   cfg1.setString("port", "9020-9035");
--- End diff --

Got it, it is the already parsed sub-config.




[GitHub] flink issue #2279: [FLINK-4229] Only start JMX server when port is specified

2016-07-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2279
  
Looks good, will merge this...




[GitHub] flink issue #2276: [FLINK-4201] [runtime] Forward suspend to checkpoint coor...

2016-07-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2276
  
I think as it is, it is fine.

Let's extend the option for HA in a separate issue. We could have:
  - high-availability: *none* (what is currently *standalone recovery*)
  - high-availability: *filesystem* (no protection against split brain 
partitioning)
  - high-availability: *zookeeper*
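
Sketched as flink-conf.yaml entries (proposal only; at this point the code base only knows `recovery.mode`):

```
high-availability: zookeeper   # or: none, filesystem
```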




[jira] [Commented] (FLINK-4240) Cannot use expressions in Scala Table API's groupBy method

2016-07-21 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387824#comment-15387824
 ] 

Jark Wu commented on FLINK-4240:


I don't think it's a bug, as we cannot select a column that is not in the GROUP BY.

In this case, we can rename the grouping field and it will work:
{code}
val tblResult = input1.groupBy('a % 4 as 'd).select('d);
{code}


> Cannot use expressions in Scala Table API's groupBy method
> --
>
> Key: FLINK-4240
> URL: https://issues.apache.org/jira/browse/FLINK-4240
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>
> The following code fails even though it should be supported according to the 
> documentation:
> {code}
> package com.dataartisans.batch
> import org.apache.flink.api.scala._
> import org.apache.flink.api.scala.table._
> import org.apache.flink.api.table.{Row, TableConfig, TableEnvironment}
> object ScalaSimpleTableAPIJob {
>   def main(args: Array[String]): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tblConfig = new TableConfig
> val tblEnv = TableEnvironment.getTableEnvironment(env, tblConfig)
> val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, 
> x.toDouble)}
> val inputDS1 = env.fromCollection(input1Seq)
> val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c)
> // fails with org.apache.flink.api.table.ValidationException: cannot 
> resolve [a] given input [('a % 4)]
> val tblResult = input1.groupBy('a % 4).select('a);
> val result = tblEnv.toDataSet[Row](tblResult)
> result.print()
>   }
> }
> {code}
> {code}
> Exception in thread "main" org.apache.flink.api.table.ValidationException: 
> cannot resolve [a] given input [('a % 4)]
>   at 
> org.apache.flink.api.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
>   at 
> org.apache.flink.api.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:87)
>   at 
> org.apache.flink.api.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:84)
>   at 
> org.apache.flink.api.table.trees.TreeNode.postOrderTransform(TreeNode.scala:72)
>   at 
> org.apache.flink.api.table.plan.logical.LogicalNode.org$apache$flink$api$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:120)
>   at 
> org.apache.flink.api.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:133)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.flink.api.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:132)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>   at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>   at 
> org.apache.flink.api.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137)
>   at 
> org.apache.flink.api.table.plan.logical.LogicalNode.validate(LogicalNode.scala:84)
>   at 
> org.apache.flink.api.table.plan.logical.Project.validate(operators.scala:57)
>   at org.apache.flink.api.table.GroupedTable.select(table.scala:631)
>   at 
> com.dataartisans.batch.ScalaSimpleTableAPIJob$.main(ScalaSimpleTableAPIJob.scala:26)
>   at 
> com.dataartisans.batch.ScalaSimpleTableAPIJob.main(ScalaSimpleTableAPIJob.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> 

[jira] [Commented] (FLINK-4229) Do not start Metrics Reporter by default

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387820#comment-15387820
 ] 

ASF GitHub Bot commented on FLINK-4229:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2279#discussion_r71720105
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java 
---
@@ -88,7 +88,7 @@ public void testPortConflictHandling() throws Exception {
JMXReporter rep2 = new JMXReporter();
 
Configuration cfg1 = new Configuration();
-   cfg1.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, 
"-port 9020-9035");
+   cfg1.setString("port", "9020-9035");
--- End diff --

The change of only starting a server when the ports are present should not 
change the config keys, correct?


> Do not start Metrics Reporter by default
> 
>
> Key: FLINK-4229
> URL: https://issues.apache.org/jira/browse/FLINK-4229
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.1.0
>
>
> By default, we start a JMX reporter that binds to a port and comes with extra 
> threads. 
> We should not start any reporter by default, to keep the overhead to a minimum.





[GitHub] flink pull request #2279: [FLINK-4229] Only start JMX server when port is sp...

2016-07-21 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2279#discussion_r71720105
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java 
---
@@ -88,7 +88,7 @@ public void testPortConflictHandling() throws Exception {
JMXReporter rep2 = new JMXReporter();
 
Configuration cfg1 = new Configuration();
-   cfg1.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, 
"-port 9020-9035");
+   cfg1.setString("port", "9020-9035");
--- End diff --

The change of only starting a server when the ports are present should not 
change the config keys, correct?




[jira] [Commented] (FLINK-4229) Do not start Metrics Reporter by default

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387809#comment-15387809
 ] 

ASF GitHub Bot commented on FLINK-4229:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2279

[FLINK-4229] Only start JMX server when port is specified

The JMX reporter now only starts an extra server if a port (or port range) 
was specified. Otherwise, it will just have the default local JMX capabilities.

R: @StephanEwen 
R: @zentol 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jmx-no-server

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2279.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2279


commit 89fc415ee8b882ac67dc34be41c98811e35dae98
Author: Aljoscha Krettek 
Date:   2016-07-21T14:44:06Z

[FLINK-4229] Only start JMX server when port is specified




> Do not start Metrics Reporter by default
> 
>
> Key: FLINK-4229
> URL: https://issues.apache.org/jira/browse/FLINK-4229
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.1.0
>
>
> By default, we start a JMX reporter that binds to a port and comes with extra 
> threads. 
> We should not start any reporter by default, to keep the overhead to a minimum.





[GitHub] flink pull request #2279: [FLINK-4229] Only start JMX server when port is sp...

2016-07-21 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2279

[FLINK-4229] Only start JMX server when port is specified

The JMX reporter now only starts an extra server if a port (or port range) 
was specified. Otherwise, it will just have the default local JMX capabilities.

R: @StephanEwen 
R: @zentol 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jmx-no-server

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2279.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2279


commit 89fc415ee8b882ac67dc34be41c98811e35dae98
Author: Aljoscha Krettek 
Date:   2016-07-21T14:44:06Z

[FLINK-4229] Only start JMX server when port is specified






[jira] [Commented] (FLINK-4201) Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387800#comment-15387800
 ] 

ASF GitHub Bot commented on FLINK-4201:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2276
  
I think as it is, it is fine.

Let's extend the option for HA in a separate issue. We could have:
  - high-availability: *none* (what is currently *standalone recovery*)
  - high-availability: *filesystem* (no protection against split brain 
partitioning)
  - high-availability: *zookeeper*


> Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted
> ---
>
> Key: FLINK-4201
> URL: https://issues.apache.org/jira/browse/FLINK-4201
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Ufuk Celebi
>Priority: Blocker
>
> For example, when shutting down a Yarn session, according to the logs 
> checkpoints for jobs that did not terminate are deleted. In the shutdown 
> hook, removeAllCheckpoints is called and removes checkpoints that should 
> still be kept.





[GitHub] flink issue #2276: [FLINK-4201] [runtime] Forward suspend to checkpoint coor...

2016-07-21 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2276
  
Thanks for taking a look, Stephan.

Regarding your question: *Would we interfere with such a setup when 
removing checkpoints on "suspend" in "standalone" mode?*:

Yes, we would interfere, but what you describe is currently **not** 
possible with Flink (that is, no one can run it like that). The problem is that 
recovery on the master is tightly coupled to ZooKeeper (configured via 
`recovery.mode: ZOOKEEPER`). I really like your idea and agree that it should 
be possible to run an HA setup like that. I will open an issue for it. Do you 
think it's important to fix this for 1.1 already?

Regarding the name *standalone*:

I fully agree. We have a standalone cluster mode and standalone recovery 
mode. Our standalone recovery mode (`recovery.mode: STANDALONE`) actually means 
`NO_RECOVERY`. I think that's what also made you assume that what you describe 
is possible, right? 




[jira] [Commented] (FLINK-4201) Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387789#comment-15387789
 ] 

ASF GitHub Bot commented on FLINK-4201:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2276
  
Thanks for taking a look, Stephan.

Regarding your question: *Would we interfere with such a setup when 
removing checkpoints on "suspend" in "standalone" mode?*:

Yes, we would interfere, but what you describe is currently **not** 
possible with Flink (that is, no one can run it like that). The problem is that 
recovery on the master is tightly coupled to ZooKeeper (configured via 
`recovery.mode: ZOOKEEPER`). I really like your idea and agree that it should 
be possible to run an HA setup like that. I will open an issue for it. Do you 
think it's important to fix this for 1.1 already?

Regarding the name *standalone*:

I fully agree. We have a standalone cluster mode and standalone recovery 
mode. Our standalone recovery mode (`recovery.mode: STANDALONE`) actually means 
`NO_RECOVERY`. I think that's what also made you assume that what you describe 
is possible, right? 


> Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted
> ---
>
> Key: FLINK-4201
> URL: https://issues.apache.org/jira/browse/FLINK-4201
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Ufuk Celebi
>Priority: Blocker
>
> For example, when shutting down a Yarn session, according to the logs 
> checkpoints for jobs that did not terminate are deleted. In the shutdown 
> hook, removeAllCheckpoints is called and removes checkpoints that should 
> still be kept.





[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387767#comment-15387767
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
Very good work!

I know we discussed before whether to check for inactivity in a different 
thread or in `invoke()`. There's actually a third option that I'm showcasing in 
the PR I did against your PR.  The StreamTask already has a TimerService that 
can be set for testing. If we use the appropriate methods in the bucketing sink 
then we get testability with a settable clock for free.

I also added a `BucketSinkTest`. I think it would be good if the 
`BucketSinkITCase` and `BucketingSinkMultipleActiveBucketsCase` could be folded 
into this one, because the ITCases add a lot of overhead and our 
build is already taking quite long.

What do you think?


> Generalise RollingSink to work with arbitrary buckets
> -
>
> Key: FLINK-4190
> URL: https://issues.apache.org/jira/browse/FLINK-4190
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector, Streaming Connectors
>Reporter: Josh Forman-Gornall
>Assignee: Josh Forman-Gornall
>Priority: Minor
>
> The current RollingSink implementation appears to be intended for writing to 
> directories that are bucketed by system time (e.g. minutely) and to only be 
> writing to one file within one bucket at any point in time. When the system 
> time determines that the current bucket should be changed, the current bucket 
> and file are closed and a new bucket and file are created. The sink cannot be 
> used for the more general problem of writing to arbitrary buckets, perhaps 
> determined by an attribute on the element/tuple being processed.
> There are three limitations which prevent the existing sink from being used 
> for more general problems:
> - Only bucketing by the current system time is supported, and not by e.g. an 
> attribute of the element being processed by the sink.
> - Whenever the sink sees a change in the bucket being written to, it flushes 
> the file and moves on to the new bucket. Therefore the sink cannot have more 
> than one bucket/file open at a time. Additionally the checkpointing mechanics 
> only support saving the state of one active bucket and file.
> - The sink determines that it should 'close' an active bucket and file when 
> the bucket path changes. We need another way to determine when a bucket has 
> become inactive and needs to be closed.
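
One way to read the first limitation is that bucketing needs to become a pluggable, element-aware strategy. A hedged sketch of such an abstraction (names assumed, not the API under review):

{code}
trait Bucketer[T] extends Serializable {
  /** Decide the bucket for an element, independent of the current system time. */
  def getBucketPath(basePath: String, element: T): String
}

// Example: bucket by an attribute of the element rather than by the wall clock.
class KeyBucketer extends Bucketer[(String, Long)] {
  override def getBucketPath(basePath: String, element: (String, Long)): String =
    s"$basePath/${element._1}"
}
{code}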





[jira] [Comment Edited] (FLINK-4245) Metric naming improvements

2016-07-21 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387772#comment-15387772
 ] 

Stephan Ewen edited comment on FLINK-4245 at 7/21/16 2:26 PM:
--

An added benefit of that would be that, regardless of the defined scope format, 
JMX names never have collisions (because the tags will always be a unique set, 
since they include all unique IDs).

So, we would never have collision warnings using the JMX reporter.


was (Author: stephanewen):
An added benefit of that would be that, regardless of the defined scope format, 
JMX names never have collisions.
So, we would never have collision warnings using the JMX reporter.

> Metric naming improvements
> --
>
> Key: FLINK-4245
> URL: https://issues.apache.org/jira/browse/FLINK-4245
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stephan Ewen
>
> A metric currently has two parts to it:
>   - The name of that particular metric
>   - The "scope" (or namespace), defined by the group that contains the metric.
> A metric group actually always implicitly has a map of naming "tags", like:
>   - taskmanager_host : 
>   - taskmanager_id : 
>   - task_name : "map() -> filter()"
> We derive the scope from that map, following the defined scope formats.
> For JMX (and some users that use JMX), it would be natural to expose that map 
> of tags. Some users reconstruct that map by parsing the metric scope. In JMX, we 
> can expose a metric like:
>   - domain: "taskmanager.task.operator.io"
>   - name: "numRecordsIn"
>   - tags: { "hostname" -> "localhost", "operator_name" -> "map() at 
> X.java:123", ... }
> For many other reporters, the formatted scope makes a lot of sense, since 
> they think only in terms of (scope, metric-name).
> We may even have the formatted scope in JMX as well (in the domain), if we 
> want to go that route. 
> [~jgrier] and [~Zentol] - what do you think about that?
> [~mdaxini] Does that match your use of the metrics?





[jira] [Commented] (FLINK-4245) Metric naming improvements

2016-07-21 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387772#comment-15387772
 ] 

Stephan Ewen commented on FLINK-4245:
-

An added benefit of that would be that, regardless of the defined scope format, 
JMX names never have collisions.
So, we would never have collision warnings using the JMX reporter.

> Metric naming improvements
> --
>
> Key: FLINK-4245
> URL: https://issues.apache.org/jira/browse/FLINK-4245
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stephan Ewen
>
> A metric currently has two parts to it:
>   - The name of that particular metric
>   - The "scope" (or namespace), defined by the group that contains the metric.
> A metric group actually always implicitly has a map of naming "tags", like:
>   - taskmanager_host : 
>   - taskmanager_id : 
>   - task_name : "map() -> filter()"
> We derive the scope from that map, following the defined scope formats.
> For JMX (and some users that use JMX), it would be natural to expose that map 
> of tags. Some users reconstruct that map by parsing the metric scope. In JMX, we 
> can expose a metric like:
>   - domain: "taskmanager.task.operator.io"
>   - name: "numRecordsIn"
>   - tags: { "hostname" -> "localhost", "operator_name" -> "map() at 
> X.java:123", ... }
> For many other reporters, the formatted scope makes a lot of sense, since 
> they think only in terms of (scope, metric-name).
> We may even have the formatted scope in JMX as well (in the domain), if we 
> want to go that route. 
> [~jgrier] and [~Zentol] - what do you think about that?
> [~mdaxini] Does that match your use of the metrics?





[jira] [Created] (FLINK-4245) Metric naming improvements

2016-07-21 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4245:
---

 Summary: Metric naming improvements
 Key: FLINK-4245
 URL: https://issues.apache.org/jira/browse/FLINK-4245
 Project: Flink
  Issue Type: Improvement
Reporter: Stephan Ewen


A metric currently has two parts to it:
  - The name of that particular metric
  - The "scope" (or namespace), defined by the group that contains the metric.

A metric group actually always implicitly has a map of naming "tags", like:
  - taskmanager_host : 
  - taskmanager_id : 
  - task_name : "map() -> filter()"

We derive the scope from that map, following the defined scope formats.

For JMX (and some users that use JMX), it would be natural to expose that map 
of tags. Some users reconstruct that map by parsing the metric scope. In JMX, we 
can expose a metric like:

  - domain: "taskmanager.task.operator.io"
  - name: "numRecordsIn"
  - tags: { "hostname" -> "localhost", "operator_name" -> "map() at 
X.java:123", ... }

For many other reporters, the formatted scope makes a lot of sense, since they 
think only in terms of (scope, metric-name).
We may even have the formatted scope in JMX as well (in the domain), if we want 
to go that route. 

[~jgrier] and [~Zentol] - what do you think about that?

[~mdaxini] Does that match your use of the metrics?
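
For illustration, a hedged sketch of how that tag map could surface through the standard JMX API (plain javax.management, not Flink code; values containing JMX special characters need quoting):

{code}
import java.util.Hashtable
import javax.management.ObjectName

val tags = new Hashtable[String, String]()
tags.put("hostname", "localhost")
// quote() protects special characters such as ':' and ',' in the value
tags.put("operator_name", ObjectName.quote("map() at X.java:123"))

// domain + unique tag set => a collision-free name, regardless of scope format
val name = new ObjectName("taskmanager.task.operator.io", tags)
{code}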





[jira] [Commented] (FLINK-4202) Add JM metric which shows the restart duration

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387768#comment-15387768
 ] 

ASF GitHub Bot commented on FLINK-4202:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2271


> Add JM metric which shows the restart duration
> --
>
> Key: FLINK-4202
> URL: https://issues.apache.org/jira/browse/FLINK-4202
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> It is convenient for users to have a metric which tells them how long the 
> restarting of a job has taken.
> I propose to introduce a {{Gauge}} which returns the time between 
> {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not 
> restarted (initial run), then this metric will return {{-1}}.
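
A hedged sketch of such a gauge (class and method names assumed; not the implementation that was merged):

{code}
import org.apache.flink.metrics.Gauge

class RestartingTimeGauge extends Gauge[java.lang.Long] {
  @volatile private var restartingTs = -1L
  @volatile private var runningTs = -1L

  def markRestarting(): Unit = restartingTs = System.currentTimeMillis()
  def markRunning(): Unit = runningTs = System.currentTimeMillis()

  override def getValue: java.lang.Long = {
    if (restartingTs < 0) -1L                                    // initial run, never restarted
    else if (runningTs >= restartingTs) runningTs - restartingTs // completed restart
    else System.currentTimeMillis() - restartingTs               // restart in progress
  }
}
{code}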





[GitHub] flink pull request #2271: [FLINK-4202] Add restarting time JM metric

2016-07-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2271




[jira] [Closed] (FLINK-4202) Add JM metric which shows the restart duration

2016-07-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-4202.

Resolution: Done

Added via 5dd85e2867a639e10891209dd59be0136a19ecfa

> Add JM metric which shows the restart duration
> --
>
> Key: FLINK-4202
> URL: https://issues.apache.org/jira/browse/FLINK-4202
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> It is convenient for users to have a metric which tells them how long the 
> restarting of a job has taken.
> I propose to introduce a {{Gauge}} which returns the time between 
> {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not 
> restarted (initial run), then this metric will return {{-1}}.





[jira] [Commented] (FLINK-4202) Add JM metric which shows the restart duration

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387754#comment-15387754
 ] 

ASF GitHub Bot commented on FLINK-4202:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2271
  
Thanks for the review @uce. Will merge this PR.


> Add JM metric which shows the restart duration
> --
>
> Key: FLINK-4202
> URL: https://issues.apache.org/jira/browse/FLINK-4202
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> It is convenient for users to have a metric which tells them how long the 
> restarting of a job has taken.
> I propose to introduce a {{Gauge}} which returns the time between 
> {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not 
> restarted (initial run), then this metric will return {{-1}}.





[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-07-21 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
Very good work!

I know we discussed before whether to check for inactivity in a different 
thread or in `invoke()`. There's actually a third option that I'm showcasing in 
the PR I did against your PR. 😃 The StreamTask already has a TimerService 
that can be set for testing. If we use the appropriate methods in the bucketing 
sink then we get testability with a settable clock for free.

I also added a `BucketSinkTest`. I think it would be good if the 
`BucketSinkITCase` and `BucketingSinkMultipleActiveBucketsCase` could be folded 
into this one, because the ITCases add a lot of overhead and our 
build is already taking quite long.

What do you think?




[GitHub] flink issue #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on BlobSe...

2016-07-21 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2256
  
Thank you for your review. I've addressed your comment and now parent 
directories are deleted if empty, resulting in an empty storage folder after 
regular cleanup. If there are no objections, I would like to merge this later 
today.




[GitHub] flink issue #2271: [FLINK-4202] Add restarting time JM metric

2016-07-21 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2271
  
Thanks for the review @uce. Will merge this PR.




[GitHub] flink pull request #:

2016-07-21 Thread StephanEwen
Github user StephanEwen commented on the pull request:


https://github.com/apache/flink/commit/19dae21b00cfbf68ee64af80672d25974a3cd346#commitcomment-18339788
  
Let's leave the remainder as it is for now, at least as long as we do not 
have the name collisions fully resolved. I am filing a followup issue about JMX 
and names.




[jira] [Commented] (FLINK-4150) Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387753#comment-15387753
 ] 

ASF GitHub Bot commented on FLINK-4150:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2256
  
Thank you for your review. I've addressed your comment and now parent 
directories are deleted if empty, resulting in an empty storage folder after 
regular cleanup. If there are no objections, I would like to merge this later 
today.


> Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown
> 
>
> Key: FLINK-4150
> URL: https://issues.apache.org/jira/browse/FLINK-4150
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission
>Reporter: Stefan Richter
>Assignee: Ufuk Celebi
>Priority: Blocker
> Fix For: 1.1.0
>
>
> Submitting a job in Yarn with HA can lead to the following exception:
> {code}
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load 
> user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
> ClassLoader info: URL ClassLoader:
> file: 
> '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0'
>  (invalid JAR: zip file is empty)
> Class not resolvable through given classloader.
>   at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Some job information, including the Blob ids, is stored in Zookeeper. The 
> actual Blobs are stored in a dedicated BlobStore if the recovery mode is set 
> to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the 
> cluster is shut down, the path for the BlobStore is deleted. When the cluster 
> is then restarted, recovering jobs cannot restore because their Blob ids 
> stored in Zookeeper now point to deleted files.





[jira] [Commented] (FLINK-4150) Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387744#comment-15387744
 ] 

ASF GitHub Bot commented on FLINK-4150:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2256#discussion_r71711908
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 ---
@@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() 
throws Exception {
}
}
 
+   /**
+* Tests that the persisted job is not removed from the job graph store
+* after the postStop method of the JobManager. Furthermore, it checks
+* that BLOBs of the JobGraph are recovered properly and cleaned up 
after
+* the job finishes.
+*/
+   @Test
+   public void testBlobRecoveryAfterLostJobManager() throws Exception {
+   FiniteDuration timeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
+   FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, 
TimeUnit.SECONDS);
+   Deadline deadline = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
+   Configuration flinkConfiguration = new Configuration();
+   UUID leaderSessionID = UUID.randomUUID();
+   UUID newLeaderSessionID = UUID.randomUUID();
+   int slots = 2;
+   ActorRef archiveRef = null;
+   ActorRef jobManagerRef = null;
+   ActorRef taskManagerRef = null;
+
+   String haStoragePath = temporaryFolder.newFolder().toString();
+
+   flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
+   
flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, 
haStoragePath);
+   
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
slots);
+
+   try {
+   MySubmittedJobGraphStore mySubmittedJobGraphStore = new 
MySubmittedJobGraphStore();
+   TestingLeaderElectionService myLeaderElectionService = 
new TestingLeaderElectionService();
+   TestingLeaderRetrievalService myLeaderRetrievalService 
= new TestingLeaderRetrievalService();
+
+   archiveRef = system.actorOf(Props.create(
+   MemoryArchivist.class,
+   10), "archive");
+
+   jobManagerRef = createJobManagerActor(
+   "jobmanager-0",
+   flinkConfiguration,
+   myLeaderElectionService,
+   mySubmittedJobGraphStore,
+   360,
+   timeout,
+   jobRecoveryTimeout, archiveRef);
+
+   ActorGateway jobManager = new 
AkkaActorGateway(jobManagerRef, leaderSessionID);
+
+   taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
+   flinkConfiguration,
+   ResourceID.generate(),
+   system,
+   "localhost",
+   Option.apply("taskmanager"),
+   Option.apply((LeaderRetrievalService) 
myLeaderRetrievalService),
+   true,
+   TestingTaskManager.class);
+
+   ActorGateway tmGateway = new 
AkkaActorGateway(taskManagerRef, leaderSessionID);
+
+   Future tmAlive = 
tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
+
+   Await.ready(tmAlive, deadline.timeLeft());
+
+   JobVertex sourceJobVertex = new JobVertex("Source");
+   
sourceJobVertex.setInvokableClass(BlockingInvokable.class);
+   sourceJobVertex.setParallelism(slots);
+
+   JobGraph jobGraph = new JobGraph("TestingJob", 
sourceJobVertex);
+
+   // Upload fake JAR file to first JobManager
+   File jarFile = temporaryFolder.newFile();
+   ZipOutputStream out = new ZipOutputStream(new 
FileOutputStream(jarFile));
+   out.close();
+
+   jobGraph.addJar(new Path(jarFile.toURI()));
+   JobClient.uploadJarFiles(jobGraph, jobManager, 
deadline.timeLeft());
+
+   Future isLeader = jobManager.ask(
+   

[GitHub] flink pull request #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on...

2016-07-21 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2256#discussion_r71711908
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---
@@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception {
}
}
 
+   /**
+    * Tests that the persisted job is not removed from the job graph store
+    * after the postStop method of the JobManager. Furthermore, it checks
+    * that BLOBs of the JobGraph are recovered properly and cleaned up after
+    * the job finishes.
+    */
+   @Test
+   public void testBlobRecoveryAfterLostJobManager() throws Exception {
+       FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+       FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS);
+       Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+       Configuration flinkConfiguration = new Configuration();
+       UUID leaderSessionID = UUID.randomUUID();
+       UUID newLeaderSessionID = UUID.randomUUID();
+       int slots = 2;
+       ActorRef archiveRef = null;
+       ActorRef jobManagerRef = null;
+       ActorRef taskManagerRef = null;
+
+       String haStoragePath = temporaryFolder.newFolder().toString();
+
+       flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+       flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, haStoragePath);
+       flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
+
+       try {
+           MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
+           TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
+           TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
+
+           archiveRef = system.actorOf(Props.create(
+               MemoryArchivist.class,
+               10), "archive");
+
+           jobManagerRef = createJobManagerActor(
+               "jobmanager-0",
+               flinkConfiguration,
+               myLeaderElectionService,
+               mySubmittedJobGraphStore,
+               360,
+               timeout,
+               jobRecoveryTimeout, archiveRef);
+
+           ActorGateway jobManager = new AkkaActorGateway(jobManagerRef, leaderSessionID);
+
+           taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+               flinkConfiguration,
+               ResourceID.generate(),
+               system,
+               "localhost",
+               Option.apply("taskmanager"),
+               Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
+               true,
+               TestingTaskManager.class);
+
+           ActorGateway tmGateway = new AkkaActorGateway(taskManagerRef, leaderSessionID);
+
+           Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
+
+           Await.ready(tmAlive, deadline.timeLeft());
+
+           JobVertex sourceJobVertex = new JobVertex("Source");
+           sourceJobVertex.setInvokableClass(BlockingInvokable.class);
+           sourceJobVertex.setParallelism(slots);
+
+           JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
+
+           // Upload fake JAR file to first JobManager
+           File jarFile = temporaryFolder.newFile();
+           ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile));
+           out.close();
+
+           jobGraph.addJar(new Path(jarFile.toURI()));
+           JobClient.uploadJarFiles(jobGraph, jobManager, deadline.timeLeft());
+
+           Future<Object> isLeader = jobManager.ask(
+               TestingJobManagerMessages.getNotifyWhenLeader(),
+               deadline.timeLeft());
+
+           Future<Object> isConnectedToJobManager = tmGateway.ask(
+               new 

[jira] [Created] (FLINK-4244) Field names for union operator do not have to be equal

2016-07-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4244:


 Summary: Field names for union operator do not have to be equal
 Key: FLINK-4244
 URL: https://issues.apache.org/jira/browse/FLINK-4244
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.1.0
Reporter: Till Rohrmann
Priority: Trivial


The Flink Table API documentation says that the schemas of unioned tables have to 
be identical (w.r.t. types and names). However, union also works with tables whose 
types are identical but whose names differ:

{code}
val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, x.toDouble)}
val input2Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, x.toDouble)}

val inputDS1 = env.fromCollection(input1Seq)
val inputDS2 = env.fromCollection(input2Seq)

val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c)
tblEnv.registerTable("foobar", input1)
val input2 = tblEnv.fromDataSet(inputDS2, 'd, 'e, 'f)
tblEnv.registerTable("foobar2", input2)

val result = tblEnv.sql("SELECT * FROM foobar UNION ALL SELECT * FROM foobar2")

tblEnv.toDataSet[Row](result).print()
{code}

We should update the documentation accordingly.
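
If the Table API operators do insist on identical names (as the documentation states), the fields can be renamed before the union. A minimal sketch, continuing the snippet above and assuming the {{as}} expression and {{unionAll}} operator of the 1.1 Scala Table API:

{code}
// Hedged sketch: rename input2's fields to match input1's before the union.
val renamed = input2.select('d as 'a, 'e as 'b, 'f as 'c)
val unioned = input1.unionAll(renamed)
tblEnv.toDataSet[Row](unioned).print()
{code}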



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics

2016-07-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-3962.
-
Resolution: Fixed

> JMXReporter doesn't properly register/deregister metrics
> 
>
> Key: FLINK-3962
> URL: https://issues.apache.org/jira/browse/FLINK-3962
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Stephan Ewen
> Fix For: 1.1.0
>
>
> The following fails our Yarn tests because it checks for errors in the 
> jobmanager/taskmanager logs:
> {noformat}
> 2016-05-23 19:20:02,349 ERROR org.apache.flink.metrics.reporter.JMXReporter   
>   - A metric with the name 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
>  was already registered.
> javax.management.InstanceAlreadyExistsException: 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
>   at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>   at 
> org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76)
>   at 
> org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144)
>   at 
> org.apache.flink.metrics.groups.IOMetricGroup.<init>(IOMetricGroup.java:40)
>   at 
> org.apache.flink.metrics.groups.TaskMetricGroup.<init>(TaskMetricGroup.java:68)
>   at 
> org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74)
>   at 
> org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1092)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:441)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:283)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   

[jira] [Created] (FLINK-4243) Change "Scope" to "Namespace" for Metrics

2016-07-21 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4243:
---

 Summary: Change "Scope" to "Namespace" for Metrics
 Key: FLINK-4243
 URL: https://issues.apache.org/jira/browse/FLINK-4243
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.1.0
Reporter: Stephan Ewen


It seems that using **"namespace"** for the names (dotted concatenation) of 
metric groups is more common than **"scope"**.

I suggest following that terminology in Flink.

[~jgrier] and [~Zentol], I would be interested in what you think.
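
To make the terminology concrete, here is a tiny illustration of the dotted concatenation in question (the helper is hypothetical, not Flink API; Flink's metric groups assemble such strings internally):

{code}
// Hypothetical helper, purely to illustrate the dotted naming convention.
def namespaceOf(groups: Seq[String], metricName: String): String =
  (groups :+ metricName).mkString(".")

// prints "host-1.taskmanager.myjob.mytask.numBytesIn"
println(namespaceOf(Seq("host-1", "taskmanager", "myjob", "mytask"), "numBytesIn"))
{code}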




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4201) Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387721#comment-15387721
 ] 

ASF GitHub Bot commented on FLINK-4201:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2276
  
I think that change is good in "our" philosophy, just wondering about the 
following implication:

Some people want to run HA setups without ZooKeeper, simply using an 
external service to make sure that the JobManager is restarted. The latest 
completed checkpoint is found via a well-defined path in the checkpoint storage 
(rather than a well defined path in ZooKeeper).

That works as an HA setup (with the exception of being susceptible to 
"split brain" behavior in the presence of network partitions). 

Would we interfere with such a setup when removing checkpoints on "suspend" 
in "standalone" mode?

BTW: We should really find a new name for "standalone" mode ;-) The term is 
overloaded.


> Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted
> ---
>
> Key: FLINK-4201
> URL: https://issues.apache.org/jira/browse/FLINK-4201
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Ufuk Celebi
>Priority: Blocker
>
> For example, when shutting down a Yarn session, according to the logs 
> checkpoints for jobs that did not terminate are deleted. In the shutdown 
> hook, removeAllCheckpoints is called and removes checkpoints that should 
> still be kept.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics

2016-07-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-3962.
---

> JMXReporter doesn't properly register/deregister metrics
> 
>
> Key: FLINK-3962
> URL: https://issues.apache.org/jira/browse/FLINK-3962
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Stephan Ewen
> Fix For: 1.1.0
>
>
> The following fails our Yarn tests because it checks for errors in the 
> jobmanager/taskmanager logs:
> {noformat}
> 2016-05-23 19:20:02,349 ERROR org.apache.flink.metrics.reporter.JMXReporter   
>   - A metric with the name 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
>  was already registered.
> javax.management.InstanceAlreadyExistsException: 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
>   at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>   at 
> org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76)
>   at 
> org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144)
>   at 
> org.apache.flink.metrics.groups.IOMetricGroup.<init>(IOMetricGroup.java:40)
>   at 
> org.apache.flink.metrics.groups.TaskMetricGroup.<init>(TaskMetricGroup.java:68)
>   at 
> org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74)
>   at 
> org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1092)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:441)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:283)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> 

[jira] [Commented] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics

2016-07-21 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387726#comment-15387726
 ] 

Stephan Ewen commented on FLINK-3962:
-

This issue has been resolved by not throwing exceptions, but merely logging a 
simple warning.
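
A hedged sketch of that pattern (the helper name and the println stand-in for the reporter's logger are illustrative, not the actual JMXReporter code):

{code}
import java.lang.management.ManagementFactory
import javax.management.{InstanceAlreadyExistsException, ObjectName}

// On a name clash, log a warning instead of letting the exception
// bubble up and fail task submission.
val server = ManagementFactory.getPlatformMBeanServer
def registerQuietly(mbean: AnyRef, name: ObjectName): Unit =
  try server.registerMBean(mbean, name)
  catch {
    case e: InstanceAlreadyExistsException =>
      println(s"WARN: a metric with the name $name was already registered: $e")
  }
{code}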

> JMXReporter doesn't properly register/deregister metrics
> 
>
> Key: FLINK-3962
> URL: https://issues.apache.org/jira/browse/FLINK-3962
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Stephan Ewen
> Fix For: 1.1.0
>
>
> The following fails our Yarn tests because it checks for errors in the 
> jobmanager/taskmanager logs:
> {noformat}
> 2016-05-23 19:20:02,349 ERROR org.apache.flink.metrics.reporter.JMXReporter   
>   - A metric with the name 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
>  was already registered.
> javax.management.InstanceAlreadyExistsException: 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
>   at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>   at 
> org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76)
>   at 
> org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144)
>   at 
> org.apache.flink.metrics.groups.IOMetricGroup.<init>(IOMetricGroup.java:40)
>   at 
> org.apache.flink.metrics.groups.TaskMetricGroup.<init>(TaskMetricGroup.java:68)
>   at 
> org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74)
>   at 
> org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1092)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:441)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:283)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>   at 

[jira] [Commented] (FLINK-4202) Add JM metric which shows the restart duration

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387718#comment-15387718
 ] 

ASF GitHub Bot commented on FLINK-4202:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2271
  
+1 to merge. Tests and comments are very good. I've tested this locally 
with a job with restarts and everything works as expected:

![screen shot 2016-07-21 at 15 47 34](https://cloud.githubusercontent.com/assets/1756620/17025085/a38131ae-4f5a-11e6-8260-3d55f2025760.png)




> Add JM metric which shows the restart duration
> --
>
> Key: FLINK-4202
> URL: https://issues.apache.org/jira/browse/FLINK-4202
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> It is convenient for users to have a metric which tells them how long the 
> restarting of a job has taken.
> I propose to introduce a {{Gauge}} which returns the time between 
> {{JobStatus.RESTARTING}} and {{JobStatus.RUNNING}}. If the job was not 
> restarted (initial run), then this metric will return {{-1}}.
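
A minimal sketch of such a gauge, assuming the surrounding code sets the two timestamps on the corresponding {{JobStatus}} transitions (class and field names are illustrative, not the actual implementation):

{code}
import org.apache.flink.metrics.Gauge

// restartingTimestamp is set on JobStatus.RESTARTING,
// runningTimestamp on JobStatus.RUNNING.
class RestartTimeGauge extends Gauge[Long] {
  @volatile var restartingTimestamp: Long = -1L
  @volatile var runningTimestamp: Long = -1L

  override def getValue: Long = {
    if (restartingTimestamp < 0) {
      -1L // initial run, the job was never restarted
    } else if (runningTimestamp >= restartingTimestamp) {
      runningTimestamp - restartingTimestamp // restart completed
    } else {
      System.currentTimeMillis() - restartingTimestamp // restart still in progress
    }
  }
}
{code}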



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2276: [FLINK-4201] [runtime] Forward suspend to checkpoint coor...

2016-07-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2276
  
I think that change is good in "our" philosophy, just wondering about the 
following implication:

Some people want to run HA setups without ZooKeeper, simply using an 
external service to make sure that the JobManager is restarted. The latest 
completed checkpoint is found via a well-defined path in the checkpoint storage 
(rather than a well defined path in ZooKeeper).

That works as an HA setup (with the exception of being susceptible to 
"split brain" behavior in the presence of network partitions). 

Would we interfere with such a setup when removing checkpoints on "suspend" 
in "standalone" mode?

BTW: We should really find a new name for "standalone" mode ;-) The term is 
overloaded.
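
For context, a hedged sketch of the kind of setup described above, using the 1.1 config keys (the checkpoint path is a hypothetical example; whether checkpoints should survive a suspend in this mode is exactly the open question):

```scala
import org.apache.flink.configuration.{ConfigConstants, Configuration}

// "ZooKeeper-less" setup: recovery.mode stays standalone, an external
// supervisor restarts the JobManager, and the latest completed checkpoint
// is looked up under a fixed storage path.
val conf = new Configuration()
conf.setString(ConfigConstants.RECOVERY_MODE, "standalone")
conf.setString("state.backend", "filesystem")
conf.setString("state.backend.fs.checkpointdir", "hdfs:///flink/checkpoints")
```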


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2271: [FLINK-4202] Add restarting time JM metric

2016-07-21 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2271
  
+1 to merge. Tests and comments are very good. I've tested this locally 
with a job with restarts and everything works as expected:

![screen shot 2016-07-21 at 15 47 34](https://cloud.githubusercontent.com/assets/1756620/17025085/a38131ae-4f5a-11e6-8260-3d55f2025760.png)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #:

2016-07-21 Thread aljoscha
Github user aljoscha commented on the pull request:


https://github.com/apache/flink/commit/19dae21b00cfbf68ee64af80672d25974a3cd346#commitcomment-18339338
  
That seems like a good idea, yes. Should we still keep the JMXReporter inactive by 
default, or make it active by default again?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4242) Improve validation exception messages

2016-07-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4242:


 Summary: Improve validation exception messages
 Key: FLINK-4242
 URL: https://issues.apache.org/jira/browse/FLINK-4242
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.1.0
Reporter: Till Rohrmann
Priority: Minor


The Table API's validation exceptions could be improved to be more meaningful 
for users. For example, the following code snippet:

{code}
Table inputTable = tableEnv.fromDataStream(env.fromElements(
Tuple3.of(1, "a", 1.0),
Tuple3.of(2, "b", 2.0),
Tuple3.of(3, "c", 3.0)), "a, b, c");
inputTable.select("a").where("!a");
{code}

fails correctly. However, the validation exception message says "Expression 
!('a) failed on input check: Not only accepts child of Boolean Type, get 
Integer". I think it could be changed to say something like: "The not operator 
requires a boolean input, but "a" is of type integer."
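
A hedged sketch of assembling such a message (names are illustrative, not the actual validator code):

{code}
// Illustrative only: building the friendlier message proposed above.
def notOperatorError(field: String, actualType: String): String =
  s"The not operator requires a boolean input, but '$field' is of type $actualType."

println(notOperatorError("a", "Integer"))
{code}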



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2271: [FLINK-4202] Add restarting time JM metric

2016-07-21 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2271#discussion_r71705985
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.Matchers;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ExecutionGraphMetricsTest extends TestLogger {
+
+   /**
+    * This test tests that the restarting time metric correctly displays restarting times.
+    */
+   @Test
+   public void testExecutionGraphRestartTimeMetric() throws JobException, IOException, InterruptedException {
--- End diff --

Very good test 👍 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


  1   2   >