[jira] [Assigned] (FLINK-7941) Port SubtasksTimesHandler to new REST endpoint

2017-10-29 Thread Fang Yong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fang Yong reassigned FLINK-7941:


Assignee: Fang Yong

> Port SubtasksTimesHandler to new REST endpoint
> --
>
> Key: FLINK-7941
> URL: https://issues.apache.org/jira/browse/FLINK-7941
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port *SubtasksTimesHandler* to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7941) Port SubtasksTimesHandler to new REST endpoint

2017-10-29 Thread Fang Yong (JIRA)
Fang Yong created FLINK-7941:


 Summary: Port SubtasksTimesHandler to new REST endpoint
 Key: FLINK-7941
 URL: https://issues.apache.org/jira/browse/FLINK-7941
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination, REST, Webfrontend
Reporter: Fang Yong


Port *SubtasksTimesHandler* to new REST endpoint





[jira] [Assigned] (FLINK-6109) Add "consumer lag" report metric to FlinkKafkaConsumer

2017-10-29 Thread aitozi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aitozi reassigned FLINK-6109:
-

Assignee: aitozi

> Add "consumer lag" report metric to FlinkKafkaConsumer
> --
>
> Key: FLINK-6109
> URL: https://issues.apache.org/jira/browse/FLINK-6109
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: aitozi
>
> This is a feature discussed in this ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html.
> As discussed, we can expose two kinds of "consumer lag" metrics for this:
>  - *current consumer lag per partition:* the current difference between the 
> latest offset and the last collected record. This metric is calculated and 
> updated at a configurable interval. This metric basically serves as an 
> indicator of how the consumer is keeping up with the head of partitions. I 
> propose to name this {{currentOffsetLag}}.
>  - *Consumer lag of last checkpoint per partition:* the difference between 
> the latest offset and the offset stored in the checkpoint. This metric is 
> only updated when checkpoints are completed. It serves as an indicator of how 
> much data may need to be replayed in case of a failure. I propose to name 
> this {{lastCheckpointedOffsetLag}}.
> I don't think it is reasonable to define a metric of whether or not a 
> consumer has "caught up" with the HEAD. That would imply a threshold for the 
> offset difference. We should probably leave this "caught up" logic for the 
> user to determine themselves when they query this metric.
> The granularity of the metric is per-FlinkKafkaConsumer, and independent of 
> the consumer group.id used (the offset used to calculate consumer lag is the 
> internal offset state of the FlinkKafkaConsumer, not the consumer group's 
> committed offsets in Kafka).
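Both proposed metrics reduce to simple offset differences; a minimal illustrative sketch of the arithmetic (the class, method names, and offsets below are hypothetical, mirroring the proposal's names rather than Flink's implementation):

```java
// Illustrative sketch of the two proposed per-partition lag metrics
// (hypothetical class; not Flink's implementation).
public class ConsumerLagSketch {

    // currentOffsetLag: latest offset in the partition minus the offset of the
    // last record the consumer collected; recomputed at a configurable interval.
    static long currentOffsetLag(long latestOffset, long lastCollectedOffset) {
        return latestOffset - lastCollectedOffset;
    }

    // lastCheckpointedOffsetLag: latest offset minus the offset stored in the
    // last completed checkpoint; only updated when a checkpoint completes.
    static long lastCheckpointedOffsetLag(long latestOffset, long checkpointedOffset) {
        return latestOffset - checkpointedOffset;
    }

    public static void main(String[] args) {
        // Hypothetical offsets for one partition: the checkpointed offset trails
        // the last collected offset, so the checkpoint lag is the larger value.
        long latest = 1_000L;
        long collected = 990L;
        long checkpointed = 950L;
        System.out.println("currentOffsetLag=" + currentOffsetLag(latest, collected));
        System.out.println("lastCheckpointedOffsetLag=" + lastCheckpointedOffsetLag(latest, checkpointed));
    }
}
```

Whether a given lag value means the consumer has "caught up" is left to whoever queries the metric, as the description argues.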






[jira] [Assigned] (FLINK-6109) Add "consumer lag" report metric to FlinkKafkaConsumer

2017-10-29 Thread aitozi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aitozi reassigned FLINK-6109:
-

Assignee: (was: aitozi)

> Add "consumer lag" report metric to FlinkKafkaConsumer
> --
>
> Key: FLINK-6109
> URL: https://issues.apache.org/jira/browse/FLINK-6109
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> This is a feature discussed in this ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html.
> As discussed, we can expose two kinds of "consumer lag" metrics for this:
>  - *current consumer lag per partition:* the current difference between the 
> latest offset and the last collected record. This metric is calculated and 
> updated at a configurable interval. This metric basically serves as an 
> indicator of how the consumer is keeping up with the head of partitions. I 
> propose to name this {{currentOffsetLag}}.
>  - *Consumer lag of last checkpoint per partition:* the difference between 
> the latest offset and the offset stored in the checkpoint. This metric is 
> only updated when checkpoints are completed. It serves as an indicator of how 
> much data may need to be replayed in case of a failure. I propose to name 
> this {{lastCheckpointedOffsetLag}}.
> I don't think it is reasonable to define a metric of whether or not a 
> consumer has "caught up" with the HEAD. That would imply a threshold for the 
> offset difference. We should probably leave this "caught up" logic for the 
> user to determine themselves when they query this metric.
> The granularity of the metric is per-FlinkKafkaConsumer, and independent of 
> the consumer group.id used (the offset used to calculate consumer lag is the 
> internal offset state of the FlinkKafkaConsumer, not the consumer group's 
> committed offsets in Kafka).





[jira] [Commented] (FLINK-6109) Add "consumer lag" report metric to FlinkKafkaConsumer

2017-10-29 Thread aitozi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224395#comment-16224395
 ] 

aitozi commented on FLINK-6109:
---

Is anyone working on this issue? As of version 1.3.2 we can only see the 
`records-lag-max` metric reported by the Kafka consumer, with no per-partition 
lag detail. If no one is working on this, can I take it?

> Add "consumer lag" report metric to FlinkKafkaConsumer
> --
>
> Key: FLINK-6109
> URL: https://issues.apache.org/jira/browse/FLINK-6109
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> This is a feature discussed in this ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html.
> As discussed, we can expose two kinds of "consumer lag" metrics for this:
>  - *current consumer lag per partition:* the current difference between the 
> latest offset and the last collected record. This metric is calculated and 
> updated at a configurable interval. This metric basically serves as an 
> indicator of how the consumer is keeping up with the head of partitions. I 
> propose to name this {{currentOffsetLag}}.
>  - *Consumer lag of last checkpoint per partition:* the difference between 
> the latest offset and the offset stored in the checkpoint. This metric is 
> only updated when checkpoints are completed. It serves as an indicator of how 
> much data may need to be replayed in case of a failure. I propose to name 
> this {{lastCheckpointedOffsetLag}}.
> I don't think it is reasonable to define a metric of whether or not a 
> consumer has "caught up" with the HEAD. That would imply a threshold for the 
> offset difference. We should probably leave this "caught up" logic for the 
> user to determine themselves when they query this metric.
> The granularity of the metric is per-FlinkKafkaConsumer, and independent of 
> the consumer group.id used (the offset used to calculate consumer lag is the 
> internal offset state of the FlinkKafkaConsumer, not the consumer group's 
> committed offsets in Kafka).





[jira] [Commented] (FLINK-7940) Add timeout for futures

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224206#comment-16224206
 ] 

ASF GitHub Bot commented on FLINK-7940:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4918#discussion_r147594496
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
         return result;
     }
 
-    // ------------------------------------------------------------------------
-    //  Future Completed with an exception.
-    // ------------------------------------------------------------------------
+    /**
+     * Times the given future out after the timeout.
+     *
+     * @param future to time out
+     * @param timeout after which the given future is timed out
+     * @param timeUnit time unit of the timeout
+     * @param <T> type of the given future
+     * @return The timeout enriched future
+     */
+    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+        final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+        future.whenComplete((T value, Throwable throwable) -> {
+            if (!timeoutFuture.isDone()) {
+                timeoutFuture.cancel(false);
+            }
+        });
+
+        return future;
+    }
 
     /**
-     * Returns a {@link CompletableFuture} that has failed with the exception
-     * provided as argument.
-     * @param throwable the exception to fail the future with.
-     * @return The failed future.
+     * Runnable to complete the given future with a {@link TimeoutException}.
      */
-    public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-        CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-        failedAttempt.completeExceptionally(throwable);
-        return failedAttempt;
+    static final class Timeout implements Runnable {
+
+        private final CompletableFuture<?> future;
+
+        Timeout(CompletableFuture<?> future) {
+            this.future = Preconditions.checkNotNull(future);
+        }
+
+        @Override
+        public void run() {
+            future.completeExceptionally(new TimeoutException());
--- End diff --

Could be useful to have the used timeout in the exception message.


> Add timeout for futures
> ---
>
> Key: FLINK-7940
> URL: https://issues.apache.org/jira/browse/FLINK-7940
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> In order to conveniently timeout futures, we should add tooling to 
> {{FutureUtils}}.
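A minimal standalone sketch of the pattern under review (Flink's actual version lives in FutureUtils and shares a single Delayer scheduler; the class name, scheduler, and exception message below are illustrative only):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Standalone sketch of the orTimeout pattern (illustrative; not Flink's code).
public class OrTimeoutSketch {

    // Single-threaded scheduler playing the role of Flink's Delayer singleton.
    private static final ScheduledExecutorService DELAYER =
        Executors.newSingleThreadScheduledExecutor();

    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        // Schedule a task that fails the future once the timeout elapses,
        // carrying the timeout in the message (as suggested in the review).
        ScheduledFuture<?> timeoutFuture = DELAYER.schedule(
            () -> future.completeExceptionally(
                new TimeoutException("Future timed out after " + timeout + " " + unit)),
            timeout, unit);
        // If the future completes first, cancel the pending timeout task.
        future.whenComplete((value, throwable) -> timeoutFuture.cancel(false));
        return future;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> never = new CompletableFuture<>();
        orTimeout(never, 50, TimeUnit.MILLISECONDS);
        try {
            never.get();
        } catch (ExecutionException e) {
            // The cause is the TimeoutException set by the scheduled task.
            System.out.println("cause: " + e.getCause().getClass().getSimpleName());
        }
        DELAYER.shutdown();
    }
}
```

A future that completes before the deadline is returned unchanged; only an unfinished future is failed with the TimeoutException.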





[jira] [Commented] (FLINK-7940) Add timeout for futures

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224208#comment-16224208
 ] 

ASF GitHub Bot commented on FLINK-7940:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4918#discussion_r147594462
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
         return result;
     }
 
-    // ------------------------------------------------------------------------
-    //  Future Completed with an exception.
-    // ------------------------------------------------------------------------
+    /**
+     * Times the given future out after the timeout.
+     *
+     * @param future to time out
+     * @param timeout after which the given future is timed out
+     * @param timeUnit time unit of the timeout
+     * @param <T> type of the given future
+     * @return The timeout enriched future
+     */
+    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+        final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+        future.whenComplete((T value, Throwable throwable) -> {
+            if (!timeoutFuture.isDone()) {
+                timeoutFuture.cancel(false);
+            }
+        });
+
+        return future;
+    }
 
     /**
-     * Returns a {@link CompletableFuture} that has failed with the exception
-     * provided as argument.
-     * @param throwable the exception to fail the future with.
-     * @return The failed future.
+     * Runnable to complete the given future with a {@link TimeoutException}.
      */
-    public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-        CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-        failedAttempt.completeExceptionally(throwable);
-        return failedAttempt;
+    static final class Timeout implements Runnable {
--- End diff --

make private?


> Add timeout for futures
> ---
>
> Key: FLINK-7940
> URL: https://issues.apache.org/jira/browse/FLINK-7940
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> In order to conveniently timeout futures, we should add tooling to 
> {{FutureUtils}}.





[jira] [Commented] (FLINK-7940) Add timeout for futures

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224207#comment-16224207
 ] 

ASF GitHub Bot commented on FLINK-7940:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4918#discussion_r147594412
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
         return result;
     }
 
-    // ------------------------------------------------------------------------
-    //  Future Completed with an exception.
-    // ------------------------------------------------------------------------
+    /**
+     * Times the given future out after the timeout.
+     *
+     * @param future to time out
+     * @param timeout after which the given future is timed out
+     * @param timeUnit time unit of the timeout
+     * @param <T> type of the given future
+     * @return The timeout enriched future
+     */
+    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+        final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+        future.whenComplete((T value, Throwable throwable) -> {
+            if (!timeoutFuture.isDone()) {
+                timeoutFuture.cancel(false);
+            }
+        });
+
+        return future;
+    }
 
     /**
-     * Returns a {@link CompletableFuture} that has failed with the exception
-     * provided as argument.
-     * @param throwable the exception to fail the future with.
-     * @return The failed future.
+     * Runnable to complete the given future with a {@link TimeoutException}.
      */
-    public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-        CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-        failedAttempt.completeExceptionally(throwable);
-        return failedAttempt;
+    static final class Timeout implements Runnable {
+
+        private final CompletableFuture<?> future;
+
+        Timeout(CompletableFuture<?> future) {
+            this.future = Preconditions.checkNotNull(future);
+        }
+
+        @Override
+        public void run() {
+            future.completeExceptionally(new TimeoutException());
+        }
+    }
+
+    /**
+     * Delay scheduler used to timeout futures.
+     *
+     * <p>This class creates a singleton scheduler used to run the provided actions.
+     */
+    private static final class Delayer {
+        static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
+            1,
+            new ExecutorThreadFactory("CompletableFutureDelayScheduler"));
--- End diff --

Let's add a "Flink" prefix to the thread name.


> Add timeout for futures
> ---
>
> Key: FLINK-7940
> URL: https://issues.apache.org/jira/browse/FLINK-7940
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> In order to conveniently timeout futures, we should add tooling to 
> {{FutureUtils}}.





[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4918#discussion_r147594462
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
         return result;
     }
 
-    // ------------------------------------------------------------------------
-    //  Future Completed with an exception.
-    // ------------------------------------------------------------------------
+    /**
+     * Times the given future out after the timeout.
+     *
+     * @param future to time out
+     * @param timeout after which the given future is timed out
+     * @param timeUnit time unit of the timeout
+     * @param <T> type of the given future
+     * @return The timeout enriched future
+     */
+    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+        final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+        future.whenComplete((T value, Throwable throwable) -> {
+            if (!timeoutFuture.isDone()) {
+                timeoutFuture.cancel(false);
+            }
+        });
+
+        return future;
+    }
 
     /**
-     * Returns a {@link CompletableFuture} that has failed with the exception
-     * provided as argument.
-     * @param throwable the exception to fail the future with.
-     * @return The failed future.
+     * Runnable to complete the given future with a {@link TimeoutException}.
      */
-    public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-        CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-        failedAttempt.completeExceptionally(throwable);
-        return failedAttempt;
+    static final class Timeout implements Runnable {
--- End diff --

make private?


---


[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4918#discussion_r147594496
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
         return result;
     }
 
-    // ------------------------------------------------------------------------
-    //  Future Completed with an exception.
-    // ------------------------------------------------------------------------
+    /**
+     * Times the given future out after the timeout.
+     *
+     * @param future to time out
+     * @param timeout after which the given future is timed out
+     * @param timeUnit time unit of the timeout
+     * @param <T> type of the given future
+     * @return The timeout enriched future
+     */
+    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+        final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+        future.whenComplete((T value, Throwable throwable) -> {
+            if (!timeoutFuture.isDone()) {
+                timeoutFuture.cancel(false);
+            }
+        });
+
+        return future;
+    }
 
     /**
-     * Returns a {@link CompletableFuture} that has failed with the exception
-     * provided as argument.
-     * @param throwable the exception to fail the future with.
-     * @return The failed future.
+     * Runnable to complete the given future with a {@link TimeoutException}.
      */
-    public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-        CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-        failedAttempt.completeExceptionally(throwable);
-        return failedAttempt;
+    static final class Timeout implements Runnable {
+
+        private final CompletableFuture<?> future;
+
+        Timeout(CompletableFuture<?> future) {
+            this.future = Preconditions.checkNotNull(future);
+        }
+
+        @Override
+        public void run() {
+            future.completeExceptionally(new TimeoutException());
--- End diff --

Could be useful to have the used timeout in the exception message.


---


[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4918#discussion_r147594412
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
         return result;
     }
 
-    // ------------------------------------------------------------------------
-    //  Future Completed with an exception.
-    // ------------------------------------------------------------------------
+    /**
+     * Times the given future out after the timeout.
+     *
+     * @param future to time out
+     * @param timeout after which the given future is timed out
+     * @param timeUnit time unit of the timeout
+     * @param <T> type of the given future
+     * @return The timeout enriched future
+     */
+    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+        final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+        future.whenComplete((T value, Throwable throwable) -> {
+            if (!timeoutFuture.isDone()) {
+                timeoutFuture.cancel(false);
+            }
+        });
+
+        return future;
+    }
 
     /**
-     * Returns a {@link CompletableFuture} that has failed with the exception
-     * provided as argument.
-     * @param throwable the exception to fail the future with.
-     * @return The failed future.
+     * Runnable to complete the given future with a {@link TimeoutException}.
      */
-    public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-        CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-        failedAttempt.completeExceptionally(throwable);
-        return failedAttempt;
+    static final class Timeout implements Runnable {
+
+        private final CompletableFuture<?> future;
+
+        Timeout(CompletableFuture<?> future) {
+            this.future = Preconditions.checkNotNull(future);
+        }
+
+        @Override
+        public void run() {
+            future.completeExceptionally(new TimeoutException());
+        }
+    }
+
+    /**
+     * Delay scheduler used to timeout futures.
+     *
+     * <p>This class creates a singleton scheduler used to run the provided actions.
+     */
+    private static final class Delayer {
+        static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
+            1,
+            new ExecutorThreadFactory("CompletableFutureDelayScheduler"));
--- End diff --

Let's add a "Flink" prefix to the thread name.


---


[jira] [Comment Edited] (FLINK-7049) TestingApplicationMaster keeps running after integration tests finish

2017-10-29 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069086#comment-16069086
 ] 

Ted Yu edited comment on FLINK-7049 at 10/29/17 7:14 PM:
-

Stack trace for TestingApplicationMaster.


was (Author: yuzhih...@gmail.com):
Stack trace for TestingApplicationMaster .

> TestingApplicationMaster keeps running after integration tests finish
> -
>
> Key: FLINK-7049
> URL: https://issues.apache.org/jira/browse/FLINK-7049
> Project: Flink
>  Issue Type: Test
>  Components: Tests, YARN
>Reporter: Ted Yu
>Priority: Minor
> Attachments: testingApplicationMaster.stack
>
>
> After integration tests finish, TestingApplicationMaster is still running.
> Toward the end of 
> flink-yarn-tests/target/flink-yarn-tests-ha/flink-yarn-tests-ha-logDir-nm-1_0/application_1498768839874_0001/container_1498768839874_0001_03_01/jobmanager.log
>  :
> {code}
> 2017-06-29 22:09:49,681 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server 127.0.0.1/127.0.0.1:46165
> 2017-06-29 22:09:49,681 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed
> 2017-06-29 22:09:49,682 WARN  org.apache.zookeeper.ClientCnxn 
>   - Session 0x0 for server null, unexpected error, closing socket 
> connection and attempting reconnect
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>   at 
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>   at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> 2017-06-29 22:09:50,782 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3597644653611245612.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-06-29 22:09:50,782 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server 127.0.0.1/127.0.0.1:46165
> 2017-06-29 22:09:50,782 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed
> 2017-06-29 22:09:50,783 WARN  org.apache.zookeeper.ClientCnxn 
>   - Session 0x0 for server null, unexpected error, closing socket 
> connection and attempting reconnect
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>   at 
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>   at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> {code}





[jira] [Updated] (FLINK-7642) Upgrade maven surefire plugin to 2.19.1

2017-10-29 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7642:
--
Description: 
The Surefire 2.19 release introduced more useful test filters which would let us 
run a subset of the tests.


This issue is for upgrading maven surefire plugin to 2.19.1

  was:
Surefire 2.19 release introduced more useful test filters which would let us 
run a subset of the test.

This issue is for upgrading maven surefire plugin to 2.19.1


> Upgrade maven surefire plugin to 2.19.1
> ---
>
> Key: FLINK-7642
> URL: https://issues.apache.org/jira/browse/FLINK-7642
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>
> The Surefire 2.19 release introduced more useful test filters which would let us 
> run a subset of the tests.
> This issue is for upgrading maven surefire plugin to 2.19.1





[jira] [Updated] (FLINK-7795) Utilize error-prone to discover common coding mistakes

2017-10-29 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7795:
--
Description: 
http://errorprone.info/ is a tool which detects common coding mistakes.

We should incorporate it into the Flink build process.

  was:
http://errorprone.info/ is a tool which detects common coding mistakes.

We should incorporate into Flink build.


> Utilize error-prone to discover common coding mistakes
> --
>
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> http://errorprone.info/ is a tool which detects common coding mistakes.
> We should incorporate it into the Flink build process.





[jira] [Commented] (FLINK-7940) Add timeout for futures

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224046#comment-16224046
 ] 

ASF GitHub Bot commented on FLINK-7940:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4918

[FLINK-7940] Add FutureUtils.orTimeout

## What is the purpose of the change

This commit adds a convenience function `FutureUtils#orTimeout` which makes it 
easy to add a timeout to a CompletableFuture.

## Verifying this change

- Added `FutureUtilsTest#testOrTimeout`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink orTimeout

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4918.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4918


commit 83e52f5458379855c481fe169f6be50a8afee336
Author: Till Rohrmann 
Date:   2017-10-29T15:01:18Z

[hotfix] Remove redundant FutureUtils#getFailedFuture

FutureUtils#completedExceptionally does exactly the same.

commit 94b3d14bc2ba64a4862bd83e670f3dbfccdf96b8
Author: Till Rohrmann 
Date:   2017-10-29T15:38:53Z

[FLINK-7940] Add FutureUtils.orTimeout

This commit adds a convenience function which allows to easily add a 
timeout to
a CompletableFuture.




> Add timeout for futures
> ---
>
> Key: FLINK-7940
> URL: https://issues.apache.org/jira/browse/FLINK-7940
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> In order to conveniently timeout futures, we should add tooling to 
> {{FutureUtils}}.





[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

2017-10-29 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4918

[FLINK-7940] Add FutureUtils.orTimeout

## What is the purpose of the change

This commit adds a convenience function `FutureUtils#orTimeout` which
makes it easy to add a timeout to a `CompletableFuture`.
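Flink at this point targets Java 8, where `CompletableFuture` has no built-in `orTimeout` (that method only arrived in Java 9). A helper of this kind can be built with a scheduled executor; the sketch below is illustrative, with hypothetical class and field names, and is not the actual `FutureUtils` implementation:

```java
import java.util.concurrent.*;

public class TimeoutFutures {

    // Single daemon timer thread shared by all timeouts (assumed design).
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "future-timeout-timer");
            t.setDaemon(true);
            return t;
        });

    /**
     * Completes the given future with a TimeoutException if it is not done
     * within the given timeout; otherwise its original result is kept.
     */
    public static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future, long timeout, TimeUnit unit) {
        ScheduledFuture<?> timeoutTask = TIMER.schedule(
            () -> future.completeExceptionally(new TimeoutException()),
            timeout, unit);
        // Cancel the timer task as soon as the future completes either way,
        // so the timer queue does not accumulate stale tasks.
        future.whenComplete((result, error) -> timeoutTask.cancel(false));
        return future;
    }
}
```

Note that `completeExceptionally` is a no-op on an already completed future, so a future that finishes before the deadline keeps its value.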

## Verifying this change

- Added `FutureUtilsTest#testOrTimeout`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink orTimeout

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4918.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4918


commit 83e52f5458379855c481fe169f6be50a8afee336
Author: Till Rohrmann 
Date:   2017-10-29T15:01:18Z

[hotfix] Remove redundant FutureUtils#getFailedFuture

FutureUtils#completedExceptionally does exactly the same.

commit 94b3d14bc2ba64a4862bd83e670f3dbfccdf96b8
Author: Till Rohrmann 
Date:   2017-10-29T15:38:53Z

[FLINK-7940] Add FutureUtils.orTimeout

This commit adds a convenience function which makes it easy to add a
timeout to a CompletableFuture.




---


[jira] [Created] (FLINK-7940) Add timeout for futures

2017-10-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7940:


 Summary: Add timeout for futures
 Key: FLINK-7940
 URL: https://issues.apache.org/jira/browse/FLINK-7940
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor


In order to conveniently timeout futures, we should add tooling to 
{{FutureUtils}}.





[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224043#comment-16224043
 ] 

ASF GitHub Bot commented on FLINK-7880:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4909


> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829





[GitHub] flink pull request #4909: [FLINK-7880][QS] Fix QS test instabilities.

2017-10-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4909


---


[jira] [Closed] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-10-29 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-7880.
-
Resolution: Fixed

Merged on master at 6b8f7dc2d818cbe87bdfbe8852cfec5507f77a5a

> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829





[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224027#comment-16224027
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147574828
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
 ---
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.sql.Timestamp
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, 
TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.{TableException, Types, 
ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Cast, ResolvedFieldReference}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+
+import scala.collection.JavaConverters._
+
+/** Util class for [[TableSource]]. */
+object TableSourceUtil {
+
+  /** Returns true if the [[TableSource]] has a rowtime attribute. */
+  def hasRowtimeAttribute(tableSource: TableSource[_]): Boolean =
+getRowtimeAttributes(tableSource).nonEmpty
+
+  /** Returns true if the [[TableSource]] has a proctime attribute. */
+  def hasProctimeAttribute(tableSource: TableSource[_]): Boolean =
+getProctimeAttributes(tableSource).nonEmpty
+
+  /**
+* Validates a TableSource.
+*
+* - checks that all fields of the schema can be resolved
+* - checks that resolved fields have the correct type
+* - checks that the time attributes are correctly configured.
+*
+* @param tableSource The [[TableSource]] for which the time attributes 
are checked.
+*/
+  def validateTableSource(tableSource: TableSource[_]): Unit = {
+
+val schema = tableSource.getTableSchema
+val tableFieldNames = schema.getColumnNames
+val tableFieldTypes = schema.getTypes
+
+// get rowtime and proctime attributes
+val rowtimeAttributes = getRowtimeAttributes(tableSource)
+val proctimeAttributes = getProctimeAttributes(tableSource)
+
+// validate that schema fields can be resolved to a return type field 
of correct type
+var mappedFieldCnt = 0
+tableFieldTypes.zip(tableFieldNames).foreach {
+  case (t: SqlTimeTypeInfo[_], name: String)
+if t.getTypeClass == classOf[Timestamp] && 
proctimeAttributes.contains(name) =>
+// OK, field was mapped to proctime attribute
+  case (t: SqlTimeTypeInfo[_], name: String)
+if t.getTypeClass == classOf[Timestamp] && 
rowtimeAttributes.contains(name) =>
+// OK, field was mapped to rowtime attribute
+  case (t: TypeInformation[_], name) =>
+// check if field is registered as time indicator
+if (getProctimeAttributes(tableSource).contains(name)) {
+  throw new ValidationException(s"Processing time field '$name' 
has invalid type $t. " +
+s"Processing time attributes must be of type 
${Types.SQL_TIMESTAMP}.")
+}
+if (getRowtimeAttributes(tableSource).contains(name)) {
+  throw new ValidationException(s"Rowtime field '$name' has 
invalid type $t. " +
+s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
+}
+// check that field can be resolved in input type
+val (physicalName, _, tpe) = 

[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224025#comment-16224025
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147582469
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
 ---
@@ -38,23 +38,26 @@ import scala.collection.mutable
   * @param path The path to the CSV file.
   * @param fieldNames The names of the table fields.
   * @param fieldTypes The types of the table fields.
+  * @param selectedFields The fields which will be read and returned by 
the table source.
+  *   If None, all fields are returned.
   * @param fieldDelim The field delimiter, "," by default.
   * @param rowDelim The row delimiter, "\n" by default.
   * @param quoteCharacter An optional quote character for String values, 
null by default.
   * @param ignoreFirstLine Flag to ignore the first line, false by default.
   * @param ignoreComments An optional prefix to indicate comments, null by 
default.
   * @param lenient Flag to skip records with parse error instead to fail, 
false by default.
   */
-class CsvTableSource(
+class CsvTableSource private (
 private val path: String,
 private val fieldNames: Array[String],
 private val fieldTypes: Array[TypeInformation[_]],
-private val fieldDelim: String = 
CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-private val rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
-private val quoteCharacter: Character = null,
-private val ignoreFirstLine: Boolean = false,
-private val ignoreComments: String = null,
-private val lenient: Boolean = false)
+private val selectedFields: Array[Int],
+private val fieldDelim: String,
+private val rowDelim: String,
+private val quoteCharacter: Character,
+private val ignoreFirstLine: Boolean,
+private val ignoreComments: String,
+private val lenient: Boolean)
   extends BatchTableSource[Row]
--- End diff --

Maybe we need a base class instead of traits to perform shared checks, such 
as verifying that the numbers of field names and field types match.
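A minimal sketch of the kind of shared validation such a base class could enforce (the class name is hypothetical and not part of the actual flink-table API):

```java
// Hypothetical base class centralizing the field-count check that every
// table source subclass would otherwise have to repeat.
abstract class ValidatingTableSource {
    protected final String[] fieldNames;
    protected final Class<?>[] fieldTypes;

    protected ValidatingTableSource(String[] fieldNames, Class<?>[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException(
                "Number of field names (" + fieldNames.length + ") and field types ("
                    + fieldTypes.length + ") must match.");
        }
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }
}
```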


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from it. 
> We can provide a new interface called {{DefinedWatermark}}, which has two 
> methods {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks and support some built-in 
> strategies needs further discussion.
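The proposed interface could look roughly like the following sketch. The names come from the issue text, but the signatures and the `BoundedLateness` strategy are illustrative guesses, not the final Flink API:

```java
// Hypothetical watermark-generation strategy: maps the last seen rowtime
// (epoch millis) to the current watermark.
@FunctionalInterface
interface WatermarkGenerator {
    long getWatermark(long lastRowtime);
}

// Hypothetical shape of the proposed DefinedWatermark interface.
interface DefinedWatermark {
    /** Name of an existing field carrying the event timestamp. */
    String getRowtimeAttribute();

    /** Strategy that derives watermarks from the rowtime values. */
    WatermarkGenerator getWatermarkGenerator();
}

/** One conceivable built-in strategy: bounded out-of-orderness. */
class BoundedLateness implements DefinedWatermark {
    private final String rowtimeField;
    private final long maxLatenessMs;

    BoundedLateness(String rowtimeField, long maxLatenessMs) {
        this.rowtimeField = rowtimeField;
        this.maxLatenessMs = maxLatenessMs;
    }

    @Override
    public String getRowtimeAttribute() {
        return rowtimeField;
    }

    @Override
    public WatermarkGenerator getWatermarkGenerator() {
        // The watermark trails the last seen rowtime by the allowed lateness.
        return lastRowtime -> lastRowtime - maxLatenessMs;
    }
}
```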





[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224024#comment-16224024
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147583120
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.{Expression, 
ResolvedFieldReference}
+
+/**
+  * The [[FieldComputer]] interface returns an expression to compute the 
field of the table schema
+  * of a [[TableSource]] from one or more fields of the [[TableSource]]'s 
return type.
+  *
+  * @tparam T The result type of the provided expression.
+  */
+abstract class FieldComputer[T] {
--- End diff --

I think we could add a `getArguments: Array[_]` function to allow providing 
extra arguments besides the existing fields. For example, when event-time is 
represented as a `String`, the argument would be the time pattern.
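A sketch of the case the comment describes: a field computer that derives the rowtime (epoch millis) from a `String`-typed field, taking the time pattern as the suggested extra argument. The class name and method are hypothetical, not part of the actual `FieldComputer` API:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical computer turning a String timestamp field into epoch millis,
// parameterized by a time pattern passed as an extra argument.
class StringRowtimeComputer {
    private final DateTimeFormatter formatter;

    StringRowtimeComputer(String timePattern) {
        this.formatter = DateTimeFormatter.ofPattern(timePattern);
    }

    /** Parses the string value and returns the timestamp as epoch millis (UTC assumed). */
    long compute(String value) {
        return LocalDateTime.parse(value, formatter)
            .toInstant(ZoneOffset.UTC)
            .toEpochMilli();
    }
}
```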


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from it. 
> We can provide a new interface called {{DefinedWatermark}}, which has two 
> methods {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks and support some built-in 
> strategies needs further discussion.





[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224026#comment-16224026
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147580191
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 ---
@@ -56,33 +63,123 @@ class StreamTableSourceScan(
   cluster,
   traitSet,
   getTable,
-  tableSource
+  tableSource,
+  selectedFields
 )
   }
 
   override def copy(
   traitSet: RelTraitSet,
-  newTableSource: TableSource[_])
-: PhysicalTableSourceScan = {
+  newTableSource: TableSource[_]): PhysicalTableSourceScan = {
 
 new StreamTableSourceScan(
   cluster,
   traitSet,
   getTable,
-  newTableSource.asInstanceOf[StreamTableSource[_]]
+  newTableSource.asInstanceOf[StreamTableSource[_]],
+  selectedFields
 )
   }
 
   override def translateToPlan(
   tableEnv: StreamTableEnvironment,
   queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
+val fieldIndexes = TableSourceUtil.computeIndexMapping(
+  tableSource,
+  isStreamTable = true,
+  selectedFields)
+
 val config = tableEnv.getConfig
 val inputDataStream = 
tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
-convertToInternalRow(
-  new RowSchema(getRowType),
+val outputSchema = new RowSchema(this.getRowType)
+
+// check that declared and actual type of table source DataStream are 
identical
+if (inputDataStream.getType != tableSource.getReturnType) {
+  throw new TableException(s"TableSource of type 
${tableSource.getClass.getCanonicalName} " +
+s"returned a DataStream of type ${inputDataStream.getType} that 
does not match with the " +
+s"type ${tableSource.getReturnType} declared by the 
TableSource.getReturnType() method. " +
+s"Please validate the implementation of the TableSource.")
+}
+
+// get expression to extract rowtime attribute
+val rowtimeExpression: Option[RexNode] = 
TableSourceUtil.getRowtimeExtractionExpression(
+  tableSource,
+  selectedFields,
+  cluster,
+  tableEnv.getRelBuilder,
+  TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+)
+
+// ingest table and convert and extract time attributes if necessary
+val ingestedTable = convertToInternalRow(
+  outputSchema,
   inputDataStream,
-  new StreamTableSourceTable(tableSource),
-  config)
+  fieldIndexes,
+  config,
+  rowtimeExpression)
+
+// generate watermarks for rowtime indicator
+val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+  TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, 
selectedFields)
+
+val withWatermarks = if (rowtimeDesc.isDefined) {
+  val rowtimeFieldIdx = 
outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
+  val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
+  watermarkStrategy match {
+case p: PeriodicWatermarkAssigner =>
+  val watermarkGenerator = new 
PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+  ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+case p: PunctuatedWatermarkAssigner =>
+  val watermarkGenerator = new 
PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+  ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+  }
+} else {
+  // No need to generate watermarks if no rowtime attribute is 
specified.
+  ingestedTable
+}
+
+withWatermarks
+  }
+}
+
+/**
+  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
+  *
+  * @param timeFieldIdx the index of the rowtime attribute.
+  * @param assigner the watermark assigner.
+  */
+private class PeriodicWatermarkAssignerWrapper(
+timeFieldIdx: Int,
+assigner: PeriodicWatermarkAssigner)
+  extends AssignerWithPeriodicWatermarks[CRow] {
+
+  override def getCurrentWatermark: Watermark = assigner.getWatermark
+
+  override def extractTimestamp(crow: CRow, previousElementTimestamp: 
Long): Long = {
+val timestamp: Long = 
crow.row.getField(timeFieldIdx).asInstanceOf[Long]
+assigner.nextTimestamp(timestamp)
+0L
--- End diff --

I know the timestamp in the `StreamRecord` is useless for the Table/SQL API 

[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-29 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147583120
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.{Expression, 
ResolvedFieldReference}
+
+/**
+  * The [[FieldComputer]] interface returns an expression to compute the 
field of the table schema
+  * of a [[TableSource]] from one or more fields of the [[TableSource]]'s 
return type.
+  *
+  * @tparam T The result type of the provided expression.
+  */
+abstract class FieldComputer[T] {
--- End diff --

I think we could add a `getArguments: Array[_]` function to allow providing 
extra arguments besides the existing fields. For example, when event-time is 
represented as a `String`, the argument would be the time pattern.


---


[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-29 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147574828
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
 ---
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.sql.Timestamp
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, 
TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.{TableException, Types, 
ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Cast, ResolvedFieldReference}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+
+import scala.collection.JavaConverters._
+
+/** Util class for [[TableSource]]. */
+object TableSourceUtil {
+
+  /** Returns true if the [[TableSource]] has a rowtime attribute. */
+  def hasRowtimeAttribute(tableSource: TableSource[_]): Boolean =
+getRowtimeAttributes(tableSource).nonEmpty
+
+  /** Returns true if the [[TableSource]] has a proctime attribute. */
+  def hasProctimeAttribute(tableSource: TableSource[_]): Boolean =
+getProctimeAttributes(tableSource).nonEmpty
+
+  /**
+* Validates a TableSource.
+*
+* - checks that all fields of the schema can be resolved
+* - checks that resolved fields have the correct type
+* - checks that the time attributes are correctly configured.
+*
+* @param tableSource The [[TableSource]] for which the time attributes 
are checked.
+*/
+  def validateTableSource(tableSource: TableSource[_]): Unit = {
+
+val schema = tableSource.getTableSchema
+val tableFieldNames = schema.getColumnNames
+val tableFieldTypes = schema.getTypes
+
+// get rowtime and proctime attributes
+val rowtimeAttributes = getRowtimeAttributes(tableSource)
+val proctimeAttributes = getProctimeAttributes(tableSource)
+
+// validate that schema fields can be resolved to a return type field 
of correct type
+var mappedFieldCnt = 0
+tableFieldTypes.zip(tableFieldNames).foreach {
+  case (t: SqlTimeTypeInfo[_], name: String)
+if t.getTypeClass == classOf[Timestamp] && 
proctimeAttributes.contains(name) =>
+// OK, field was mapped to proctime attribute
+  case (t: SqlTimeTypeInfo[_], name: String)
+if t.getTypeClass == classOf[Timestamp] && 
rowtimeAttributes.contains(name) =>
+// OK, field was mapped to rowtime attribute
+  case (t: TypeInformation[_], name) =>
+// check if field is registered as time indicator
+if (getProctimeAttributes(tableSource).contains(name)) {
+  throw new ValidationException(s"Processing time field '$name' 
has invalid type $t. " +
+s"Processing time attributes must be of type 
${Types.SQL_TIMESTAMP}.")
+}
+if (getRowtimeAttributes(tableSource).contains(name)) {
+  throw new ValidationException(s"Rowtime field '$name' has 
invalid type $t. " +
+s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
+}
+// check that field can be resolved in input type
+val (physicalName, _, tpe) = resolveInputField(name, tableSource)
+// validate that mapped fields are of the same type
+if (tpe != t) {
+  throw ValidationException(s"Type $t of table field '$name' does 
not " +
+

[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-29 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147580191
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 ---
@@ -56,33 +63,123 @@ class StreamTableSourceScan(
   cluster,
   traitSet,
   getTable,
-  tableSource
+  tableSource,
+  selectedFields
 )
   }
 
   override def copy(
   traitSet: RelTraitSet,
-  newTableSource: TableSource[_])
-: PhysicalTableSourceScan = {
+  newTableSource: TableSource[_]): PhysicalTableSourceScan = {
 
 new StreamTableSourceScan(
   cluster,
   traitSet,
   getTable,
-  newTableSource.asInstanceOf[StreamTableSource[_]]
+  newTableSource.asInstanceOf[StreamTableSource[_]],
+  selectedFields
 )
   }
 
   override def translateToPlan(
   tableEnv: StreamTableEnvironment,
   queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
+val fieldIndexes = TableSourceUtil.computeIndexMapping(
+  tableSource,
+  isStreamTable = true,
+  selectedFields)
+
 val config = tableEnv.getConfig
 val inputDataStream = 
tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
-convertToInternalRow(
-  new RowSchema(getRowType),
+val outputSchema = new RowSchema(this.getRowType)
+
+// check that declared and actual type of table source DataStream are 
identical
+if (inputDataStream.getType != tableSource.getReturnType) {
+  throw new TableException(s"TableSource of type 
${tableSource.getClass.getCanonicalName} " +
+s"returned a DataStream of type ${inputDataStream.getType} that 
does not match with the " +
+s"type ${tableSource.getReturnType} declared by the 
TableSource.getReturnType() method. " +
+s"Please validate the implementation of the TableSource.")
+}
+
+// get expression to extract rowtime attribute
+val rowtimeExpression: Option[RexNode] = 
TableSourceUtil.getRowtimeExtractionExpression(
+  tableSource,
+  selectedFields,
+  cluster,
+  tableEnv.getRelBuilder,
+  TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+)
+
+// ingest table and convert and extract time attributes if necessary
+val ingestedTable = convertToInternalRow(
+  outputSchema,
   inputDataStream,
-  new StreamTableSourceTable(tableSource),
-  config)
+  fieldIndexes,
+  config,
+  rowtimeExpression)
+
+// generate watermarks for rowtime indicator
+val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+  TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, 
selectedFields)
+
+val withWatermarks = if (rowtimeDesc.isDefined) {
+  val rowtimeFieldIdx = 
outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
+  val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
+  watermarkStrategy match {
+case p: PeriodicWatermarkAssigner =>
+  val watermarkGenerator = new 
PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+  ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+case p: PunctuatedWatermarkAssigner =>
+  val watermarkGenerator = new 
PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+  ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+  }
+} else {
+  // No need to generate watermarks if no rowtime attribute is 
specified.
+  ingestedTable
+}
+
+withWatermarks
+  }
+}
+
+/**
+  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
+  *
+  * @param timeFieldIdx the index of the rowtime attribute.
+  * @param assigner the watermark assigner.
+  */
+private class PeriodicWatermarkAssignerWrapper(
+timeFieldIdx: Int,
+assigner: PeriodicWatermarkAssigner)
+  extends AssignerWithPeriodicWatermarks[CRow] {
+
+  override def getCurrentWatermark: Watermark = assigner.getWatermark
+
+  override def extractTimestamp(crow: CRow, previousElementTimestamp: 
Long): Long = {
+val timestamp: Long = 
crow.row.getField(timeFieldIdx).asInstanceOf[Long]
+assigner.nextTimestamp(timestamp)
+0L
--- End diff --

I know the timestamp in the `StreamRecord` is useless for the Table/SQL API 
now, but the returned value will still be set and `StreamRecord.hasTimestamp()` 
will be `true`. How about return a negative value (e.g., -1) here, so that 
maybe it's possible for us to erase the time from `StreamRecord` when the 
extracted 

[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...

2017-10-29 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147582469
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
 ---
@@ -38,23 +38,26 @@ import scala.collection.mutable
   * @param path The path to the CSV file.
   * @param fieldNames The names of the table fields.
   * @param fieldTypes The types of the table fields.
+  * @param selectedFields The fields which will be read and returned by 
the table source.
+  *   If None, all fields are returned.
   * @param fieldDelim The field delimiter, "," by default.
   * @param rowDelim The row delimiter, "\n" by default.
   * @param quoteCharacter An optional quote character for String values, 
null by default.
   * @param ignoreFirstLine Flag to ignore the first line, false by default.
   * @param ignoreComments An optional prefix to indicate comments, null by 
default.
   * @param lenient Flag to skip records with parse error instead to fail, 
false by default.
   */
-class CsvTableSource(
+class CsvTableSource private (
 private val path: String,
 private val fieldNames: Array[String],
 private val fieldTypes: Array[TypeInformation[_]],
-private val fieldDelim: String = 
CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-private val rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
-private val quoteCharacter: Character = null,
-private val ignoreFirstLine: Boolean = false,
-private val ignoreComments: String = null,
-private val lenient: Boolean = false)
+private val selectedFields: Array[Int],
+private val fieldDelim: String,
+private val rowDelim: String,
+private val quoteCharacter: Character,
+private val ignoreFirstLine: Boolean,
+private val ignoreComments: String,
+private val lenient: Boolean)
   extends BatchTableSource[Row]
--- End diff --

Maybe we need a base class instead of traits to perform common checks, e.g., that 
the numbers of field names and field types are equal.
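The check suggested here could live in a shared base class. A minimal sketch of that idea in plain Scala (the class and member names below are hypothetical, not Flink API):

```scala
// Hypothetical sketch of a shared base class that validates the
// schema arrays at construction time; names are illustrative only.
abstract class TableSourceBaseSketch(
    val fieldNames: Array[String],
    val fieldTypes: Array[Class[_]]) {

  // Fail fast if the numbers of field names and field types differ.
  require(
    fieldNames.length == fieldTypes.length,
    s"Number of field names (${fieldNames.length}) must equal " +
      s"number of field types (${fieldTypes.length}).")
}
```

Concrete sources such as `CsvTableSource` would then extend this class and inherit the check, instead of each trait implementation duplicating it.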


---


[GitHub] flink issue #4751: [FLINK-7739][kafka-tests] Throttle down data producing th...

2017-10-29 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4751
  
I assume that the `Thread.sleep(1)` should not do much harm if the Kafka 
producer already blocks due to buffering and I/O. If it does not block, then 
the `Thread.sleep(1)` will give other threads a chance to execute when 
resources are scarce (e.g. on Travis).

I agree with @pnowojski that this is not a stress test, and we have seen in 
the past that tests became unstable due to resource scarcity. Therefore I would 
like to merge this PR if Travis gives the green light and @StephanEwen does not 
object.
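The throttling pattern under discussion can be sketched in a few lines of Scala (the record source and `send` function below are placeholders, not the actual test code from the PR):

```scala
// Hypothetical sketch of the throttling discussed above: sleep 1 ms
// between records so the producing thread yields the CPU on machines
// with scarce resources (e.g. CI workers). Not the actual PR code.
def produceThrottled(records: Iterator[String], send: String => Unit): Int = {
  var count = 0
  while (records.hasNext) {
    send(records.next()) // may itself block on buffering / I/O
    count += 1
    Thread.sleep(1)      // yield to other threads between records
  }
  count
}
```

The sleep bounds the producer at roughly 1000 records per second, which is ample for a correctness (not stress) test while leaving CPU time for the consumer side.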


---


[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224019#comment-16224019
 ] 

ASF GitHub Bot commented on FLINK-7739:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4751
  
I assume that the `Thread.sleep(1)` should not do much harm if the Kafka 
producer already blocks due to buffering and I/O. If it does not block, then 
the `Thread.sleep(1)` will give other threads a chance to execute when 
resources are scarce (e.g. on Travis).

I agree with @pnowojski that this is not a stress test, and we have seen in 
the past that tests became unstable due to resource scarcity. Therefore I would 
like to merge this PR if Travis gives the green light and @StephanEwen does not 
object.


> Improve Kafka*ITCase tests stability
> 
>
> Key: FLINK-7739
> URL: https://issues.apache.org/jira/browse/FLINK-7739
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16223885#comment-16223885
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4894
  
I'm planning to merge this PR before the feature freeze on Tuesday.

Cheers, Fabian


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}} with two 
> methods, {{getRowtimeAttribute}} (which can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} interface will be 
> marked deprecated.
> How to support periodic and punctuated watermarks, and which built-in 
> strategies to provide, needs further discussion.
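A rough Scala sketch of the interface shape described in the issue; the generator trait and its method are placeholders of my own, since the actual watermark-generator abstraction was still under discussion:

```scala
// Hypothetical sketch of the proposed DefinedWatermark interface.
// WatermarkGeneratorSketch and its getWatermark method are invented
// placeholders, not the abstraction that was eventually adopted.
trait WatermarkGeneratorSketch {
  /** Derives a watermark timestamp from a record's rowtime, if any. */
  def getWatermark(rowtime: Long): Option[Long]
}

trait DefinedWatermarkSketch {
  /** The existing field that carries the rowtime. */
  def getRowtimeAttribute: String

  /** The strategy used to generate watermarks from the rowtime field. */
  def getWatermarkGenerator: WatermarkGeneratorSketch
}
```

A bounded-out-of-orderness strategy, for example, would return `Some(rowtime - maxDelay)`, while a punctuated strategy could return `None` for records that should not emit a watermark.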



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4894: [FLINK-7548] [table] Improve rowtime support of TableSour...

2017-10-29 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4894
  
I'm planning to merge this PR before the feature freeze on Tuesday.

Cheers, Fabian


---