[jira] [Assigned] (FLINK-7941) Port SubtasksTimesHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fang Yong reassigned FLINK-7941: Assignee: Fang Yong > Port SubtasksTimesHandler to new REST endpoint > -- > > Key: FLINK-7941 > URL: https://issues.apache.org/jira/browse/FLINK-7941 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Fang Yong >Assignee: Fang Yong > > Port *SubtasksTimesHandler* to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7941) Port SubtasksTimesHandler to new REST endpoint
Fang Yong created FLINK-7941: Summary: Port SubtasksTimesHandler to new REST endpoint Key: FLINK-7941 URL: https://issues.apache.org/jira/browse/FLINK-7941 Project: Flink Issue Type: Sub-task Components: Distributed Coordination, REST, Webfrontend Reporter: Fang Yong Port *SubtasksTimesHandler* to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-6109) Add "consumer lag" report metric to FlinkKafkaConsumer
[ https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] aitozi reassigned FLINK-6109: - Assignee: aitozi > Add "consumer lag" report metric to FlinkKafkaConsumer > -- > > Key: FLINK-6109 > URL: https://issues.apache.org/jira/browse/FLINK-6109 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: aitozi > > This is a feature discussed in this ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html. > As discussed, we can expose two kinds of "consumer lag" metrics for this: > - *current consumer lag per partition:* the current difference between the > latest offset and the last collected record. This metric is calculated and > updated at a configurable interval. This metric basically serves as an > indicator of how the consumer is keeping up with the head of partitions. I > propose to name this {{currentOffsetLag}}. > - *Consumer lag of last checkpoint per partition:* the difference between > the latest offset and the offset stored in the checkpoint. This metric is > only updated when checkpoints are completed. It serves as an indicator of how > much data may need to be replayed in case of a failure. I propose to name > this {{lastCheckpointedOffsetLag}}. > I don't think it is reasonable to define a metric of whether or not a > consumer has "caught up" with the HEAD. That would imply a threshold for the > offset difference. We should probably leave this "caught up" logic for the > user to determine themselves when they query this metric. > The granularity of the metric is per-FlinkKafkaConsumer, and independent of > the consumer group.id used (the offset used to calculate consumer lag is the > internal offset state of the FlinkKafkaConsumer, not the consumer group's > committed offsets in Kafka). 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
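The proposal above describes two per-partition gauges. As a rough illustration of how they relate, here is a minimal, hypothetical sketch — the class and method names are invented for this example; only the gauge names {{currentOffsetLag}} and {{lastCheckpointedOffsetLag}} come from the proposal, and this is not Flink's actual implementation:

```java
// Hypothetical sketch of the two proposed per-partition lag gauges.
// Only the gauge names come from the FLINK-6109 proposal; the class,
// field, and callback names here are invented for illustration.
public class PartitionLagTracker {

    private volatile long latestOffset;            // head offset, refreshed at a configurable interval
    private volatile long lastCollectedOffset;     // offset of the last record emitted downstream
    private volatile long lastCheckpointedOffset;  // offset stored in the last completed checkpoint

    public void onLatestOffsetFetched(long offset) {
        this.latestOffset = offset;
    }

    public void onRecordEmitted(long offset) {
        this.lastCollectedOffset = offset;
    }

    public void onCheckpointCompleted(long offset) {
        this.lastCheckpointedOffset = offset;
    }

    /** How far the consumer trails the head of the partition. */
    public long currentOffsetLag() {
        return latestOffset - lastCollectedOffset;
    }

    /** How much data may need to be replayed in case of a failure. */
    public long lastCheckpointedOffsetLag() {
        return latestOffset - lastCheckpointedOffset;
    }
}
```

As the description notes, whether a given lag value means the consumer has "caught up" is deliberately left to whoever queries the metric.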
[jira] [Assigned] (FLINK-6109) Add "consumer lag" report metric to FlinkKafkaConsumer
[ https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] aitozi reassigned FLINK-6109: - Assignee: (was: aitozi) > Add "consumer lag" report metric to FlinkKafkaConsumer > -- > > Key: FLINK-6109 > URL: https://issues.apache.org/jira/browse/FLINK-6109 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > > This is a feature discussed in this ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html. > As discussed, we can expose two kinds of "consumer lag" metrics for this: > - *current consumer lag per partition:* the current difference between the > latest offset and the last collected record. This metric is calculated and > updated at a configurable interval. This metric basically serves as an > indicator of how the consumer is keeping up with the head of partitions. I > propose to name this {{currentOffsetLag}}. > - *Consumer lag of last checkpoint per partition:* the difference between > the latest offset and the offset stored in the checkpoint. This metric is > only updated when checkpoints are completed. It serves as an indicator of how > much data may need to be replayed in case of a failure. I propose to name > this {{lastCheckpointedOffsetLag}}. > I don't think it is reasonable to define a metric of whether or not a > consumer has "caught up" with the HEAD. That would imply a threshold for the > offset difference. We should probably leave this "caught up" logic for the > user to determine themselves when they query this metric. > The granularity of the metric is per-FlinkKafkaConsumer, and independent of > the consumer group.id used (the offset used to calculate consumer lag is the > internal offset state of the FlinkKafkaConsumer, not the consumer group's > committed offsets in Kafka). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6109) Add "consumer lag" report metric to FlinkKafkaConsumer
[ https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224395#comment-16224395 ] aitozi commented on FLINK-6109: --- Is anyone working on this issue? As of version 1.3.2 we can only see the `records-lag-max` metric reported by the Kafka consumer, but no detailed lag per partition. If no one is working on this, can I take it? > Add "consumer lag" report metric to FlinkKafkaConsumer > -- > > Key: FLINK-6109 > URL: https://issues.apache.org/jira/browse/FLINK-6109 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > > This is a feature discussed in this ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html. > As discussed, we can expose two kinds of "consumer lag" metrics for this: > - *current consumer lag per partition:* the current difference between the > latest offset and the last collected record. This metric is calculated and > updated at a configurable interval. This metric basically serves as an > indicator of how the consumer is keeping up with the head of partitions. I > propose to name this {{currentOffsetLag}}. > - *Consumer lag of last checkpoint per partition:* the difference between > the latest offset and the offset stored in the checkpoint. This metric is > only updated when checkpoints are completed. It serves as an indicator of how > much data may need to be replayed in case of a failure. I propose to name > this {{lastCheckpointedOffsetLag}}. > I don't think it is reasonable to define a metric of whether or not a > consumer has "caught up" with the HEAD. That would imply a threshold for the > offset difference. We should probably leave this "caught up" logic for the > user to determine themselves when they query this metric. 
> The granularity of the metric is per-FlinkKafkaConsumer, and independent of > the consumer group.id used (the offset used to calculate consumer lag is the > internal offset state of the FlinkKafkaConsumer, not the consumer group's > committed offsets in Kafka). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7940) Add timeout for futures
[ https://issues.apache.org/jira/browse/FLINK-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224206#comment-16224206 ] ASF GitHub Bot commented on FLINK-7940: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4918#discussion_r147594496

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
 		return result;
 	}

-	//
-	// Future Completed with an exception.
-	//
+	/**
+	 * Times the given future out after the timeout.
+	 *
+	 * @param future to time out
+	 * @param timeout after which the given future is timed out
+	 * @param timeUnit time unit of the timeout
+	 * @param <T> type of the given future
+	 * @return The timeout enriched future
+	 */
+	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+		final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+		future.whenComplete((T value, Throwable throwable) -> {
+			if (!timeoutFuture.isDone()) {
+				timeoutFuture.cancel(false);
+			}
+		});
+
+		return future;
+	}

 	/**
-	 * Returns a {@link CompletableFuture} that has failed with the exception
-	 * provided as argument.
-	 * @param throwable the exception to fail the future with.
-	 * @return The failed future.
+	 * Runnable to complete the given future with a {@link TimeoutException}.
 	 */
-	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-		failedAttempt.completeExceptionally(throwable);
-		return failedAttempt;
+	static final class Timeout implements Runnable {
+
+		private final CompletableFuture<?> future;
+
+		Timeout(CompletableFuture<?> future) {
+			this.future = Preconditions.checkNotNull(future);
+		}
+
+		@Override
+		public void run() {
+			future.completeExceptionally(new TimeoutException());
--- End diff --

Could be useful to have the used timeout in the exception message.

> Add timeout for futures > --- > > Key: FLINK-7940 > URL: https://issues.apache.org/jira/browse/FLINK-7940 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > In order to conveniently timeout futures, we should add tooling to > {{FutureUtils}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7940) Add timeout for futures
[ https://issues.apache.org/jira/browse/FLINK-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224208#comment-16224208 ] ASF GitHub Bot commented on FLINK-7940: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4918#discussion_r147594462

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
 		return result;
 	}

-	//
-	// Future Completed with an exception.
-	//
+	/**
+	 * Times the given future out after the timeout.
+	 *
+	 * @param future to time out
+	 * @param timeout after which the given future is timed out
+	 * @param timeUnit time unit of the timeout
+	 * @param <T> type of the given future
+	 * @return The timeout enriched future
+	 */
+	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+		final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+		future.whenComplete((T value, Throwable throwable) -> {
+			if (!timeoutFuture.isDone()) {
+				timeoutFuture.cancel(false);
+			}
+		});
+
+		return future;
+	}

 	/**
-	 * Returns a {@link CompletableFuture} that has failed with the exception
-	 * provided as argument.
-	 * @param throwable the exception to fail the future with.
-	 * @return The failed future.
+	 * Runnable to complete the given future with a {@link TimeoutException}.
 	 */
-	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-		failedAttempt.completeExceptionally(throwable);
-		return failedAttempt;
+	static final class Timeout implements Runnable {
--- End diff --

make private?
> Add timeout for futures > --- > > Key: FLINK-7940 > URL: https://issues.apache.org/jira/browse/FLINK-7940 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > In order to conveniently timeout futures, we should add tooling to > {{FutureUtils}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7940) Add timeout for futures
[ https://issues.apache.org/jira/browse/FLINK-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224207#comment-16224207 ] ASF GitHub Bot commented on FLINK-7940: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4918#discussion_r147594412

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
 		return result;
 	}

-	//
-	// Future Completed with an exception.
-	//
+	/**
+	 * Times the given future out after the timeout.
+	 *
+	 * @param future to time out
+	 * @param timeout after which the given future is timed out
+	 * @param timeUnit time unit of the timeout
+	 * @param <T> type of the given future
+	 * @return The timeout enriched future
+	 */
+	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+		final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+		future.whenComplete((T value, Throwable throwable) -> {
+			if (!timeoutFuture.isDone()) {
+				timeoutFuture.cancel(false);
+			}
+		});
+
+		return future;
+	}

 	/**
-	 * Returns a {@link CompletableFuture} that has failed with the exception
-	 * provided as argument.
-	 * @param throwable the exception to fail the future with.
-	 * @return The failed future.
+	 * Runnable to complete the given future with a {@link TimeoutException}.
 	 */
-	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-		failedAttempt.completeExceptionally(throwable);
-		return failedAttempt;
+	static final class Timeout implements Runnable {
+
+		private final CompletableFuture<?> future;
+
+		Timeout(CompletableFuture<?> future) {
+			this.future = Preconditions.checkNotNull(future);
+		}
+
+		@Override
+		public void run() {
+			future.completeExceptionally(new TimeoutException());
+		}
+	}
+
+	/**
+	 * Delay scheduler used to timeout futures.
+	 *
+	 * This class creates a singleton scheduler used to run the provided actions.
+	 */
+	private static final class Delayer {
+		static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
+			1,
+			new ExecutorThreadFactory("CompletableFutureDelayScheduler"));
--- End diff --

Let's add a "Flink" prefix to the thread name.

> Add timeout for futures > --- > > Key: FLINK-7940 > URL: https://issues.apache.org/jira/browse/FLINK-7940 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > In order to conveniently timeout futures, we should add tooling to > {{FutureUtils}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7049) TestingApplicationMaster keeps running after integration tests finish
[ https://issues.apache.org/jira/browse/FLINK-7049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069086#comment-16069086 ] Ted Yu edited comment on FLINK-7049 at 10/29/17 7:14 PM: - Stack trace for TestingApplicationMaster. was (Author: yuzhih...@gmail.com): Stack trace for TestingApplicationMaster.

> TestingApplicationMaster keeps running after integration tests finish
> -
>
> Key: FLINK-7049
> URL: https://issues.apache.org/jira/browse/FLINK-7049
> Project: Flink
> Issue Type: Test
> Components: Tests, YARN
> Reporter: Ted Yu
> Priority: Minor
> Attachments: testingApplicationMaster.stack
>
> After integration tests finish, TestingApplicationMaster is still running.
> Toward the end of flink-yarn-tests/target/flink-yarn-tests-ha/flink-yarn-tests-ha-logDir-nm-1_0/application_1498768839874_0001/container_1498768839874_0001_03_01/jobmanager.log:
> {code}
> 2017-06-29 22:09:49,681 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:46165
> 2017-06-29 22:09:49,681 ERROR org.apache.flink.shaded.org.apache.curator.ConnectionState - Authentication failed
> 2017-06-29 22:09:49,682 WARN  org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> 	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> 2017-06-29 22:09:50,782 WARN  org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-3597644653611245612.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
> 2017-06-29 22:09:50,782 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:46165
> 2017-06-29 22:09:50,782 ERROR org.apache.flink.shaded.org.apache.curator.ConnectionState - Authentication failed
> 2017-06-29 22:09:50,783 WARN  org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> 	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> {code}

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7642) Upgrade maven surefire plugin to 2.19.1
[ https://issues.apache.org/jira/browse/FLINK-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7642: -- Description: Surefire 2.19 release introduced more useful test filters which would let us run a subset of the test. This issue is for upgrading maven surefire plugin to 2.19.1 was: Surefire 2.19 release introduced more useful test filters which would let us run a subset of the test. This issue is for upgrading maven surefire plugin to 2.19.1 > Upgrade maven surefire plugin to 2.19.1 > --- > > Key: FLINK-7642 > URL: https://issues.apache.org/jira/browse/FLINK-7642 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu > > Surefire 2.19 release introduced more useful test filters which would let us > run a subset of the test. > This issue is for upgrading maven surefire plugin to 2.19.1 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7795) Utilize error-prone to discover common coding mistakes
[ https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7795: -- Description: http://errorprone.info/ is a tool which detects common coding mistakes. We should incorporate into Flink build process. was: http://errorprone.info/ is a tool which detects common coding mistakes. We should incorporate into Flink build. > Utilize error-prone to discover common coding mistakes > -- > > Key: FLINK-7795 > URL: https://issues.apache.org/jira/browse/FLINK-7795 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > > http://errorprone.info/ is a tool which detects common coding mistakes. > We should incorporate into Flink build process. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7940) Add timeout for futures
[ https://issues.apache.org/jira/browse/FLINK-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224046#comment-16224046 ] ASF GitHub Bot commented on FLINK-7940: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4918

[FLINK-7940] Add FutureUtils.orTimeout

## What is the purpose of the change

This commit adds a convenience function `FutureUtils#orTimeout` which allows to easily add a timeout to a CompletableFuture.

## Verifying this change

- Added `FutureUtilsTest#testOrTimeout`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink orTimeout

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4918.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4918

commit 83e52f5458379855c481fe169f6be50a8afee336
Author: Till Rohrmann
Date: 2017-10-29T15:01:18Z

    [hotfix] Remove redundant FutureUtils#getFailedFuture

    FutureUtils#completedExceptionally does exactly the same.

commit 94b3d14bc2ba64a4862bd83e670f3dbfccdf96b8
Author: Till Rohrmann
Date: 2017-10-29T15:38:53Z

    [FLINK-7940] Add FutureUtils.orTimeout

    This commit adds a convenience function which allows to easily add a timeout to a CompletableFuture.
> Add timeout for futures > --- > > Key: FLINK-7940 > URL: https://issues.apache.org/jira/browse/FLINK-7940 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > In order to conveniently timeout futures, we should add tooling to > {{FutureUtils}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
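The PR above adds `FutureUtils.orTimeout`. The idea can be sketched on plain `java.util.concurrent` types, folding in the review suggestion to put the timeout value into the exception message; the class name, thread name, and message text below are made up for this sketch, and this is not Flink's actual `FutureUtils` code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Standalone sketch of the orTimeout idea: schedule a task that fails the
// future with a TimeoutException, and cancel that task if the future
// completes first. Names are invented; not Flink's implementation.
public class OrTimeoutSketch {

    // Single-threaded delayer, analogous to the singleton Delayer in the PR.
    private static final ScheduledExecutorService DELAYER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "FlinkCompletableFutureDelayScheduler");
                t.setDaemon(true);
                return t;
            });

    public static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
        final ScheduledFuture<?> timeoutFuture = DELAYER.schedule(
                () -> future.completeExceptionally(
                        new TimeoutException("Future timed out after " + timeout + " " + timeUnit)),
                timeout, timeUnit);
        // Cancel the pending timeout once the future completes by any other means.
        future.whenComplete((value, throwable) -> timeoutFuture.cancel(false));
        return future;
    }
}
```

Completing the future before the deadline cancels the scheduled timeout; otherwise the future fails with a `TimeoutException` that carries the timeout value, as suggested in the review comments.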
[jira] [Created] (FLINK-7940) Add timeout for futures
Till Rohrmann created FLINK-7940:

Summary: Add timeout for futures
Key: FLINK-7940
URL: https://issues.apache.org/jira/browse/FLINK-7940
Project: Flink
Issue Type: Improvement
Components: Distributed Coordination
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor

In order to conveniently timeout futures, we should add tooling to {{FutureUtils}}.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
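For context, a helper like the one this issue proposes can be sketched in plain Java on top of `CompletableFuture` and a scheduled executor. This is an illustrative sketch only, not the code from the PR; the class name `FutureUtilsSketch` is made up here, and Flink's actual `FutureUtils#orTimeout` may differ in details.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class FutureUtilsSketch {

    // Single daemon thread whose only job is to fail futures that run over time.
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "future-timeout-timer");
            t.setDaemon(true);
            return t;
        });

    /** Completes the given future with a TimeoutException if it does not finish in time. */
    public static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future, long timeout, TimeUnit unit) {
        ScheduledFuture<?> timeoutTask = TIMER.schedule(
            () -> future.completeExceptionally(new TimeoutException()), timeout, unit);
        // Cancel the pending timer task once the future completes, normally or not.
        future.whenComplete((result, error) -> timeoutTask.cancel(false));
        return future;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> never = new CompletableFuture<>();
        orTimeout(never, 50, TimeUnit.MILLISECONDS);
        try {
            never.get();
        } catch (ExecutionException e) {
            System.out.println(e.getCause() instanceof TimeoutException); // true
        }
    }
}
```

A completed future is unaffected: `completeExceptionally` on an already-completed `CompletableFuture` is a no-op, so the timeout only bites while the future is still pending.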
[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224043#comment-16224043 ]

ASF GitHub Bot commented on FLINK-7880:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4909

> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
> Issue Type: Bug
> Components: Queryable State, Tests
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Kostas Kloudas
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.4.0
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Closed] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kostas Kloudas closed FLINK-7880.
---
Resolution: Fixed

Merged on master at 6b8f7dc2d818cbe87bdfbe8852cfec5507f77a5a

> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
> Issue Type: Bug
> Components: Queryable State, Tests
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Kostas Kloudas
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.4.0
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224027#comment-16224027 ]

ASF GitHub Bot commented on FLINK-7548:
---

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147574828

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala ---
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.sql.Timestamp
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.{TableException, Types, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Cast, ResolvedFieldReference}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+
+import scala.collection.JavaConverters._
+
+/** Util class for [[TableSource]]. */
+object TableSourceUtil {
+
+  /** Returns true if the [[TableSource]] has a rowtime attribute. */
+  def hasRowtimeAttribute(tableSource: TableSource[_]): Boolean =
+    getRowtimeAttributes(tableSource).nonEmpty
+
+  /** Returns true if the [[TableSource]] has a proctime attribute. */
+  def hasProctimeAttribute(tableSource: TableSource[_]): Boolean =
+    getProctimeAttributes(tableSource).nonEmpty
+
+  /**
+   * Validates a TableSource.
+   *
+   * - checks that all fields of the schema can be resolved
+   * - checks that resolved fields have the correct type
+   * - checks that the time attributes are correctly configured.
+   *
+   * @param tableSource The [[TableSource]] for which the time attributes are checked.
+   */
+  def validateTableSource(tableSource: TableSource[_]): Unit = {
+
+    val schema = tableSource.getTableSchema
+    val tableFieldNames = schema.getColumnNames
+    val tableFieldTypes = schema.getTypes
+
+    // get rowtime and proctime attributes
+    val rowtimeAttributes = getRowtimeAttributes(tableSource)
+    val proctimeAttributes = getProctimeAttributes(tableSource)
+
+    // validate that schema fields can be resolved to a return type field of correct type
+    var mappedFieldCnt = 0
+    tableFieldTypes.zip(tableFieldNames).foreach {
+      case (t: SqlTimeTypeInfo[_], name: String)
+        if t.getTypeClass == classOf[Timestamp] && proctimeAttributes.contains(name) =>
+        // OK, field was mapped to proctime attribute
+      case (t: SqlTimeTypeInfo[_], name: String)
+        if t.getTypeClass == classOf[Timestamp] && rowtimeAttributes.contains(name) =>
+        // OK, field was mapped to rowtime attribute
+      case (t: TypeInformation[_], name) =>
+        // check if field is registered as time indicator
+        if (getProctimeAttributes(tableSource).contains(name)) {
+          throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
+            s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.")
+        }
+        if (getRowtimeAttributes(tableSource).contains(name)) {
+          throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
+            s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
+        }
+        // check that field can be resolved in input type
+        val (physicalName, _, tpe) =
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224025#comment-16224025 ]

ASF GitHub Bot commented on FLINK-7548:
---

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147582469

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala ---
@@ -38,23 +38,26 @@ import scala.collection.mutable
  * @param path The path to the CSV file.
  * @param fieldNames The names of the table fields.
  * @param fieldTypes The types of the table fields.
+ * @param selectedFields The fields which will be read and returned by the table source.
+ *                       If None, all fields are returned.
  * @param fieldDelim The field delimiter, "," by default.
  * @param rowDelim The row delimiter, "\n" by default.
  * @param quoteCharacter An optional quote character for String values, null by default.
  * @param ignoreFirstLine Flag to ignore the first line, false by default.
  * @param ignoreComments An optional prefix to indicate comments, null by default.
  * @param lenient Flag to skip records with parse error instead to fail, false by default.
  */
-class CsvTableSource(
+class CsvTableSource private (
     private val path: String,
     private val fieldNames: Array[String],
     private val fieldTypes: Array[TypeInformation[_]],
-    private val fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-    private val rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
-    private val quoteCharacter: Character = null,
-    private val ignoreFirstLine: Boolean = false,
-    private val ignoreComments: String = null,
-    private val lenient: Boolean = false)
+    private val selectedFields: Array[Int],
+    private val fieldDelim: String,
+    private val rowDelim: String,
+    private val quoteCharacter: Character,
+    private val ignoreFirstLine: Boolean,
+    private val ignoreComments: String,
+    private val lenient: Boolean)
   extends BatchTableSource[Row]

--- End diff --

Maybe we need a base class instead of traits to do something like checking the equality of numbers of field names/types.

> Support watermark generation for TableSource
>
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Fabian Hueske
> Priority: Blocker
> Fix For: 1.4.0
>
> As discussed in FLINK-7446, currently the TableSource only support to define
> rowtime field, but not support to extract watermarks from the rowtime field.
> We can provide a new interface called {{DefinedWatermark}}, which has two
> methods {{getRowtimeAttribute}} (can only be an existing field) and
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked
> deprecated.
> How to support periodic and punctuated watermarks and support some built-in
> strategies needs further discussion.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
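The review comment above suggests a shared base class that could validate common invariants. As a minimal, hypothetical sketch (plain Java, names invented here; not the actual Flink code), the kind of check meant is that the numbers of field names and field types must line up:

```java
/** Hypothetical sketch of a consistency check a shared TableSource base class could enforce. */
public final class TableSchemaCheck {

    /** Fails if the numbers of field names and field types do not match. */
    public static void checkFieldsConsistent(String[] fieldNames, Class<?>[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException(
                "Number of field names (" + fieldNames.length + ") and field types ("
                    + fieldTypes.length + ") must match.");
        }
    }

    public static void main(String[] args) {
        // Consistent schema: two names, two types; passes silently.
        checkFieldsConsistent(
            new String[] {"id", "name"},
            new Class<?>[] {Long.class, String.class});
        System.out.println("schema is consistent");
    }
}
```

A Scala trait cannot easily enforce such a check at construction time, which is the reviewer's argument for a base class.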
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224024#comment-16224024 ]

ASF GitHub Bot commented on FLINK-7548:
---

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147583120

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.{Expression, ResolvedFieldReference}
+
+/**
+ * The [[FieldComputer]] interface returns an expression to compute the field of the table schema
+ * of a [[TableSource]] from one or more fields of the [[TableSource]]'s return type.
+ *
+ * @tparam T The result type of the provided expression.
+ */
+abstract class FieldComputer[T] {

--- End diff --

I think we could add a `getArguments: Array[_]` function to allow providing extra arguments besides the existing fields, e.g., for situations in which event-time is represented as `String`, the argument should be the time pattern.

> Support watermark generation for TableSource
>
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Fabian Hueske
> Priority: Blocker
> Fix For: 1.4.0
>
> As discussed in FLINK-7446, currently the TableSource only support to define
> rowtime field, but not support to extract watermarks from the rowtime field.
> We can provide a new interface called {{DefinedWatermark}}, which has two
> methods {{getRowtimeAttribute}} (can only be an existing field) and
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked
> deprecated.
> How to support periodic and punctuated watermarks and support some built-in
> strategies needs further discussion.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
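The reviewer's example use case (event-time stored as a `String`, with the time pattern passed as an extra argument) can be illustrated with a small self-contained sketch. Everything here is hypothetical: `StringRowtimeComputer` is an invented name, written in plain Java rather than against the `FieldComputer` interface from the diff, and it assumes UTC for the conversion.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical field computer turning a String event-time field into epoch milliseconds. */
public final class StringRowtimeComputer {

    // The time pattern is the "extra argument" the reviewer suggests exposing via getArguments.
    private final DateTimeFormatter formatter;

    public StringRowtimeComputer(String pattern) {
        this.formatter = DateTimeFormatter.ofPattern(pattern);
    }

    /** Computes the rowtime (epoch millis, assuming UTC) from the raw String field. */
    public long compute(String rawValue) {
        return LocalDateTime.parse(rawValue, formatter)
            .toInstant(ZoneOffset.UTC)
            .toEpochMilli();
    }

    public static void main(String[] args) {
        StringRowtimeComputer computer = new StringRowtimeComputer("yyyy-MM-dd HH:mm:ss");
        System.out.println(computer.compute("1970-01-01 00:00:01")); // 1000
    }
}
```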
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224026#comment-16224026 ]

ASF GitHub Bot commented on FLINK-7548:
---

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147580191

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
@@ -56,33 +63,123 @@ class StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource
+      tableSource,
+      selectedFields
     )
   }

   override def copy(
       traitSet: RelTraitSet,
-      newTableSource: TableSource[_])
-    : PhysicalTableSourceScan = {
+      newTableSource: TableSource[_]): PhysicalTableSourceScan = {

     new StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      newTableSource.asInstanceOf[StreamTableSource[_]]
+      newTableSource.asInstanceOf[StreamTableSource[_]],
+      selectedFields
     )
   }

   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
       queryConfig: StreamQueryConfig): DataStream[CRow] = {

+    val fieldIndexes = TableSourceUtil.computeIndexMapping(
+      tableSource,
+      isStreamTable = true,
+      selectedFields)
+
     val config = tableEnv.getConfig
     val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]

-    convertToInternalRow(
-      new RowSchema(getRowType),
+    val outputSchema = new RowSchema(this.getRowType)
+
+    // check that declared and actual type of table source DataStream are identical
+    if (inputDataStream.getType != tableSource.getReturnType) {
+      throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
+        s"returned a DataStream of type ${inputDataStream.getType} that does not match with the " +
+        s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. " +
+        s"Please validate the implementation of the TableSource.")
+    }
+
+    // get expression to extract rowtime attribute
+    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+      tableSource,
+      selectedFields,
+      cluster,
+      tableEnv.getRelBuilder,
+      TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+    )
+
+    // ingest table and convert and extract time attributes if necessary
+    val ingestedTable = convertToInternalRow(
+      outputSchema,
       inputDataStream,
-      new StreamTableSourceTable(tableSource),
-      config)
+      fieldIndexes,
+      config,
+      rowtimeExpression)
+
+    // generate watermarks for rowtime indicator
+    val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, selectedFields)
+
+    val withWatermarks = if (rowtimeDesc.isDefined) {
+      val rowtimeFieldIdx = outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
+      val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
+      watermarkStrategy match {
+        case p: PeriodicWatermarkAssigner =>
+          val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+        case p: PunctuatedWatermarkAssigner =>
+          val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+      }
+    } else {
+      // No need to generate watermarks if no rowtime attribute is specified.
+      ingestedTable
+    }
+
+    withWatermarks
+  }
+}
+
+/**
+ * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
+ *
+ * @param timeFieldIdx the index of the rowtime attribute.
+ * @param assigner the watermark assigner.
+ */
+private class PeriodicWatermarkAssignerWrapper(
+    timeFieldIdx: Int,
+    assigner: PeriodicWatermarkAssigner)
+  extends AssignerWithPeriodicWatermarks[CRow] {
+
+  override def getCurrentWatermark: Watermark = assigner.getWatermark
+
+  override def extractTimestamp(crow: CRow, previousElementTimestamp: Long): Long = {
+    val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long]
+    assigner.nextTimestamp(timestamp)
+    0L
--- End diff --

I know the timestamp in the `StreamRecord` is useless for the Table/SQL API
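The wrapper in the diff above forwards each record's timestamp to the assigner (`nextTimestamp`) and is periodically asked for a watermark (`getCurrentWatermark`). Stripped of Flink's types, one common strategy such an assigner can implement is "max seen timestamp minus a fixed out-of-orderness bound". The sketch below is illustrative plain Java with an invented class name, not the `PeriodicWatermarkAssigner` from the PR:

```java
/** Self-contained stand-in for a bounded-out-of-orderness periodic watermark strategy. */
public final class BoundedLatenessWatermarkSketch {

    private final long maxOutOfOrderness;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    public BoundedLatenessWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Per-record hook, analogous to nextTimestamp(...) in the diff above. */
    public void nextTimestamp(long timestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestamp);
    }

    /** Periodic hook, analogous to getCurrentWatermark in the diff above. */
    public long getWatermark() {
        return maxSeenTimestamp == Long.MIN_VALUE
            ? Long.MIN_VALUE
            : maxSeenTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedLatenessWatermarkSketch wm = new BoundedLatenessWatermarkSketch(100);
        wm.nextTimestamp(1000);
        wm.nextTimestamp(1500);
        wm.nextTimestamp(1200); // late record; the watermark never moves backwards
        System.out.println(wm.getWatermark()); // 1400
    }
}
```

Because only the running maximum is kept, a late record (1200 after 1500) cannot pull the watermark back, which is exactly the monotonicity watermarks require.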
[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147574828

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala ---
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.sql.Timestamp
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.{TableException, Types, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Cast, ResolvedFieldReference}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+
+import scala.collection.JavaConverters._
+
+/** Util class for [[TableSource]]. */
+object TableSourceUtil {
+
+  /** Returns true if the [[TableSource]] has a rowtime attribute. */
+  def hasRowtimeAttribute(tableSource: TableSource[_]): Boolean =
+    getRowtimeAttributes(tableSource).nonEmpty
+
+  /** Returns true if the [[TableSource]] has a proctime attribute. */
+  def hasProctimeAttribute(tableSource: TableSource[_]): Boolean =
+    getProctimeAttributes(tableSource).nonEmpty
+
+  /**
+   * Validates a TableSource.
+   *
+   * - checks that all fields of the schema can be resolved
+   * - checks that resolved fields have the correct type
+   * - checks that the time attributes are correctly configured.
+   *
+   * @param tableSource The [[TableSource]] for which the time attributes are checked.
+   */
+  def validateTableSource(tableSource: TableSource[_]): Unit = {
+
+    val schema = tableSource.getTableSchema
+    val tableFieldNames = schema.getColumnNames
+    val tableFieldTypes = schema.getTypes
+
+    // get rowtime and proctime attributes
+    val rowtimeAttributes = getRowtimeAttributes(tableSource)
+    val proctimeAttributes = getProctimeAttributes(tableSource)
+
+    // validate that schema fields can be resolved to a return type field of correct type
+    var mappedFieldCnt = 0
+    tableFieldTypes.zip(tableFieldNames).foreach {
+      case (t: SqlTimeTypeInfo[_], name: String)
+        if t.getTypeClass == classOf[Timestamp] && proctimeAttributes.contains(name) =>
+        // OK, field was mapped to proctime attribute
+      case (t: SqlTimeTypeInfo[_], name: String)
+        if t.getTypeClass == classOf[Timestamp] && rowtimeAttributes.contains(name) =>
+        // OK, field was mapped to rowtime attribute
+      case (t: TypeInformation[_], name) =>
+        // check if field is registered as time indicator
+        if (getProctimeAttributes(tableSource).contains(name)) {
+          throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
+            s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.")
+        }
+        if (getRowtimeAttributes(tableSource).contains(name)) {
+          throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
+            s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
+        }
+        // check that field can be resolved in input type
+        val (physicalName, _, tpe) = resolveInputField(name, tableSource)
+        // validate that mapped fields are are same type
+        if (tpe != t) {
+          throw ValidationException(s"Type $t of table field '$name' does not " +
[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147580191

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
@@ -56,33 +63,123 @@ class StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource
+      tableSource,
+      selectedFields
     )
   }

   override def copy(
       traitSet: RelTraitSet,
-      newTableSource: TableSource[_])
-    : PhysicalTableSourceScan = {
+      newTableSource: TableSource[_]): PhysicalTableSourceScan = {

     new StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      newTableSource.asInstanceOf[StreamTableSource[_]]
+      newTableSource.asInstanceOf[StreamTableSource[_]],
+      selectedFields
     )
   }

   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
       queryConfig: StreamQueryConfig): DataStream[CRow] = {

+    val fieldIndexes = TableSourceUtil.computeIndexMapping(
+      tableSource,
+      isStreamTable = true,
+      selectedFields)
+
     val config = tableEnv.getConfig
     val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]

-    convertToInternalRow(
-      new RowSchema(getRowType),
+    val outputSchema = new RowSchema(this.getRowType)
+
+    // check that declared and actual type of table source DataStream are identical
+    if (inputDataStream.getType != tableSource.getReturnType) {
+      throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
+        s"returned a DataStream of type ${inputDataStream.getType} that does not match with the " +
+        s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. " +
+        s"Please validate the implementation of the TableSource.")
+    }
+
+    // get expression to extract rowtime attribute
+    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+      tableSource,
+      selectedFields,
+      cluster,
+      tableEnv.getRelBuilder,
+      TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+    )
+
+    // ingest table and convert and extract time attributes if necessary
+    val ingestedTable = convertToInternalRow(
+      outputSchema,
      inputDataStream,
-      new StreamTableSourceTable(tableSource),
-      config)
+      fieldIndexes,
+      config,
+      rowtimeExpression)
+
+    // generate watermarks for rowtime indicator
+    val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, selectedFields)
+
+    val withWatermarks = if (rowtimeDesc.isDefined) {
+      val rowtimeFieldIdx = outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
+      val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
+      watermarkStrategy match {
+        case p: PeriodicWatermarkAssigner =>
+          val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+        case p: PunctuatedWatermarkAssigner =>
+          val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+      }
+    } else {
+      // No need to generate watermarks if no rowtime attribute is specified.
+      ingestedTable
+    }
+
+    withWatermarks
+  }
+}
+
+/**
+ * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
+ *
+ * @param timeFieldIdx the index of the rowtime attribute.
+ * @param assigner the watermark assigner.
+ */
+private class PeriodicWatermarkAssignerWrapper(
+    timeFieldIdx: Int,
+    assigner: PeriodicWatermarkAssigner)
+  extends AssignerWithPeriodicWatermarks[CRow] {
+
+  override def getCurrentWatermark: Watermark = assigner.getWatermark
+
+  override def extractTimestamp(crow: CRow, previousElementTimestamp: Long): Long = {
+    val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long]
+    assigner.nextTimestamp(timestamp)
+    0L
--- End diff --

I know the timestamp in the `StreamRecord` is useless for the Table/SQL API now, but the returned value will still be set and `StreamRecord.hasTimestamp()` will be `true`. How about return a negative value (e.g., -1) here, so that maybe it's possible for us to erase the time from `StreamRecord` when the extracted
[GitHub] flink issue #4751: [FLINK-7739][kafka-tests] Throttle down data producing th...
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4751

    I assume that the `Thread.sleep(1)` should not do much harm if the Kafka Producer already blocks due to buffering and I/O. If it should not block, then the `Thread.sleep(1)` will give other threads the chance to execute when having only scarce resources (e.g. on Travis). I agree with @pnowojski that this is not a stress test and we have seen in the past that tests became unstable due to resource scarcity. Therefore I would like to merge this PR if Travis gives green light and @StephanEwen does not object.

---
[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability
[ https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224019#comment-16224019 ]

ASF GitHub Bot commented on FLINK-7739:
---

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4751

    I assume that the `Thread.sleep(1)` should not do much harm if the Kafka Producer already blocks due to buffering and I/O. If it should not block, then the `Thread.sleep(1)` will give other threads the chance to execute when having only scarce resources (e.g. on Travis). I agree with @pnowojski that this is not a stress test and we have seen in the past that tests became unstable due to resource scarcity. Therefore I would like to merge this PR if Travis gives green light and @StephanEwen does not object.

> Improve Kafka*ITCase tests stability
>
>
> Key: FLINK-7739
> URL: https://issues.apache.org/jira/browse/FLINK-7739
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.3.2
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
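The throttling discussed above, a `Thread.sleep(1)` per produced record, both caps the production rate and yields the CPU to other threads on constrained machines such as Travis workers. The sketch below is illustrative only (invented class name, a counter standing in for the real Kafka producer), not the code from PR #4751:

```java
/** Illustrative sketch of a per-record throttled producing loop. */
public final class ThrottledProducerSketch {

    public static int produceThrottled(int numRecords) throws InterruptedException {
        int produced = 0;
        for (int i = 0; i < numRecords; i++) {
            // Stand-in for producer.send(record), which may itself block on buffering/IO.
            produced++;
            // Throttle: sleep 1 ms so other threads get a chance to run under scarce resources.
            Thread.sleep(1);
        }
        return produced;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceThrottled(5)); // 5
    }
}
```

The sleep bounds the loop to at most ~1000 records per second per thread, which is why the reviewers note it is acceptable for a functional test but would be wrong for a stress test.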
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223885#comment-16223885 ]

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4894

    I'm planning to merge this PR before the feature freeze on Tuesday.
    Cheers, Fabian

> Support watermark generation for TableSource
>
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Fabian Hueske
> Priority: Blocker
> Fix For: 1.4.0
>
> As discussed in FLINK-7446, currently the TableSource only support to define
> rowtime field, but not support to extract watermarks from the rowtime field.
> We can provide a new interface called {{DefinedWatermark}}, which has two
> methods {{getRowtimeAttribute}} (can only be an existing field) and
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked
> deprecated.
> How to support periodic and punctuated watermarks and support some built-in
> strategies needs further discussion.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)