[jira] [Commented] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]

2018-07-01 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529020#comment-16529020
 ] 

vinoyang commented on FLINK-9221:
-

[~joshlemer] I think we should not add this method to the `SinkFunction` 
interface; it seems it could be implemented in a utility class instead. What's 
your opinion? [~till.rohrmann]
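
For illustration, a utility-based version might look like the following Scala 
sketch; the helper object, its name, and its placement are assumptions, not an 
agreed design:

{code}
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical helper object -- name and module are illustrative only.
object SinkFunctionUtils {

  // Wraps an existing sink so that it accepts values of type B by
  // converting them to A first. For brevity this sketch forwards only the
  // single-argument invoke(); a real version would forward the
  // Context-based variant as well.
  def contramap[A, B](sink: SinkFunction[A])(f: B => A): SinkFunction[B] =
    new SinkFunction[B] {
      override def invoke(value: B): Unit = sink.invoke(f(value))
    }
}
{code}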

> Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
> ---
>
> Key: FLINK-9221
> URL: https://issues.apache.org/jira/browse/FLINK-9221
> Project: Flink
>  Issue Type: Task
>  Components: DataSet API, DataStream API
>Affects Versions: 1.5.0
>Reporter: Josh Lemer
>Assignee: vinoyang
>Priority: Minor
>  Labels: flink
>
> Just like it is very useful to use `DataStream[T]` as a sort of Functor or 
> Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to 
> have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on 
> `SinkFunctions`, so that you can reuse existing complex sink functions, but 
> with a different input type. For example:
> {code}
> val bucketingStringSink: SinkFunction[String] = 
>   new BucketingSink[String]("...")
> .setBucketer(new DateTimeBucketer("yyyy-MM-dd-HHmm"))
> val bucketingIntListSink: SinkFunction[List[Int]] =
>   bucketingStringSink.contramap[List[Int]](_.mkString(","))
> {code}
> For some more formal motivation behind this, 
> https://typelevel.org/cats/typeclasses/contravariant.html is definitely a 
> great place to start!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9511) Make StateDescriptor configurable with optional TTL

2018-07-01 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529023#comment-16529023
 ] 

vinoyang commented on FLINK-9511:
-

Hi [~azagrebin], the defined TtlConfig is stored in the flink-runtime module, but 
the StateDescriptor class is in flink-core, so it seems I cannot import it there. 
Is there something wrong?

> Make StateDescriptor configurable with optional TTL
> ---
>
> Key: FLINK-9511
> URL: https://issues.apache.org/jira/browse/FLINK-9511
> Project: Flink
>  Issue Type: Sub-task
>  Components: Java API, State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL can be enabled and configured in the constructor of abstract 
> StateDescriptor and become available in all subclasses:
> {code:java}
> enum StateTtlUpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }
> enum StateTtlCleanupGuarantee { Relaxed, Exact }
> enum TtlStateVisibility { Relaxed, Exact }
> class TtlConfig {
>   StateTtlUpdateType updateType;
>   StateTtlCleanupGuarantee cleanupStrategy;
>   TtlStateVisibility stateVisibility;
>   TimeCharacteristic timeCharacteristic;
>   long ttl;
> }
> // previous constructor
> StateDescriptor(...) {
>   this.ttlConfig = TtlConfig.DISABLED;
> }
> // overloaded constructor with TtlConfig
> StateDescriptor(..., ttlConfig) {
>   ...
> }
> {code}
> Another option is to consider adding a StateDescriptor builder.
> Queryable state can remain unsupported with TTL for now.
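
For illustration, usage of the proposed overload might look like the Scala 
sketch below; TtlConfig and the TTL-aware constructor are hypothetical names 
mirroring the design above, not final API:

{code}
// Hypothetical usage of the proposed TTL-aware StateDescriptor constructor.
// Every name below mirrors the design sketch in this issue.
val ttlConfig = new TtlConfig(
  StateTtlUpdateType.OnCreateAndWrite,   // refresh the TTL on writes only
  StateTtlCleanupGuarantee.Relaxed,
  TtlStateVisibility.Relaxed,
  TimeCharacteristic.ProcessingTime,
  24 * 60 * 60 * 1000L)                  // TTL of one day, in milliseconds

// Proposed overloaded constructor, shown on a concrete subclass:
val descriptor = new ValueStateDescriptor[String](
  "last-seen", classOf[String], ttlConfig)
{code}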



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and display it in the UI

2018-07-01 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529025#comment-16529025
 ] 

vinoyang commented on FLINK-9682:
-

This seems like a feasible suggestion. What do you think about it, 
[~till.rohrmann] and [~StephanEwen]?

> Add setDescription to execution environment and display it in the UI
> 
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be executing, 
> such as a development or test environment, identifying which running job is 
> of a specific version via the UI can be difficult unless the version is 
> embedded into the job name given to {{execute}}.  But the job name is used 
> for other purposes, such as namespacing metrics.  Thus, it is not ideal 
> to modify the job name, as that could require modifying metric dashboards and 
> monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.
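
For illustration, the proposed method might be used as in the Scala sketch 
below; {{setDescription}} is hypothetical and does not exist yet, while the 
other calls are existing API:

{code}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object DescribedJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Hypothetical call proposed in this issue -- shown in the web UI only:
    env.setDescription("user-sessions, build v2.3.1 (canary)")
    // The stable job name stays unchanged, so metric namespaces are unaffected:
    env.execute("user-sessions")
  }
}
{code}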



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6219: [hotfix] Fixed typo in docs

2018-07-01 Thread elbaulp
Github user elbaulp commented on the issue:

https://github.com/apache/flink/pull/6219
  
@tillrohrmann You're welcome :-)


---


[GitHub] flink issue #6233: [FLINK-9696] Deep toString for array/map sql types

2018-07-01 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6233
  
Hi @snuyanzin, thanks for your PR. The code looks good and the 
`deepToString()` function returns results correctly. I could not spot any issues 
with the implementation. To make the PR better, I think we can add a test in 
`CliUtilsTest` for the `rowToString` function, since the code in that function 
has also been changed. BTW, the PR template could be done better; see PR 
https://github.com/apache/flink/pull/5811 for an example.
Best, Hequn


---


[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529096#comment-16529096
 ] 

ASF GitHub Bot commented on FLINK-9696:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6233
  
Hi @snuyanzin, thanks for your PR. The code looks good and the 
`deepToString()` function returns results correctly. I could not spot any issues 
with the implementation. To make the PR better, I think we can add a test in 
`CliUtilsTest` for the `rowToString` function, since the code in that function 
has also been changed. BTW, the PR template could be done better; see PR 
https://github.com/apache/flink/pull/5811 for an example.
Best, Hequn


> Deep toString for arrays/map in SQL client 
> ---
>
> Key: FLINK-9696
> URL: https://issues.apache.org/jira/browse/FLINK-9696
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> Currently the SQL client does not show arrays/maps in a human-readable way (please 
> have a look at the examples below), e.g. {code}select map[1,2];{code} is shown as 
> {noformat} +/-EXPR$0
>+ java.util.HashMap
> {noformat}
> {code}select array[1,2];{code} is shown as {noformat}
> +/-EXPR$0
>+   java.lang.Integer[]
> {noformat} 
> {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/-   
>  EXPR$0
>+   java.util.Map[]{noformat}
> {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/-
> EXPR$0
>+ java.util.HashMap{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-07-01 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199349619
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

+1 for better consistency. It is good to follow the Table API style.


---


[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529099#comment-16529099
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199349619
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

+1 for better consistency. It is good to follow the Table API style.


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6214: [FLINK-9669] Add assignment store interface.

2018-07-01 Thread liurenjie1024
Github user liurenjie1024 closed the pull request at:

https://github.com/apache/flink/pull/6214


---


[GitHub] flink issue #6214: [FLINK-9669] Add assignment store interface.

2018-07-01 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/6214
  
@tillrohrmann This is from my initial design, and since the design has 
changed, we can close this now.


---


[jira] [Closed] (FLINK-9669) Introduce task manager assignment store

2018-07-01 Thread Renjie Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renjie Liu closed FLINK-9669.
-
Resolution: Invalid

> Introduce task manager assignment store
> ---
>
> Key: FLINK-9669
> URL: https://issues.apache.org/jira/browse/FLINK-9669
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Scheduler
>Affects Versions: 1.5.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9669) Introduce task manager assignment store

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529103#comment-16529103
 ] 

ASF GitHub Bot commented on FLINK-9669:
---

Github user liurenjie1024 closed the pull request at:

https://github.com/apache/flink/pull/6214


> Introduce task manager assignment store
> ---
>
> Key: FLINK-9669
> URL: https://issues.apache.org/jira/browse/FLINK-9669
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Scheduler
>Affects Versions: 1.5.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9669) Introduce task manager assignment store

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529102#comment-16529102
 ] 

ASF GitHub Bot commented on FLINK-9669:
---

Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/6214
  
@tillrohrmann This is from my initial design, and since the design has 
changed, we can close this now.


> Introduce task manager assignment store
> ---
>
> Key: FLINK-9669
> URL: https://issues.apache.org/jira/browse/FLINK-9669
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Scheduler
>Affects Versions: 1.5.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9695) Add option for Mesos executor to forcefully pull Docker images

2018-07-01 Thread Leonid Ishimnikov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonid Ishimnikov updated FLINK-9695:
-
Description: It would be useful to have an option to forcefully pull Docker 
images for tasks, rather than reusing a previously cached image. Such an option 
exists in many Mesos frameworks, and it significantly simplifies debugging. I 
propose adding a new 
{{mesos.resourcemanager.tasks.container.docker.}}{{force-pull-image}} option.  
(was: It would be useful to have an option to forcefully pull Docker images for 
tasks, rather than reuse a previously cached image.  Such option exists in many 
Mesos frameworks, and it significantly simplifies debugging.  I propose adding 
a new {{mesos.resourcemanager.tasks.container.docker.forcePullImage}} option.)

> Add option for Mesos executor to forcefully pull Docker images
> --
>
> Key: FLINK-9695
> URL: https://issues.apache.org/jira/browse/FLINK-9695
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Leonid Ishimnikov
>Assignee: Leonid Ishimnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> It would be useful to have an option to forcefully pull Docker images for 
> tasks, rather than reusing a previously cached image. Such an option exists in 
> many Mesos frameworks, and it significantly simplifies debugging. I propose 
> adding a new 
> {{mesos.resourcemanager.tasks.container.docker.}}{{force-pull-image}} option.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6206: [FLINK-9654] [core] Changed the check for anonymous class...

2018-07-01 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6206
  
Hi @zsolt-donca, I have looked at the Travis build error log; the failure 
is not caused by your code. This PR looks good, but it would be better if you 
could add a test for the `isAnonymousClass` method.
cc @tillrohrmann 


---


[jira] [Commented] (FLINK-9654) Internal error while deserializing custom Scala TypeSerializer instances

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529121#comment-16529121
 ] 

ASF GitHub Bot commented on FLINK-9654:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6206
  
Hi @zsolt-donca, I have looked at the Travis build error log; the failure 
is not caused by your code. This PR looks good, but it would be better if you 
could add a test for the `isAnonymousClass` method.
cc @tillrohrmann 


> Internal error while deserializing custom Scala TypeSerializer instances
> 
>
> Key: FLINK-9654
> URL: https://issues.apache.org/jira/browse/FLINK-9654
> Project: Flink
>  Issue Type: Bug
>Reporter: Zsolt Donca
>Priority: Major
>  Labels: pull-request-available
>
> When you are using custom `TypeSerializer` instances implemented in Scala, 
> the Scala issue [SI-2034|https://issues.scala-lang.org/browse/SI-2034] can 
> manifest itself when a Flink job is restored from checkpoint or started with 
> a savepoint.
> The reason is that in such a restore from checkpoint or savepoint, Flink uses 
> `InstantiationUtil.FailureTolerantObjectInputStream` to deserialize the type 
> serializers and their configurations. The deserialization walks through the 
> entire corresponding object graph, and for each class it calls 
> `isAnonymousClass`, which, in turn, calls `getSimpleName` (mechanism in place 
> for FLINK-6869). If there is an internal class defined in a Scala object for 
> which `getSimpleName` fails (see the Scala issue), then a 
> `java.lang.InternalError` is thrown, which causes the task manager to restart. 
> In this case, Flink tries to restart the job on another task manager, causing 
> all the task managers to restart, wreaking havoc on the entire Flink cluster.
> There are some alternative type information derivation mechanisms that rely 
> on anonymous classes and, most importantly, classes generated by macros, that 
> can easily trigger the above problem. I am personally working on 
> [https://github.com/zsolt-donca/flink-alt], and there is also 
> [https://github.com/joroKr21/flink-shapeless]
> I prepared a pull request that fixes the issue. 
>  
> Edit: added a stack trace to help demonstrate the issue.
> 2018-06-21 13:08:07.829 [today-stats (2/2)] ERROR 
> org.apache.flink.runtime.taskmanager.Task  - Encountered fatal error 
> java.lang.InternalError - terminating the JVM
>  java.lang.InternalError: Malformed class name
>          at java.lang.Class.getSimpleName(Class.java:1330) ~[na:1.8.0_171]
>          at java.lang.Class.isAnonymousClass(Class.java:1411) ~[na:1.8.0_171]
>          at 
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:206)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1855) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) 
> ~[na:1.8.0_171]
>          at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) 
> ~[na:1.8.0_171]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>    
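
For reference, a minimal reproduction of the underlying Scala issue, adapted 
from SI-2034 rather than taken from this report (it throws on JDK 8; later 
JDKs changed Class.getSimpleName):

{code}
object Repro {
  object Child {
    class Inner
  }

  def main(args: Array[String]): Unit = {
    // On JDK 8 this throws java.lang.InternalError: Malformed class name.
    // The class Repro$Child$Inner is enclosed by the module class
    // Repro$Child$, a layout Class.getSimpleName cannot parse;
    // Class.isAnonymousClass fails the same way, which is exactly what
    // FailureTolerantObjectInputStream calls during deserialization.
    println(classOf[Child.Inner].getSimpleName)
  }
}
{code}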

[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-07-01 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199352237
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

+1 for this approach that directly specifies the interval literals. 

Regarding Quarter. It seems like a very old implementation and we should 
probably use `"1970-01-01".toDate.extract(TimeIntervalUnit.QUARTER)` to make it 
consistent with all other time unit extractions. What do you guys think?

I just tried it out by modifying the `Extract` method and it seems working 
perfectly.


---


[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529122#comment-16529122
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199352237
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

+1 for this approach that directly specifies the interval literals. 

Regarding Quarter. It seems like a very old implementation and we should 
probably use `"1970-01-01".toDate.extract(TimeIntervalUnit.QUARTER)` to make it 
consistent with all other time unit extractions. What do you guys think?

I just tried it out by modifying the `Extract` method and it seems working 
perfectly.


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

2018-07-01 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6194
  
+1, there is a conflicting file~ cc @sihuazhou


---


[jira] [Commented] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OuptutStream on Task.

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529123#comment-16529123
 ] 

ASF GitHub Bot commented on FLINK-9633:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6194
  
+1, there is a conflicting file~ cc @sihuazhou


> Flink doesn't use the Savepoint path's filesystem to create the OuptutStream 
> on Task.
> -
>
> Key: FLINK-9633
> URL: https://issues.apache.org/jira/browse/FLINK-9633
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Currently, Flink uses the savepoint's filesystem to create the meta output 
> stream in the CheckpointCoordinator (JM side), but in the StreamTask (TM side) it 
> uses the checkpoint's filesystem to create the checkpoint data output stream. 
> When the savepoint and the checkpoint are on different filesystems, this 
> becomes problematic.
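
For illustration, the core idea of a fix might look like the Scala sketch 
below: resolve the filesystem from the savepoint path itself. The path and the 
{{_metadata}} file name here are examples; the actual PR may differ:

{code}
import org.apache.flink.core.fs.{FileSystem, Path}

object SavepointStreamSketch {
  def main(args: Array[String]): Unit = {
    // Resolve the filesystem from the savepoint path itself, instead of
    // reusing the filesystem that was configured for checkpoints.
    val savepointPath = new Path("s3://bucket/savepoints/sp-42") // example path
    val fs: FileSystem = savepointPath.getFileSystem
    val out = fs.create(
      new Path(savepointPath, "_metadata"),
      FileSystem.WriteMode.NO_OVERWRITE)
    out.close()
  }
}
{code}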



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6234: [FLINK-9431]Introduce time bounded condition to ce...

2018-07-01 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6234

[FLINK-9431]Introduce time bounded condition to cep

## What is the purpose of the change

In CEP, events currently drive the transformation of the NFA; I think the 
time factor should also be taken into account in some scenarios.

Consider a key whose data is not endless, where we want to emit the matched 
`AB` once `B` has been present for ten seconds under the following pattern:

```
Pattern.begin("A").followedBy("B").notFollowedBy("C")
``` 
We cannot emit the result because no branch can lead to the 
`Final State`. I think we can add a `TimeEnd` state to describe a pattern 
that accepts a time condition, evaluated in processing time / event time 
against the timestamps of the elements we have seen before.

As described in the issue link, there are two main reasons why I introduce 
this feature:

1.  `notFollowedBy` can't be at the end of a pattern.
2.  `within` only compares against the element at the start, and some keys' 
data may not be endless, so we have to evaluate conditions not only on 
events but also on time.

## Brief change log

1.  Add a method to `IterativeCondition` to distinguish event-driven 
conditions from time-driven conditions.
2.  In `advanceTime`, we not only prune expired elements but also evaluate 
the time-bounded conditions.


## Verifying this change

This change is already covered by existing CEP tests; it may need a few 
more tests for the new API.

This change added tests and can be verified as follows:


## Documentation

  - Does this pull request introduce a new feature? (yes)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink timeEnd-state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6234.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6234


commit b1aa992a97c8eac818e57c3d2f82be76957052d0
Author: minwenjun 
Date:   2018-07-01T14:41:44Z

[FLINK-9431]Introduce time bounded condition to cep




---


[jira] [Updated] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-07-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9431:
--
Labels: pull-request-available  (was: )

> Introduce TimeEnd State to flink cep
> 
>
> Key: FLINK-9431
> URL: https://issues.apache.org/jira/browse/FLINK-9431
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Currently Flink CEP has no support for reaching a Final State after some time 
> has passed. If I use a pattern like 
> {code:java}Pattern.begin("A").notFollowedBy("B"){code} and I want the A 
> element to be emitted after 5 minutes, there is no way to do it.
> I want to introduce a timeEnd state to work with notFollowedBy to handle 
> this scenario.
> It can be used like this: 
> {code:java}Pattern.begin("A").notFollowedBy("B").timeEnd("end"){code}
> [~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529124#comment-16529124
 ] 

ASF GitHub Bot commented on FLINK-9431:
---

GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6234

[FLINK-9431]Introduce time bounded condition to cep

## What is the purpose of the change

In CEP, events currently drive the transformation of the NFA; I think the 
time factor should also be taken into account in some scenarios.

Consider a key whose data is not endless, where we want to emit the matched 
`AB` once `B` has been present for ten seconds under the following pattern:

```
Pattern.begin("A").followedBy("B").notFollowedBy("C")
``` 
We cannot emit the result because no branch can lead to the 
`Final State`. I think we can add a `TimeEnd` state to describe a pattern 
that accepts a time condition, evaluated in processing time / event time 
against the timestamps of the elements we have seen before.

As described in the issue link, there are two main reasons why I introduce 
this feature:

1.  `notFollowedBy` can't be at the end of a pattern.
2.  `within` only compares against the element at the start, and some keys' 
data may not be endless, so we have to evaluate conditions not only on 
events but also on time.

## Brief change log

1.  Add a method to `IterativeCondition` to distinguish event-driven 
conditions from time-driven conditions.
2.  In `advanceTime`, we not only prune expired elements but also evaluate 
the time-bounded conditions.


## Verifying this change

This change is already covered by existing CEP tests; it may need a few 
more tests for the new API.

This change added tests and can be verified as follows:


## Documentation

  - Does this pull request introduce a new feature? (yes)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink timeEnd-state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6234.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6234


commit b1aa992a97c8eac818e57c3d2f82be76957052d0
Author: minwenjun 
Date:   2018-07-01T14:41:44Z

[FLINK-9431]Introduce time bounded condition to cep




> Introduce TimeEnd State to flink cep
> 
>
> Key: FLINK-9431
> URL: https://issues.apache.org/jira/browse/FLINK-9431
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Currently Flink CEP has no support for reaching a Final State after some time 
> has passed. If I use a pattern like 
> {code:java}Pattern.begin("A").notFollowedBy("B"){code} and I want the A 
> element to be emitted after 5 minutes, there is no way to do it.
> I want to introduce a timeEnd state to work with notFollowedBy to handle 
> this scenario.
> It can be used like this: 
> {code:java}Pattern.begin("A").notFollowedBy("B").timeEnd("end"){code}
> [~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

2018-07-01 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6194
  
@yanghua Thanks for the review, I rebased the PR.


---


[jira] [Commented] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OuptutStream on Task.

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529134#comment-16529134
 ] 

ASF GitHub Bot commented on FLINK-9633:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6194
  
@yanghua Thanks for the review, I rebased the PR.


> Flink doesn't use the Savepoint path's filesystem to create the OuptutStream 
> on Task.
> -
>
> Key: FLINK-9633
> URL: https://issues.apache.org/jira/browse/FLINK-9633
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Currently, Flink uses the savepoint's filesystem to create the meta output 
> stream in the CheckpointCoordinator (JM side), but in the StreamTask (TM side) it 
> uses the checkpoint's filesystem to create the checkpoint data output stream. 
> When the savepoint and the checkpoint are on different filesystems, this 
> becomes problematic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-07-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6132


---


[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529253#comment-16529253
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6132


> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-07-01 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9456.

Resolution: Fixed

Fixed via
1.6.0:
89cfeaa882f9e68df2bd215563622b48c29a9ec9
50c0ea8c9fe17278d45aba476a95791152a1420b

1.5.1:
a2f43b4cc081d360cd59ce3e7fb875e4b5fd243f
627412c4d2ea655271fe5da67a55ac936a1a060e

> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529264#comment-16529264
 ] 

ASF GitHub Bot commented on FLINK-9696:
---

Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6233#discussion_r199364538
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CliUtils}.
+ */
+public class CliUtilsTest {
+
+   @Test
+   public void testRowToString() throws IOException {
+   Row result = new Row(10);
+   result.setField(0, null);
+   result.setField(1, "String");
+   result.setField(2, 'c');
+   result.setField(3, false);
+   result.setField(4, 12345.67f);
+   result.setField(5, 12345.67d);
+   result.setField(6, 12345L);
+   result.setField(7, java.sql.Date.valueOf("2018-11-12"));
+   result.setField(8, new int[]{1, 2});
+   result.setField(9, new Tuple3<>(1, "123", null));
--- End diff --

Is having a tuple here a real case for the SQL client? The API allows it, 
but I am not sure about real use cases.


> Deep toString for arrays/map in SQL client 
> ---
>
> Key: FLINK-9696
> URL: https://issues.apache.org/jira/browse/FLINK-9696
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> Currently the SQL client does not show arrays/maps in a human-readable way (please 
> have a look at the examples below), e.g. {code}select map[1,2];{code} is shown as 
> {noformat} +/-EXPR$0
>+ java.util.HashMap
> {noformat}
> {code}select array[1,2];{code} is shown as {noformat}
> +/-EXPR$0
>+   java.lang.Integer[]
> {noformat} 
> {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/-   
>  EXPR$0
>+   java.util.Map[]{noformat}
> {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/-
> EXPR$0
>+ java.util.HashMap{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6233: [FLINK-9696] Deep toString for array/map sql types

2018-07-01 Thread snuyanzin
Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6233#discussion_r199364581
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CliUtils}.
+ */
+public class CliUtilsTest {
+
+   @Test
+   public void testRowToString() throws IOException {
+   Row result = new Row(10);
+   result.setField(0, null);
+   result.setField(1, "String");
+   result.setField(2, 'c');
+   result.setField(3, false);
+   result.setField(4, 12345.67f);
+   result.setField(5, 12345.67d);
+   result.setField(6, 12345L);
+   result.setField(7, java.sql.Date.valueOf("2018-11-12"));
+   result.setField(8, new int[]{1, 2});
+   result.setField(9, new Tuple3<>(1, "123", null));
+   assertEquals(Arrays.toString(CliUtils.rowToString(result)),
+   "[(NULL), String, c, false, 12345.67, 12345.67, 12345, 
2018-11-12, " +
+   "[1, 2], (1,123,null)]");
--- End diff --

If having a tuple here is OK, then the next strange thing is null handling 
inside tuples (it is printed in lowercase and without brackets). So there are 
at least two different kinds of null handling: inside tuples, and everywhere else.



---


[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529265#comment-16529265
 ] 

ASF GitHub Bot commented on FLINK-9696:
---

Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6233#discussion_r199364581
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CliUtils}.
+ */
+public class CliUtilsTest {
+
+   @Test
+   public void testRowToString() throws IOException {
+   Row result = new Row(10);
+   result.setField(0, null);
+   result.setField(1, "String");
+   result.setField(2, 'c');
+   result.setField(3, false);
+   result.setField(4, 12345.67f);
+   result.setField(5, 12345.67d);
+   result.setField(6, 12345L);
+   result.setField(7, java.sql.Date.valueOf("2018-11-12"));
+   result.setField(8, new int[]{1, 2});
+   result.setField(9, new Tuple3<>(1, "123", null));
+   assertEquals(Arrays.toString(CliUtils.rowToString(result)),
+   "[(NULL), String, c, false, 12345.67, 12345.67, 12345, 
2018-11-12, " +
+   "[1, 2], (1,123,null)]");
--- End diff --

If having a tuple here is OK, then the next strange thing is null handling 
inside tuples (it is printed in lowercase and without brackets). So there are 
at least two different kinds of null handling: inside tuples, and everywhere else.



> Deep toString for arrays/map in SQL client 
> ---
>
> Key: FLINK-9696
> URL: https://issues.apache.org/jira/browse/FLINK-9696
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> Currently the SQL client does not show arrays/maps in a human-readable way (please 
> have a look at the examples below), e.g. {code}select map[1,2];{code} is shown as 
> {noformat} +/-EXPR$0
>+ java.util.HashMap
> {noformat}
> {code}select array[1,2];{code} is shown as {noformat}
> +/-EXPR$0
>+   java.lang.Integer[]
> {noformat} 
> {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/-   
>  EXPR$0
>+   java.util.Map[]{noformat}
> {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/-
> EXPR$0
>+ java.util.HashMap{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6233: [FLINK-9696] Deep toString for array/map sql types

2018-07-01 Thread snuyanzin
Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6233#discussion_r199364538
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CliUtils}.
+ */
+public class CliUtilsTest {
+
+   @Test
+   public void testRowToString() throws IOException {
+   Row result = new Row(10);
+   result.setField(0, null);
+   result.setField(1, "String");
+   result.setField(2, 'c');
+   result.setField(3, false);
+   result.setField(4, 12345.67f);
+   result.setField(5, 12345.67d);
+   result.setField(6, 12345L);
+   result.setField(7, java.sql.Date.valueOf("2018-11-12"));
+   result.setField(8, new int[]{1, 2});
+   result.setField(9, new Tuple3<>(1, "123", null));
--- End diff --

Is having a tuple here a real case for the SQL client? The API allows it, 
but I am not sure about real use cases.


---


[GitHub] flink issue #6233: [FLINK-9696] Deep toString for array/map sql types

2018-07-01 Thread snuyanzin
Github user snuyanzin commented on the issue:

https://github.com/apache/flink/pull/6233
  
Hello @hequn8128! Thank you for your review and comments.
About the PR template: I made changes based on the proposed #5811. Please let 
me know whether it is acceptable.
About `rowToString`: agreed, I think it makes sense, and I added such tests. 
However, I ran into some strange behavior (I do not know whether it is a bug 
or something else) and commented on the code about it. 


---


[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529266#comment-16529266
 ] 

ASF GitHub Bot commented on FLINK-9696:
---

Github user snuyanzin commented on the issue:

https://github.com/apache/flink/pull/6233
  
Hello @hequn8128! Thank you for your review and comments.
About the PR template: I made changes based on the proposed #5811. Please let 
me know whether it is acceptable.
About `rowToString`: agreed, I think it makes sense, and I added such tests. 
However, I ran into some strange behavior (I do not know whether it is a bug 
or something else) and commented on the code about it. 


> Deep toString for arrays/map in SQL client 
> ---
>
> Key: FLINK-9696
> URL: https://issues.apache.org/jira/browse/FLINK-9696
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> Currently the SQL client does not show arrays/maps in a human-readable way (please 
> have a look at the examples below), e.g. {code}select map[1,2];{code} is shown as 
> {noformat} +/-EXPR$0
>+ java.util.HashMap
> {noformat}
> {code}select array[1,2];{code} is shown as {noformat}
> +/-EXPR$0
>+   java.lang.Integer[]
> {noformat} 
> {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/-   
>  EXPR$0
>+   java.util.Map[]{noformat}
> {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/-
> EXPR$0
>+ java.util.HashMap{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529276#comment-16529276
 ] 

ASF GitHub Bot commented on FLINK-9431:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6234#discussion_r199365531
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -224,23 +226,26 @@ private boolean isFinalState(ComputationState state) {
 
/**
 * Prunes states assuming there will be no events with timestamp 
lower than the given one.
-* It cleares the sharedBuffer and also emits all timed out partial 
matches.
+* It clears the sharedBuffer and also emits all timed out partial 
matches.
 *
 * @param sharedBuffer the SharedBuffer object that we need to work 
upon while processing
 * @param nfaState The NFAState object that we need to affect while 
processing
 * @param timestamp timestamp that indicates that there will be no 
more events with lower timestamp
 * @return all timed outed partial matches
 * @throws Exception Thrown if the system cannot access the state.
 */
-   public Collection<Tuple2<Map<String, List<T>>, Long>> advanceTime(
+   public Tuple2<Collection<Tuple2<Map<String, List<T>>, Long>>, Collection<Map<String, List<T>>>> advanceTime(
            final SharedBuffer<T> sharedBuffer,
            final NFAState nfaState,
-           final long timestamp) throws Exception {
+           final long timestamp,
+           final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
--- End diff --

Please add a parameter description for the new parameter 
`afterMatchSkipStrategy` of the method. 


> Introduce TimeEnd State to flink cep
> 
>
> Key: FLINK-9431
> URL: https://issues.apache.org/jira/browse/FLINK-9431
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Currently Flink CEP has no support for reaching a Final State after some time 
> has passed. If I use a pattern like 
> {code:java}Pattern.begin("A").notFollowedBy("B"){code} and I want the A 
> element to be emitted after 5 minutes, there is no way to do it.
> I want to introduce a timeEnd state to work with notFollowedBy to handle 
> this scenario.
> It can be used like this: 
> {code:java}Pattern.begin("A").notFollowedBy("B").timeEnd("end"){code}
> [~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6234: [FLINK-9431]Introduce time bounded condition to ce...

2018-07-01 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6234#discussion_r199365531
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -224,23 +226,26 @@ private boolean isFinalState(ComputationState state) {
 
/**
 * Prunes states assuming there will be no events with timestamp 
lower than the given one.
-* It cleares the sharedBuffer and also emits all timed out partial 
matches.
+* It clears the sharedBuffer and also emits all timed out partial 
matches.
 *
 * @param sharedBuffer the SharedBuffer object that we need to work 
upon while processing
 * @param nfaState The NFAState object that we need to affect while 
processing
 * @param timestamp timestamp that indicates that there will be no 
more events with lower timestamp
 * @return all timed outed partial matches
 * @throws Exception Thrown if the system cannot access the state.
 */
-   public Collection<Tuple2<Map<String, List<T>>, Long>> advanceTime(
+   public Tuple2<Collection<Tuple2<Map<String, List<T>>, Long>>, Collection<Map<String, List<T>>>> advanceTime(
            final SharedBuffer<T> sharedBuffer,
            final NFAState nfaState,
-           final long timestamp) throws Exception {
+           final long timestamp,
+           final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
--- End diff --

Please add a parameter description for the new parameter 
`afterMatchSkipStrategy` of the method. 


---


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-01 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199365966
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -,6 +,53 @@ class ScalarFunctionsTest extends 
ScalarTypesTestBase {
   math.atan(-0.123123132132132).toString)
   }
 
+  @Test
+  def testAtan2(): Unit = {
+testAllApis(
+  'f25.atan2('f26),
+  "f25.atan2(f26)",
+  "ATAN2(f25, f26)",
+  math.atan2(0.42.toByte, 0.toByte).toString)
+
+
--- End diff --

Removing this blank line would be better~ 


---


[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support

2018-07-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9688:
--
Labels: pull-request-available  (was: )

> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
>
> simple query fails {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> | EXPR$0             |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}
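Once ATAN2 is wired up, the failing query above is expected to work; a minimal 
self-contained Scala sketch (object name illustrative):

{code}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment

object Atan2Example {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Expected to return ~0.4636476090008061 once ATAN2 is supported.
    val result = tableEnv.sqlQuery("SELECT ATAN2(1, 2)")
  }
}
{code}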



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529278#comment-16529278
 ] 

ASF GitHub Bot commented on FLINK-9688:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199365966
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -,6 +,53 @@ class ScalarFunctionsTest extends 
ScalarTypesTestBase {
   math.atan(-0.123123132132132).toString)
   }
 
+  @Test
+  def testAtan2(): Unit = {
+testAllApis(
+  'f25.atan2('f26),
+  "f25.atan2(f26)",
+  "ATAN2(f25, f26)",
+  math.atan2(0.42.toByte, 0.toByte).toString)
+
+
--- End diff --

Removing this blank line would be better~ 


> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
>
> simple query fails {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> | EXPR$0             |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-01 Thread snuyanzin
Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199366673
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -,6 +,53 @@ class ScalarFunctionsTest extends 
ScalarTypesTestBase {
   math.atan(-0.123123132132132).toString)
   }
 
+  @Test
+  def testAtan2(): Unit = {
+testAllApis(
+  'f25.atan2('f26),
+  "f25.atan2(f26)",
+  "ATAN2(f25, f26)",
+  math.atan2(0.42.toByte, 0.toByte).toString)
+
+
--- End diff --

Thank you for that catch. 
Blank line removed.


---


[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529282#comment-16529282
 ] 

ASF GitHub Bot commented on FLINK-9688:
---

Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199366673
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -,6 +,53 @@ class ScalarFunctionsTest extends 
ScalarTypesTestBase {
   math.atan(-0.123123132132132).toString)
   }
 
+  @Test
+  def testAtan2(): Unit = {
+testAllApis(
+  'f25.atan2('f26),
+  "f25.atan2(f26)",
+  "ATAN2(f25, f26)",
+  math.atan2(0.42.toByte, 0.toByte).toString)
+
+
--- End diff --

Thank you for that catch. 
Blank line removed.


> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
>
> simple query fails {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +--------------------+
> | EXPR$0             |
> +--------------------+
> | 0.4636476090008061 |
> +--------------------+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

2018-07-01 Thread Clarkkkkk
Github user Clarkkkkk closed the pull request at:

https://github.com/apache/flink/pull/6192


---


[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529321#comment-16529321
 ] 

ASF GitHub Bot commented on FLINK-9567:
---

Github user Clarkkkkk closed the pull request at:

https://github.com/apache/flink/pull/6192


> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After a Job Manager restart in YARN cluster mode, Flink sometimes does not 
> release TaskManager containers. In the worst case, I had a job configured for 
> 5 task managers that ended up holding more than 100 containers. Although the 
> job did not fail, it affected other jobs in the YARN cluster.
> In the first log I posted, the container with id 24 is the reason why YARN 
> did not release resources. The container was killed before the restart, but 
> the *onContainerComplete* callback in *YarnResourceManager*, which should be 
> invoked by YARN's *AMRMAsyncClient*, was never received. After the restart, 
> as we can see in line 347 of the FlinkYarnProblem log, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which runs on the bd-r1hdp69 
> machine. When it tried to call *closeTaskManagerConnection* in 
> *onContainerComplete*, it no longer had a connection to the TaskManager on 
> container 24, so it simply ignored the close of the TaskManager.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 
> *requestYarnContainer*, which increased the *numPendingContainerRequests* 
> variable in *YarnResourceManager* by 1.
> As the return of excess containers is governed by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, Flink cannot 
> return this container even though it is not required. Meanwhile, the restart 
> logic has already allocated enough containers for the Task Managers, so Flink 
> holds the extra container for a long time for nothing.
> In the full log, the job ended with 7 containers while only 3 were running 
> TaskManagers.
> PS: Another strange thing I found: sometimes a request for a single YARN 
> container returns many more containers than requested. Is that a normal 
> scenario for AMRMAsyncClient?
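To make the accounting problem concrete, here is a minimal, self-contained 
Scala simulation of the flow described above; all names are simplified 
stand-ins, not the actual YarnResourceManager code:

{code}
import scala.collection.mutable

// Simplified stand-in for the accounting described above; not Flink code.
object PendingRequestLeakDemo {
  var numPendingContainerRequests = 0
  val openTaskManagerConnections: mutable.Set[String] = mutable.Set()

  def onContainerCompleted(containerId: String): Unit = {
    // A replacement container is requested before the connection check ...
    numPendingContainerRequests += 1
    // ... so if the connection is already gone (as for container 24),
    // the close is ignored and the pending count stays inflated.
    if (!openTaskManagerConnections.remove(containerId)) {
      println(s"No open TaskExecutor connection $containerId. " +
        "Ignoring close TaskExecutor connection.")
    }
  }

  def main(args: Array[String]): Unit = {
    onContainerCompleted("container_24") // connection already lost
    println(s"Pending container requests: $numPendingContainerRequests") // 1
  }
}
{code}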



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9670) Introduce slot manager factory

2018-07-01 Thread Renjie Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renjie Liu closed FLINK-9670.
-
Resolution: Invalid

> Introduce slot manager factory
> --
>
> Key: FLINK-9670
> URL: https://issues.apache.org/jira/browse/FLINK-9670
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Scheduler
>Affects Versions: 1.5.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
> Fix For: 1.5.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-07-01 Thread cricket007
Github user cricket007 commented on the issue:

https://github.com/apache/flink/pull/5995
  
What about implementing a `KeyedDeserializationSchema` for Avro?
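A minimal sketch of that idea, assuming the `KeyedDeserializationSchema` 
interface from the Kafka connector and the `AvroDeserializationSchema` added 
in this PR; the wrapper class itself is illustrative, not part of the PR:

{code}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.AvroDeserializationSchema
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

// Illustrative wrapper: ignores the Kafka key and delegates the value bytes
// to the AvroDeserializationSchema introduced in this PR.
class KeyedAvroDeserializationSchema[T](valueSchema: AvroDeserializationSchema[T])
  extends KeyedDeserializationSchema[T] {

  override def deserialize(
      messageKey: Array[Byte],
      message: Array[Byte],
      topic: String,
      partition: Int,
      offset: Long): T =
    valueSchema.deserialize(message)

  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = valueSchema.getProducedType
}
{code}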


---


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529329#comment-16529329
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user cricket007 commented on the issue:

https://github.com/apache/flink/pull/5995
  
What about implementing a `KeyedDeserializationSchema` for Avro?


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9698) "case class must be static and globally accessible" is too constrained

2018-07-01 Thread Jeff Zhang (JIRA)
Jeff Zhang created FLINK-9698:
-

 Summary: "case class must be static and globally accessible" is 
too constrained
 Key: FLINK-9698
 URL: https://issues.apache.org/jira/browse/FLINK-9698
 Project: Flink
  Issue Type: Improvement
Reporter: Jeff Zhang


The following code can reproduce this issue. 
{code}
object BatchJob {
  def main(args: Array[String]) {
// set up the batch execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
val tenv = TableEnvironment.getTableEnvironment(env)
case class Person(id:Int, name:String)
val ds = env.fromElements(Person(1,"jeff"), Person(2, "andy"))
tenv.registerDataSet("table_1", ds);
  }
}
{code}

Although I have a workaround of declaring the case class outside of the main 
method, this workaround won't work in scala-shell. 
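For reference, a sketch of the workaround mentioned above, with the case class 
moved to the top level (imports assumed from the Flink Scala and Table APIs):

{code}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment

// Declared at the top level, so it is static and globally accessible.
case class Person(id: Int, name: String)

object BatchJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tenv = TableEnvironment.getTableEnvironment(env)
    val ds = env.fromElements(Person(1, "jeff"), Person(2, "andy"))
    tenv.registerDataSet("table_1", ds)
  }
}
{code}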




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9698) "case class must be static and globally accessible" is too constrained

2018-07-01 Thread Jeff Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529406#comment-16529406
 ] 

Jeff Zhang commented on FLINK-9698:
---

Could anyone let me know why Flink requires a case class to be static and 
globally accessible? Similar code works in Spark, so I believe it should be the 
same for Flink, as both of them need to serialize these classes, ship them to 
remote hosts, and execute them there. 

> "case class must be static and globally accessible" is too constrained
> --
>
> Key: FLINK-9698
> URL: https://issues.apache.org/jira/browse/FLINK-9698
> Project: Flink
>  Issue Type: Improvement
>Reporter: Jeff Zhang
>Priority: Major
>
> The following code can reproduce this issue. 
> {code}
> object BatchJob {
>   def main(args: Array[String]) {
> // set up the batch execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tenv = TableEnvironment.getTableEnvironment(env)
> case class Person(id:Int, name:String)
> val ds = env.fromElements(Person(1,"jeff"), Person(2, "andy"))
> tenv.registerDataSet("table_1", ds);
>   }
> }
> {code}
> Although I have a workaround of declaring the case class outside of the main 
> method, this workaround won't work in scala-shell. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9609) Add bucket ready mechanism for BucketingSink when checkpoint complete

2018-07-01 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-9609:

Description: 
Currently, BucketingSink only supports {{notifyCheckpointComplete}}. However, 
users may want to do some extra work when a bucket is ready. It would be nice 
if we could support a {{BucketReady}} mechanism, i.e. tell users when a bucket 
is ready for use. For example, if one bucket is created every 5 minutes, then 
at the end of those 5 minutes, before the next bucket is created, the user 
might need to react to the previous bucket becoming ready, for instance by 
sending the timestamp of the bucket-ready time to a server.

Here, "bucket ready" means that no part file under the bucket carries the 
{{.pending}} or {{.in-progress}} suffix; only then can we consider the bucket 
ready for use. This is analogous to a watermark, which means that no elements 
with a timestamp older than or equal to the watermark timestamp should arrive 
at the window. Borrowing that concept, we could call this a *BucketWatermark*.

Recently, I found a user who wants this functionality:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Let-BucketingSink-roll-file-on-each-checkpoint-td19034.html

Below is what he said:

My use case is that we read data from a message queue and write it to HDFS, 
and our ETL team then uses the data in HDFS. *In this case, ETL needs to know 
precisely when all data is ready to be read*, so we use a counter to count how 
many records have been written; when the counter equals the number of records 
we received, we consider the HDFS file ready. We send the counter message in a 
custom sink so ETL knows how many records have been written, but with the 
current BucketingSink, even though the HDFS file is flushed, ETL may still be 
unable to read the data. If we could close the file during the checkpoint, the 
result would be accurate. As for the HDFS small-file problem, it can be 
controlled by using a bigger checkpoint interval. 

  was:
Currently, BucketingSink only support {{notifyCheckpointComplete}}. However, 
users want to do some extra work when a bucket is ready. It would be nice if we 
can support {{BucketReady}} mechanism for users or we can tell users when a 
bucket is ready for use. For example, One bucket is created for every 5 
minutes, at the end of 5 minutes before creating the next bucket, the user 
might need to do something as the previous bucket ready, like sending the 
timestamp of the bucket ready time to a server or do some other stuff.

Here, Bucket ready means all the part files suffix name under a bucket neither 
{{.pending}} nor {{.in-progress}}. Then we can think this bucket is ready for 
user use. Like a watermark means no elements with a timestamp older or equal to 
the watermark timestamp should arrive at the window. We can also refer to the 
concept of watermark here, or we can call this *BucketWatermark* if we could.

Recently, I found a user who wants this functionality which I think.
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Let-BucketingSink-roll-file-on-each-checkpoint-td19034.html

Below is what he said:

My user case is we read data from message queue, write to HDFS, and our ETL 
team will use the data in HDFS. *In the case, ETL need to know if all data is 
ready to be read accurately*, so we use a counter to count how many data has 
been wrote, if the counter is equal to the number we received, we think HDFS 
file is ready. We send the counter message in a custom sink so ETL can know how 
many data has been wrote, but if use current BucketingSink, even through HDFS 
file is flushed, ETL may still cannot read the data. If we can close file 
during checkpoint, then the result is accurately. And for the HDFS small file 
problem, it can be controller by use bigger checkpoint interval. 
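A minimal Scala sketch of the suffix check behind the proposed bucket-ready 
notion, using the Hadoop FileSystem API; the helper object and its method name 
are illustrative, not part of BucketingSink:

{code}
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative helper: a bucket counts as ready once no part file under it
// still carries the .pending or .in-progress suffix.
object BucketReadyCheck {
  def isBucketReady(fs: FileSystem, bucketPath: Path): Boolean =
    fs.listStatus(bucketPath).forall { status =>
      val name = status.getPath.getName
      !name.endsWith(".pending") && !name.endsWith(".in-progress")
    }
}
{code}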


> Add bucket ready mechanism for BucketingSink when checkpoint complete
> -
>
> Key: FLINK-9609
> URL: https://issues.apache.org/jira/browse/FLINK-9609
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector, Streaming Connectors
>Affects Versions: 1.5.0, 1.4.2
>Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>
> Currently, BucketingSink only supports {{notifyCheckpointComplete}}. However, 
> users may want to do some extra work when a bucket is ready. It would be nice 
> if we could support a {{BucketReady}} mechanism, i.e. tell users when a 
> bucket is ready for use. For example, if one bucket is created every 5 
> minutes, then at the end of those 5 minutes, before the next bucket is 
> created, the user might need to react to the previous bucket becoming ready, 
> for instance by sending the timestamp of the bucket-ready time to a server.
> Here, Bucket r

[GitHub] flink pull request #6235: [FLINK-9377] [core] Remove serializers from checkp...

2018-07-01 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6235

[FLINK-9377] [core] Remove serializers from checkpointed state meta infos

## What is the purpose of the change

This PR is the first step towards a smoother state evolution experience.
It removes the behavior of writing serializers in checkpointed state meta 
infos (using Java serialization) and relying on them to be deserializable at 
restore time.
Instead, the configuration snapshots of serializers now double as a factory 
for creating the restore serializer, solidifying it as the single source of 
truth of information about the previous serializer of state.

With this change:
- Checkpoints / savepoints move towards being Java serialization-free
- The availability of the restore serializer is essentially determined at 
compile time
- Potentially resolves caveats with macro-generated Scala serializers, whose 
typically anonymous classnames are easily susceptible to change, which blocks 
successful savepoint restores due to how Java serialization works.
- In conclusion: the written configuration snapshot is now the single point 
of entry for obtaining a serializer for previous state. The user is only 
required to guarantee that the configuration snapshot's classname remains 
constant for the restore to proceed.

This PR is a WIP that only includes extending the 
`TypeSerializerConfigSnapshot` interface with a `restoreSerializer` 
method, as well as the method's interplay with the state backends after 
removing serializers from checkpointed state meta infos.

This PR does **NOT** include:
- Proper implementation of the new `restoreSerializer` method on all 
serializers.
- Tests for snapshotting, restoring, and migrating serializers and their 
interplay in the state backends.

Because of this, it is expected that existing tests will fail.
Follow-up PRs will be opened for the above mentioned missing parts.

## Brief change log

- 5fc4a36 Add a `restoreSerializer` method to the 
`TypeSerializerConfigSnapshot` interface

The method still has a dummy base implementation, because this PR doesn't 
yet properly implement the method for all serializers. Once that is 
accomplished, the base implementation should be removed.

- 661eb6d Remove the "fallback" serializer option from `CompatibilityResult`

That option was available in the past to allow users to have a safety path 
for state conversion, in case their previous serializer cannot be deserialized 
for any reason related to Java serialization. Since we now use the config 
snapshot as the restore serializer factory, it is guaranteed that the restore 
serializer is always available in case conversion is required, and therefore 
voids the need for the "fallback" serializer option.

- c91d045 Deprecates any utility methods that still have the behaviour of 
writing serializers in checkpoints

- e09f914 Introduces the `BackwardsCompatibleConfigSnapshot` class

The BackwardsCompatibleConfigSnapshot is a wrapper, dummy config
snapshot which wraps an actual config snapshot, as well as a
pre-existing serializer instance.

In previous versions, since the config snapshot wasn't a serializer
factory but simply a container for serializer parameters, previous
serializers didn't necessarily have config snapshots capable of
creating a correct corresponding restore serializer.

In this case, since previous serializers still have serializers written
in the checkpoint, the backwards compatible solution would be to wrap
the written serializer and the config snapshot within the
BackwardsCompatibleConfigSnapshot dummy. When attempting to restore the
serializer, the wrapped serializer instance is returned instead of
actually calling the restoreSerializer method of the wrapped config
snapshot.

- da84665 the actual removal of serializers from checkpointed state meta 
info

## Verifying this change

This PR is a WIP preview, and tests are expected to fail for the reasons 
mentioned in the description.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (**yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
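To make the core idea concrete, here is a minimal Scala sketch of a config 
snapshot acting as the restore-serializer factory; the class name is 
simplified and this is not the actual Flink interface:

{code}
import org.apache.flink.api.common.typeutils.TypeSerializer

// Simplified sketch: the written config snapshot itself is the factory for
// the restore serializer, so no serializer has to be Java-deserialized from
// the checkpoint at restore time.
abstract class SerializerConfigSnapshotSketch[T] {

  /** Recreates a serializer capable of reading previously written state. */
  def restoreSerializer(): TypeSerializer[T]
}
{code}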
  

[jira] [Updated] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-07-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9377:
--
Labels: pull-request-available  (was: )

> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) 
> to perform even a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529428#comment-16529428
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6235

[FLINK-9377] [core] Remove serializers from checkpointed state meta infos

## What is the purpose of the change

This PR is the first step towards a smoother state evolution experience.
It removes the behavior of writing serializers in checkpointed state meta 
infos (using Java serialization) and relying on them to be deserializable at 
restore time.
Instead, the configuration snapshots of serializers now double as a factory 
for creating the restore serializer, solidifying it as the single source of 
truth of information about the previous serializer of state.

With this change:
- Checkpoints / savepoints move towards being Java serialization-free
- The availability of the restore serializer is essentially determined at 
compile time
- Potentially resolves caveats with macro-generated Scala serializers, whose 
typically anonymous classnames are easily susceptible to change, which blocks 
successful savepoint restores due to how Java serialization works.
- In conclusion: the written configuration snapshot is now the single point 
of entry for obtaining a serializer for previous state. The user is only 
required to guarantee that the configuration snapshot's classname remains 
constant for the restore to proceed.

This PR is a WIP that only includes extending the 
`TypeSerializerConfigSnapshot` interface with a `restoreSerializer` 
method, as well as the method's interplay with the state backends after 
removing serializers from checkpointed state meta infos.

This PR does **NOT** include:
- Proper implementation of the new `restoreSerializer` method on all 
serializers.
- Tests for snapshotting, restoring, and migrating serializers and their 
interplay in the state backends.

Because of this, it is expected that existing tests will fail.
Follow-up PRs will be opened for the above mentioned missing parts.

## Brief change log

- 5fc4a36 Add a `restoreSerializer` method to the 
`TypeSerializerConfigSnapshot` interface

The method still has a dummy base implementation, because this PR doesn't 
yet properly implement the method for all serializers. Once that is 
accomplished, the base implementation should be removed.

- 661eb6d Remove the "fallback" serializer option from `CompatibilityResult`

That option was available in the past to allow users to have a safety path 
for state conversion, in case their previous serializer cannot be deserialized 
for any reason related to Java serialization. Since we now use the config 
snapshot as the restore serializer factory, it is guaranteed that the restore 
serializer is always available in case conversion is required, and therefore 
voids the need for the "fallback" serializer option.

- c91d045 Deprecates any utility methods that still have the behaviour of 
writing serializers in checkpoints

- e09f914 Introduces the `BackwardsCompatibleConfigSnapshot` class

The BackwardsCompatibleConfigSnapshot is a wrapper, dummy config
snapshot which wraps an actual config snapshot, as well as a
pre-existing serializer instance.

In previous versions, since the config snapshot wasn't a serializer
factory but simply a container for serializer parameters, previous
serializers didn't necessarily have config snapshots capable of
creating a correct corresponding restore serializer.

In this case, since previous serializers still have serializers written
in the checkpoint, the backwards compatible solution would be to wrap
the written serializer and the config snapshot within the
BackwardsCompatibleConfigSnapshot dummy. When attempting to restore the
serializer, the wrapped serializer instance is returned instead of
actually calling the restoreSerializer method of the wrapped config
snapshot.

- da84665 the actual removal of serializers from checkpointed state meta 
info

## Verifying this change

This PR is a WIP preview, and tests are expected to fail for the reasons 
mentioned in the description.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (**yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and i