[jira] [Commented] (FLINK-7511) Remove dead code after dropping backward compatibility with <=1.2
[ https://issues.apache.org/jira/browse/FLINK-7511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335461#comment-16335461 ] ASF GitHub Bot commented on FLINK-7511: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4587 > Remove dead code after dropping backward compatibility with <=1.2 > - > > Key: FLINK-7511 > URL: https://issues.apache.org/jira/browse/FLINK-7511 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.4.0 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #4587: [FLINK-7511] [cep] Remove dead code after dropping backwa...
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/4587 Thanks for review @pnowojski. Merging. ---
[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335449#comment-16335449 ] ASF GitHub Bot commented on FLINK-8344: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163161066 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java --- @@ -132,4 +137,23 @@ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { return new StandaloneLeaderElectionService(); } } + + @Override + public LeaderRetrievalService getRestServerLeaderRetriever() { --- End diff -- renamed > Add support for HA to RestClusterClient > --- > > Key: FLINK-8344 > URL: https://issues.apache.org/jira/browse/FLINK-8344 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{RestClusterClient}} must be able to deal with changing JobMasters in > case of HA. We have to add functionality to reconnect to a newly elected > leader in case of HA. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
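In a standalone (non-HA) setup there is no leader election to observe, so a REST-server leader retriever can plausibly just report one preconfigured address as soon as it is started. The sketch below illustrates that idea with JDK types only. `AddressListener` and `FixedAddressRetriever` are hypothetical names, and this is not Flink's actual standalone implementation.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical callback notified whenever the known leader changes.
interface AddressListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Hypothetical non-HA retriever: the "leader" is a fixed, preconfigured address.
final class FixedAddressRetriever {
    private final String address;
    private final UUID sessionId;
    private boolean started;

    FixedAddressRetriever(String address, UUID sessionId) {
        this.address = Objects.requireNonNull(address);
        this.sessionId = Objects.requireNonNull(sessionId);
    }

    // Without an election the address never changes, so the listener is
    // notified exactly once, immediately on start.
    synchronized void start(AddressListener listener) {
        if (started) {
            throw new IllegalStateException("Retriever already started");
        }
        started = true;
        listener.notifyLeaderAddress(address, sessionId);
    }
}
```

With HA enabled, the same listener would instead be notified again each time a new leader is elected, which is what lets a client reconnect.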
[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163161003 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -327,13 +376,14 @@ public T getClusterId() { } private > R waitForResource( - final SupplierWithException, IOException> resourceFutureSupplier) + final Supplier> resourceFutureSupplier) throws IOException, InterruptedException, ExecutionException, TimeoutException { A asynchronouslyCreatedResource; long attempt = 0; while (true) { final CompletableFuture responseFuture = resourceFutureSupplier.get(); --- End diff -- Made the method `waitForResource` non-blocking. ---
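Turning a `while (true)` loop that waits on each status response into a non-blocking method usually means chaining the next poll onto the completion of the previous future instead of blocking on it. A minimal JDK-only sketch, assuming a supplier that returns the current status and a scheduler for the delay between polls (`NonBlockingPoller` and `pollUntilComplete` are hypothetical names, not the PR's code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class NonBlockingPoller {
    private NonBlockingPoller() {}

    // Invokes statusSupplier until isComplete accepts the result, waiting
    // delayMillis between attempts. No thread blocks while waiting: each
    // follow-up attempt is scheduled on the given scheduler.
    static <R> CompletableFuture<R> pollUntilComplete(
            Supplier<CompletableFuture<R>> statusSupplier,
            Predicate<R> isComplete,
            long delayMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<R> result = new CompletableFuture<>();
        poll(statusSupplier, isComplete, delayMillis, scheduler, result);
        return result;
    }

    private static <R> void poll(
            Supplier<CompletableFuture<R>> statusSupplier,
            Predicate<R> isComplete,
            long delayMillis,
            ScheduledExecutorService scheduler,
            CompletableFuture<R> result) {
        statusSupplier.get().whenComplete((status, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else if (isComplete.test(status)) {
                result.complete(status);
            } else {
                try {
                    scheduler.schedule(
                            () -> poll(statusSupplier, isComplete, delayMillis, scheduler, result),
                            delayMillis,
                            TimeUnit.MILLISECONDS);
                } catch (RuntimeException e) {
                    // e.g. RejectedExecutionException after the scheduler was shut down
                    result.completeExceptionally(e);
                }
            }
        });
    }
}
```

This sketch polls without an upper bound; a production version would cap the number of attempts or add an overall timeout.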
[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163160938 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() { public int getMaxSlots() { return 0; } + + //- + // RestClient Helper + //- + + private , U extends MessageParameters, P extends ResponseBody> CompletableFuture + sendRequest(M messageHeaders, U messageParameters) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, messageParameters, EmptyRequestBody.getInstance()); + } + + private , R extends RequestBody, P extends ResponseBody> CompletableFuture + sendRequest(M messageHeaders, R request) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), request); + } + + private , P extends ResponseBody> CompletableFuture + sendRequest(M messageHeaders) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()); + } + + private , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture + sendRequest(M messageHeaders, U messageParameters, R request) throws IOException, LeaderNotAvailableException { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), messageHeaders, messageParameters, request); + } + + private , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture + sendRetryableRequest(M messageHeaders, U messageParameters, R request, Predicate retryPredicate) { + return retry(() -> { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), 
messageHeaders, messageParameters, request); + }, retryPredicate); + } + + private CompletableFuture retry( + CheckedSupplier> operation, + Predicate retryPredicate) { + return FutureUtils.retryWithDelay( + CheckedSupplier.unchecked(operation), + MAX_RETRIES, + RETRY_DELAY, + retryPredicate, + new ScheduledExecutorServiceAdapter(retryExecutorService)); + } + + private static Predicate isTimeoutException() { + return (throwable) -> + ExceptionUtils.findThrowable(throwable, java.net.ConnectException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, java.net.SocketTimeoutException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, ConnectTimeoutException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, IOException.class).isPresent(); + } + + private static Predicate isHttpStatusUnsuccessfulException() { + return (throwable) -> ExceptionUtils.findThrowable(throwable, RestClientException.class) + .map(restClientException -> { + final int code = restClientException.getHttpResponseStatus().code(); + return code < 200 || code > 299; + }) + .orElse(false); + } + + private abstract class RestClusterClientLeaderRetrievalListener implements LeaderRetrievalListener { + @Override + public final void handleError(final Exception exception) { + log.error("Exception in LeaderRetrievalListener", exception); + shutdown(); --- End diff -- Using `LeaderRetriever` now ---
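The reply says the custom listener was replaced by `LeaderRetriever`. The general idea, keeping the latest leader address in a future that request code re-reads on every attempt, can be sketched with JDK types only. `LeaderAddressHolder` is a hypothetical name; this is not Flink's `LeaderRetriever` API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical holder for the most recently announced leader address.
final class LeaderAddressHolder {
    private CompletableFuture<String> leaderFuture = new CompletableFuture<>();

    // Called when a (new) leader is known, or with null when leadership is lost.
    synchronized void notifyLeaderAddress(String address) {
        if (address == null) {
            // Leader lost: later callers should wait for the next leader.
            if (leaderFuture.isDone()) {
                leaderFuture = new CompletableFuture<>();
            }
        } else if (!leaderFuture.complete(address)) {
            // The current future already holds an older leader: replace it.
            leaderFuture = CompletableFuture.completedFuture(address);
        }
    }

    // Callers that re-read this on every request pick up a newly elected leader.
    synchronized CompletableFuture<String> getLeaderFuture() {
        return leaderFuture;
    }
}
```

Callers waiting before any leader is known receive the first announced address through the pending future, while later callers always see the newest one.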
[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163160802 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() { public int getMaxSlots() { return 0; } + + //- + // RestClient Helper + //- + + private , U extends MessageParameters, P extends ResponseBody> CompletableFuture + sendRequest(M messageHeaders, U messageParameters) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, messageParameters, EmptyRequestBody.getInstance()); + } + + private , R extends RequestBody, P extends ResponseBody> CompletableFuture + sendRequest(M messageHeaders, R request) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), request); + } + + private , P extends ResponseBody> CompletableFuture + sendRequest(M messageHeaders) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()); + } + + private , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture + sendRequest(M messageHeaders, U messageParameters, R request) throws IOException, LeaderNotAvailableException { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), messageHeaders, messageParameters, request); + } + + private , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture + sendRetryableRequest(M messageHeaders, U messageParameters, R request, Predicate retryPredicate) { + return retry(() -> { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); --- End diff -- done ---
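Reading the leader address inside the retried lambda means each attempt resolves the current leader afresh, so a retry after a failover targets the new leader. The diff delegates to Flink's `FutureUtils.retryWithDelay`; the following is only a JDK-only sketch of the same idea (the class `Retry` and its signature are hypothetical), showing that the operation supplier is invoked anew on every attempt:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class Retry {
    private Retry() {}

    // Runs operation; on failure, if retryPredicate accepts the cause and retries
    // remain, schedules a fresh call of operation after delayMillis. Anything the
    // operation looks up (e.g. the leader address) is re-resolved per attempt.
    static <T> CompletableFuture<T> retryWithDelay(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            long delayMillis,
            Predicate<Throwable> retryPredicate,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delayMillis, retryPredicate, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            long delayMillis,
            Predicate<Throwable> retryPredicate,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = operation.get();
        } catch (RuntimeException e) {
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            Throwable cause = error instanceof CompletionException && error.getCause() != null
                    ? error.getCause() : error;
            if (retriesLeft > 0 && retryPredicate.test(cause)) {
                try {
                    scheduler.schedule(
                            () -> attempt(operation, retriesLeft - 1, delayMillis, retryPredicate, scheduler, result),
                            delayMillis, TimeUnit.MILLISECONDS);
                } catch (RuntimeException e) {
                    result.completeExceptionally(e);
                }
            } else {
                result.completeExceptionally(cause);
            }
        });
    }
}
```

Errors the predicate rejects (for example a non-retryable request error) fail the result immediately instead of consuming the remaining attempts.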
[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163160900 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -61,46 +69,77 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody; import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource; import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedThrowable; -import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.CheckedSupplier; + +import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; + +import akka.actor.AddressFromURIString; import javax.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.MalformedURLException; import java.net.URL; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; -import static java.util.Objects.requireNonNull; +import scala.Option; + +import static org.apache.flink.util.Preconditions.checkArgument; /** * A 
{@link ClusterClient} implementation that communicates via HTTP REST requests. */ public class RestClusterClient extends ClusterClient { + private static final long AWAIT_LEADER_TIMEOUT = 10_000; + + private static final int MAX_RETRIES = 20; + + private static final Time RETRY_DELAY = Time.seconds(3); --- End diff -- Moved to `RestOptions` ---
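Moving hard-coded constants into configuration typically means a typed setting with a default. As a hedged JDK-only sketch, the three removed constants could be read with their old values as defaults. The property keys below are hypothetical and are not the actual `RestOptions` keys, and `AWAIT_LEADER_TIMEOUT = 10_000` is assumed to be in milliseconds.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical settings object; defaults mirror the constants removed in the diff.
final class RestRetrySettings {
    static final String AWAIT_LEADER_TIMEOUT_KEY = "example.rest.await-leader-timeout-ms"; // hypothetical
    static final String MAX_RETRIES_KEY = "example.rest.retry.max-attempts";               // hypothetical
    static final String RETRY_DELAY_KEY = "example.rest.retry.delay-ms";                   // hypothetical

    final Duration awaitLeaderTimeout;
    final int maxRetries;
    final Duration retryDelay;

    private RestRetrySettings(Duration awaitLeaderTimeout, int maxRetries, Duration retryDelay) {
        this.awaitLeaderTimeout = awaitLeaderTimeout;
        this.maxRetries = maxRetries;
        this.retryDelay = retryDelay;
    }

    static RestRetrySettings fromProperties(Properties props) {
        long awaitMs = Long.parseLong(props.getProperty(AWAIT_LEADER_TIMEOUT_KEY, "10000"));
        int retries = Integer.parseInt(props.getProperty(MAX_RETRIES_KEY, "20"));
        long delayMs = Long.parseLong(props.getProperty(RETRY_DELAY_KEY, "3000"));
        if (awaitMs < 0 || retries < 0 || delayMs < 0) {
            throw new IllegalArgumentException("retry settings must be non-negative");
        }
        return new RestRetrySettings(Duration.ofMillis(awaitMs), retries, Duration.ofMillis(delayMs));
    }
}
```

Keeping the old constant values as defaults means existing deployments behave the same unless they opt in to different values.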
[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163160849 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -327,13 +376,14 @@ public T getClusterId() { } private > R waitForResource( - final SupplierWithException, IOException> resourceFutureSupplier) + final Supplier> resourceFutureSupplier) throws IOException, InterruptedException, ExecutionException, TimeoutException { --- End diff -- Removed. Also made the method non-blocking. ---
[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163160791 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() { public int getMaxSlots() { return 0; } + + //- + // RestClient Helper + //- + + private , U extends MessageParameters, P extends ResponseBody> CompletableFuture + sendRequest(M messageHeaders, U messageParameters) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, messageParameters, EmptyRequestBody.getInstance()); + } + + private , R extends RequestBody, P extends ResponseBody> CompletableFuture + sendRequest(M messageHeaders, R request) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), request); + } + + private , P extends ResponseBody> CompletableFuture + sendRequest(M messageHeaders) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()); + } + + private , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture + sendRequest(M messageHeaders, U messageParameters, R request) throws IOException, LeaderNotAvailableException { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), messageHeaders, messageParameters, request); + } + + private , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture + sendRetryableRequest(M messageHeaders, U messageParameters, R request, Predicate retryPredicate) { + return retry(() -> { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), 
messageHeaders, messageParameters, request); + }, retryPredicate); + } + + private CompletableFuture retry( + CheckedSupplier> operation, + Predicate retryPredicate) { + return FutureUtils.retryWithDelay( + CheckedSupplier.unchecked(operation), + MAX_RETRIES, + RETRY_DELAY, + retryPredicate, + new ScheduledExecutorServiceAdapter(retryExecutorService)); + } + + private static Predicate isTimeoutException() { + return (throwable) -> + ExceptionUtils.findThrowable(throwable, java.net.ConnectException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, java.net.SocketTimeoutException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, ConnectTimeoutException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, IOException.class).isPresent(); --- End diff -- renamed to `isConnectionProblemException` ---
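Note that `java.net.ConnectException` and `java.net.SocketTimeoutException` both extend `IOException` (and Netty's `ConnectTimeoutException` extends `ConnectException`), so the final `IOException` check in the renamed predicate already subsumes the other three; it may also match I/O failures that are not strictly connection problems. A JDK-only sketch of both predicates, where `RetryPredicates` and `HttpStatusException` are hypothetical stand-ins (the latter for `RestClientException`):

```java
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

final class RetryPredicates {
    private RetryPredicates() {}

    // Walks the cause chain (guarding against cycles) and returns the first
    // throwable of the requested type.
    static <T extends Throwable> Optional<T> findThrowable(Throwable t, Class<T> type) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable current = t; current != null && seen.add(current); current = current.getCause()) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
        }
        return Optional.empty();
    }

    // ConnectException, SocketTimeoutException and Netty's ConnectTimeoutException
    // are all IOExceptions, so one check covers them.
    static Predicate<Throwable> isConnectionProblem() {
        return t -> findThrowable(t, IOException.class).isPresent();
    }

    // Hypothetical exception carrying an HTTP status code.
    static final class HttpStatusException extends Exception {
        final int statusCode;

        HttpStatusException(int statusCode) {
            super("HTTP status " + statusCode);
            this.statusCode = statusCode;
        }
    }

    // True if the chain contains an HTTP status outside the 2xx range.
    static Predicate<Throwable> isHttpStatusUnsuccessful() {
        return t -> findThrowable(t, HttpStatusException.class)
                .map(e -> e.statusCode < 200 || e.statusCode > 299)
                .orElse(false);
    }
}
```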
[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163160703 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java --- @@ -30,49 +28,20 @@ */ public final class RestClusterClientConfiguration { --- End diff -- Now it's needed again. ---
[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335439#comment-16335439 ] ASF GitHub Bot commented on FLINK-8344: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163160703 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java --- @@ -30,49 +28,20 @@ */ public final class RestClusterClientConfiguration { --- End diff -- Now it's needed again. > Add support for HA to RestClusterClient > --- > > Key: FLINK-8344 > URL: https://issues.apache.org/jira/browse/FLINK-8344 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{RestClusterClient}} must be able to deal with changing JobMasters in > case of HA. We have to add functionality to reconnect to a newly elected > leader in case of HA. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163160672 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java --- @@ -144,6 +154,15 @@ public WebMonitorEndpoint( metricQueryServiceRetriever, executor, restConfiguration.getTimeout()); + + this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService); + this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); + } + + @Override + public void start() throws Exception { + super.start(); + leaderElectionService.start(this); --- End diff -- Added ``` try { leaderElectionService.stop(); } catch (Exception e) { log.warn("Error while stopping leaderElectionService", e); } ``` to `shutdown` hook ---
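The shutdown change described above makes stopping the leader election service best-effort: a failure is logged and the rest of the shutdown continues. A minimal sketch of that start/stop pairing, assuming a hypothetical, simplified `LeaderElectionService` with only `start` and `stop` (Flink's real interface has more methods and a dedicated contender type). The `warnings` list stands in for `log.warn`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class EndpointLifecycleSketch {

    /** Hypothetical, simplified stand-in for Flink's LeaderElectionService. */
    interface LeaderElectionService {
        void start(Object contender) throws Exception;

        void stop() throws Exception;
    }

    static class Endpoint {
        private final LeaderElectionService leaderElectionService;
        final List<String> warnings = new ArrayList<>(); // stands in for log.warn(...)
        boolean started;

        Endpoint(LeaderElectionService leaderElectionService) {
            // Mirrors Preconditions.checkNotNull in the reviewed constructor.
            this.leaderElectionService = Objects.requireNonNull(leaderElectionService);
        }

        void start() throws Exception {
            started = true;                    // stands in for super.start(): bring up the server first
            leaderElectionService.start(this); // then take part in leader election
        }

        void shutdown() {
            // Best effort: a failing stop() is recorded but must not abort the shutdown.
            try {
                leaderElectionService.stop();
            } catch (Exception e) {
                warnings.add("Error while stopping leaderElectionService: " + e.getMessage());
            }
            started = false; // stands in for the remaining shutdown steps
        }
    }

    public static void main(String[] args) throws Exception {
        Endpoint endpoint = new Endpoint(new LeaderElectionService() {
            @Override
            public void start(Object contender) {
            }

            @Override
            public void stop() throws Exception {
                throw new Exception("ZooKeeper unavailable");
            }
        });
        endpoint.start();
        endpoint.shutdown();
        System.out.println(endpoint.started + " " + endpoint.warnings);
    }
}
```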
[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335437#comment-16335437 ] ASF GitHub Bot commented on FLINK-8344: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163160672 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java --- @@ -144,6 +154,15 @@ public WebMonitorEndpoint( metricQueryServiceRetriever, executor, restConfiguration.getTimeout()); + + this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService); + this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); + } + + @Override + public void start() throws Exception { + super.start(); + leaderElectionService.start(this); --- End diff -- Added ``` try { leaderElectionService.stop(); } catch (Exception e) { log.warn("Error while stopping leaderElectionService", e); } ``` to `shutdown` hook > Add support for HA to RestClusterClient > --- > > Key: FLINK-8344 > URL: https://issues.apache.org/jira/browse/FLINK-8344 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{RestClusterClient}} must be able to deal with changing JobMasters in > case of HA. We have to add functionality to reconnect to a newly elected > leader in case of HA. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8458) Add the switch for keeping both the old mode and the new credit-based mode
[ https://issues.apache.org/jira/browse/FLINK-8458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335419#comment-16335419 ] ASF GitHub Bot commented on FLINK-8458: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r163155607 --- Diff: docs/ops/config.md --- @@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager and TaskManagers. - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`. +- `taskmanager.network.memory.buffers-per-channel`: Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). Especially in credit-based flow control mode, it indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receving in-flight data in the subpartition and 1 buffer is for parallel serialization. + +- `taskmanager.network.memory.floating-buffers-per-gate`: Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, it indicates how many floating credits are shared for all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback. So the floating buffers can help relief back-pressure caused by imbalance data distribution among subpartitions. 
+ --- End diff -- thanks for your polish, alpinegizmo. I will apply the above fixes. > Add the switch for keeping both the old mode and the new credit-based mode > -- > > Key: FLINK-8458 > URL: https://issues.apache.org/jira/browse/FLINK-8458 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > After the whole feature of credit-based flow control is done, we should add a > config parameter to switch on/off the new credit-based mode. To do so, we can > roll back to the old network mode for any expected risks. > The parameter is defined as > {{taskmanager.network.credit-based-flow-control.enabled}} and the default > value is true. This switch may be removed after next release. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
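Taken together, the two parameters determine how many buffers an input gate claims. A back-of-the-envelope sketch, assuming the sizing rule implied by the descriptions above (exclusive buffers for every input channel plus a shared floating pool per gate) and illustrative values of 2 buffers per channel, 8 floating buffers per gate and a 32 KiB segment size. These values are assumptions for the example; check the defaults of your Flink version before relying on them.

```java
public class NetworkBufferEstimate {

    /**
     * Rough per-gate buffer demand under credit-based flow control:
     * each input channel gets its own exclusive buffers, and the gate
     * additionally holds a shared pool of floating buffers.
     */
    static int buffersPerGate(int numChannels, int buffersPerChannel, int floatingBuffersPerGate) {
        return numChannels * buffersPerChannel + floatingBuffersPerGate;
    }

    static long bytesPerGate(int numChannels, int buffersPerChannel, int floatingBuffersPerGate, int segmentSizeBytes) {
        return (long) buffersPerGate(numChannels, buffersPerChannel, floatingBuffersPerGate) * segmentSizeBytes;
    }

    public static void main(String[] args) {
        int channels = 100;          // e.g. an input gate fed by 100 upstream subtasks
        int buffersPerChannel = 2;   // assumed taskmanager.network.memory.buffers-per-channel
        int floatingPerGate = 8;     // assumed taskmanager.network.memory.floating-buffers-per-gate
        int segmentSize = 32 * 1024; // assumed taskmanager.memory.segment-size (32 KiB)

        int buffers = buffersPerGate(channels, buffersPerChannel, floatingPerGate);
        long bytes = bytesPerGate(channels, buffersPerChannel, floatingPerGate, segmentSize);
        System.out.println(buffers + " buffers, " + (bytes / 1024) + " KiB per input gate");
        // prints "208 buffers, 6656 KiB per input gate"
    }
}
```

Because the exclusive part grows linearly with the number of channels, raising `buffers-per-channel` for a job with high parallelism multiplies the memory demand, which is the resource trade-off raised in the review.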
[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r163155607 --- Diff: docs/ops/config.md --- @@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager and TaskManagers. - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`. +- `taskmanager.network.memory.buffers-per-channel`: Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). Especially in credit-based flow control mode, it indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receving in-flight data in the subpartition and 1 buffer is for parallel serialization. + +- `taskmanager.network.memory.floating-buffers-per-gate`: Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, it indicates how many floating credits are shared for all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback. So the floating buffers can help relief back-pressure caused by imbalance data distribution among subpartitions. + --- End diff -- thanks for your polish, alpinegizmo. I will apply the above fixes. ---
[jira] [Commented] (FLINK-8458) Add the switch for keeping both the old mode and the new credit-based mode
[ https://issues.apache.org/jira/browse/FLINK-8458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335418#comment-16335418 ] ASF GitHub Bot commented on FLINK-8458: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r163155437 --- Diff: docs/ops/config.md --- @@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager and TaskManagers. - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`. +- `taskmanager.network.memory.buffers-per-channel`: Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). Especially in credit-based flow control mode, it indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receving in-flight data in the subpartition and 1 buffer is for parallel serialization. + --- End diff -- It is also used in the current (old) mode, where in most cases there is no need to change the default value. In the new credit-based mode, a value greater than 2 can help when the bottleneck is caused by slow downstream processing: the larger the value, the lower the probability of blocking the upstream and causing back-pressure. However, the total amount of available buffer resources should also be taken into account when setting this parameter. 
> Add the switch for keeping both the old mode and the new credit-based mode > -- > > Key: FLINK-8458 > URL: https://issues.apache.org/jira/browse/FLINK-8458 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > After the whole feature of credit-based flow control is done, we should add a > config parameter to switch on/off the new credit-based mode. To do so, we can > roll back to the old network mode for any expected risks. > The parameter is defined as > {{taskmanager.network.credit-based-flow-control.enabled}} and the default > value is true. This switch may be removed after next release. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
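The trade-off in the comment above (more exclusive buffers per channel means the sender is blocked less often, at the cost of more memory) follows from how credits work: a sender may only ship a buffer when the receiver has announced a free one. Below is a deliberately simplified, single-threaded simulation of one channel under that rule. It is not Flink's Netty-based implementation: floating buffers and backlog announcements are left out, and `CreditFlowSketch` and `blockedSteps` are hypothetical names.

```java
public class CreditFlowSketch {

    /**
     * Simulates one channel step by step and counts the steps in which the sender
     * still had data queued but no credit left to send it.
     */
    static int blockedSteps(int exclusiveBuffers, int[] producedPerStep, int consumedPerStep) {
        int credit = exclusiveBuffers; // free receiver buffers announced to the sender
        int receiverQueue = 0;         // filled buffers waiting to be processed downstream
        int senderBacklog = 0;         // buffers produced upstream but not yet sent
        int blocked = 0;

        for (int produced : producedPerStep) {
            senderBacklog += produced;

            // The sender may only send as many buffers as it holds credits for.
            int sent = Math.min(senderBacklog, credit);
            senderBacklog -= sent;
            credit -= sent;
            receiverQueue += sent;

            if (senderBacklog > 0) {
                blocked++; // data is waiting upstream: this is where back-pressure builds up
            }

            // The (slow) downstream consumer frees buffers, which are announced back as credit.
            int processed = Math.min(receiverQueue, consumedPerStep);
            receiverQueue -= processed;
            credit += processed;
        }
        return blocked;
    }

    public static void main(String[] args) {
        int[] bursty = {3, 3, 0, 0, 0, 0}; // two bursts, then silence
        for (int buffers : new int[] {2, 4}) {
            System.out.println("exclusive=" + buffers + " -> blocked steps: " + blockedSteps(buffers, bursty, 1));
        }
        // prints "exclusive=2 -> blocked steps: 4" and "exclusive=4 -> blocked steps: 1"
    }
}
```

In this toy run, raising the exclusive buffers from 2 to 4 cuts the blocked steps from 4 to 1, in line with the comment that larger values lower the chance of back-pressure while costing more buffers.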
[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r163155437 --- Diff: docs/ops/config.md --- @@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager and TaskManagers. - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`. +- `taskmanager.network.memory.buffers-per-channel`: Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). Especially in credit-based flow control mode, it indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receving in-flight data in the subpartition and 1 buffer is for parallel serialization. + --- End diff -- It is also used in the current (old) mode, where in most cases there is no need to change the default value. In the new credit-based mode, a value greater than 2 can help when the bottleneck is caused by slow downstream processing: the larger the value, the lower the probability of blocking the upstream and causing back-pressure. However, the total amount of available buffer resources should also be taken into account when setting this parameter. ---
[jira] [Commented] (FLINK-8445) hostname used in metric names for taskmanager and jobmanager are not consistent
[ https://issues.apache.org/jira/browse/FLINK-8445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335362#comment-16335362 ] Chris Thomson commented on FLINK-8445: -- Problem appears to be present in 1.4.0 as well. > hostname used in metric names for taskmanager and jobmanager are not > consistent > --- > > Key: FLINK-8445 > URL: https://issues.apache.org/jira/browse/FLINK-8445 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.3.1, 1.4.0 > Environment: I think that this problem is present for metrics > reporting enabled configurations that include '<host>' as part of the scope > for the metrics. For example, using Graphite reporting configuration in > flink-conf.yaml below: > {code:java} > metrics.scope.jm: flink.<host>.jobmanager > metrics.scope.jm.job: flink.<host>.jobmanager. > metrics.scope.tm: flink.<host>.taskmanager > metrics.scope.tm.job: flink.<host>.taskmanager. > metrics.scope.task: > flink.<host>.taskmanager... > metrics.scope.operator: > flink.<host>.taskmanager... > metrics.reporters: graphite > metrics.reporter.graphite.class: > org.apache.flink.metrics.graphite.GraphiteReporter > ...{code} >Reporter: Chris Thomson >Priority: Minor > > Enabled Flink metrics reporting to Graphite with system scopes that > contain '<host>' for both the job manager and task manager. The resulting > metrics reported to Graphite use two different representations for '<host>'. > For *Task Manager metrics* it uses the *short hostname* (without the DNS > domain). This is a result of logic in > org.apache.flink.runtime.taskmanager.TaskManagerLocation constructor that > tries to extract the short hostname from the fully qualified domain name > looked up from InetAddress.getCanonicalHostName(). > For *Job Manager metrics* it uses the *fully qualified domain name* (with the > DNS domain). 
This is a result of there being no logic in > org.apache.flink.runtime.jobmanager.JobManagerRunner or > org.apache.flink.runtime.rpc.akka.AkkaRpcService to perform equivalent > normalization of the fully qualified domain name down to the short hostname. > Ideally the '<host>' placeholders in the system scopes for the job manager > and task manager related metrics would be replaced with a consistent value > (either the short hostname or the fully qualified domain name). Even better > if there were a configuration option to decide which one should be used for > metric name generation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
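The report contrasts the TaskManager path, which shortens the canonical host name, with the JobManager path, which keeps the fully qualified name. A minimal sketch of a normalization helper that both paths could share, assuming the rule "keep everything before the first dot, but leave IP literals untouched". `HostNameNormalizer.shortHostName` is hypothetical and is not Flink's `TaskManagerLocation` code.

```java
public class HostNameNormalizer {

    /**
     * Hypothetical helper: reduce a fully qualified domain name to its short host name.
     * IP literals are returned unchanged, because cutting them at the first dot
     * would produce a meaningless fragment.
     */
    static String shortHostName(String fqdn) {
        if (fqdn == null || fqdn.isEmpty() || isIpLiteral(fqdn)) {
            return fqdn;
        }
        int dot = fqdn.indexOf('.');
        return dot > 0 ? fqdn.substring(0, dot) : fqdn;
    }

    private static boolean isIpLiteral(String host) {
        // IPv6 literals contain ':'; IPv4 literals consist only of digits and dots.
        return host.indexOf(':') >= 0 || host.matches("[0-9.]+");
    }

    public static void main(String[] args) {
        System.out.println(shortHostName("flink-jm-1.prod.example.com")); // flink-jm-1
        System.out.println(shortHostName("10.0.0.12"));                   // 10.0.0.12
        System.out.println(shortHostName("localhost"));                   // localhost
    }
}
```

Applying one such helper (or, as the reporter suggests, a configuration switch choosing between short and fully qualified names) wherever the host scope variable is substituted for both JobManager and TaskManager metrics would make the Graphite metric paths line up.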
[jira] [Updated] (FLINK-8445) hostname used in metric names for taskmanager and jobmanager are not consistent
[ https://issues.apache.org/jira/browse/FLINK-8445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Thomson updated FLINK-8445: - Affects Version/s: 1.4.0 > hostname used in metric names for taskmanager and jobmanager are not > consistent > --- > > Key: FLINK-8445 > URL: https://issues.apache.org/jira/browse/FLINK-8445 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.3.1, 1.4.0 > Environment: I think that this problem is present for metrics > reporting enabled configurations that include '<host>' as part of the scope > for the metrics. For example, using Graphite reporting configuration in > flink-conf.yaml below: > {code:java} > metrics.scope.jm: flink.<host>.jobmanager > metrics.scope.jm.job: flink.<host>.jobmanager. > metrics.scope.tm: flink.<host>.taskmanager > metrics.scope.tm.job: flink.<host>.taskmanager. > metrics.scope.task: > flink.<host>.taskmanager... > metrics.scope.operator: > flink.<host>.taskmanager... > metrics.reporters: graphite > metrics.reporter.graphite.class: > org.apache.flink.metrics.graphite.GraphiteReporter > ...{code} >Reporter: Chris Thomson >Priority: Minor > > Enabled Flink metrics reporting to Graphite with system scopes that > contain '<host>' for both the job manager and task manager. The resulting > metrics reported to Graphite use two different representations for '<host>'. > For *Task Manager metrics* it uses the *short hostname* (without the DNS > domain). This is a result of logic in > org.apache.flink.runtime.taskmanager.TaskManagerLocation constructor that > tries to extract the short hostname from the fully qualified domain name > looked up from InetAddress.getCanonicalHostName(). > For *Job Manager metrics* it uses the *fully qualified domain name* (with the > DNS domain). 
This is a result of there being no logic in > org.apache.flink.runtime.jobmanager.JobManagerRunner or > org.apache.flink.runtime.rpc.akka.AkkaRpcService to perform equivalent > normalization of the fully qualified domain name down to the short hostname. > Ideally the '<host>' placeholders in the system scopes for the job manager > and task manager related metrics would be replaced with a consistent value > (either the short hostname or the fully qualified domain name). Even better > if there were a configuration option to decide which one should be used for > metric name generation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8431) Allow to specify # GPUs for TaskManager in Mesos
[ https://issues.apache.org/jira/browse/FLINK-8431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335239#comment-16335239 ] ASF GitHub Bot commented on FLINK-8431: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5307 @tillrohrmann please merge > Allow to specify # GPUs for TaskManager in Mesos > > > Key: FLINK-8431 > URL: https://issues.apache.org/jira/browse/FLINK-8431 > Project: Flink > Issue Type: Improvement > Components: Cluster Management, Mesos >Reporter: Dongwon Kim >Assignee: Dongwon Kim >Priority: Minor > > Mesos provides first-class support for Nvidia GPUs [1], but Flink does not > exploit it when scheduling TaskManagers. If Mesos agents are configured to > isolate GPUs as shown in [2], TaskManagers that do not request GPUs > cannot see any GPUs at all. > We therefore need to introduce a new configuration property named > "mesos.resourcemanager.tasks.gpus" to allow users to specify the number of GPUs for > each TaskManager process in Mesos. > [1] http://mesos.apache.org/documentation/latest/gpu-support/ > [2] http://mesos.apache.org/documentation/latest/gpu-support/#agent-flags -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5307: [FLINK-8431] [mesos] Allow to specify # GPUs for TaskMana...
Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5307 @tillrohrmann please merge ---
[jira] [Created] (FLINK-8485) Running Flink inside Intellij no longer works after upgrading from 1.3.2 to 1.4.0
Xuan Nguyen created FLINK-8485: -- Summary: Running Flink inside Intellij no longer works after upgrading from 1.3.2 to 1.4.0 Key: FLINK-8485 URL: https://issues.apache.org/jira/browse/FLINK-8485 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.4.0 Reporter: Xuan Nguyen I upgraded Flink from 1.3.2 to 1.4.0, and my simple test case no longer runs within IntelliJ or any other IDE when I click the "RUN" button. I'm using JDK 1.8. My dependencies are: {code:java} dependencies { compile group: 'log4j', name: 'log4j', version: '1.2.17' compile 'org.apache.flink:flink-java:1.4.0' compile 'org.apache.flink:flink-streaming-java_2.11:1.4.0' compile 'org.apache.flink:flink-clients_2.11:1.4.0' compile 'org.apache.flink:flink-table_2.11:1.4.0' compile 'org.apache.flink:flink-scala_2.11:1.4.0' compile 'org.apache.flink:flink-streaming-scala_2.11:1.4.0' compile 'org.apache.flink:flink-connector-kafka-0.8_2.11:1.4.0' compile 'org.apache.flink:flink-queryable-state-runtime_2.11:1.4.0' compile 'org.apache.flink:flink-queryable-state-client-java__2.11:1.4.0' testCompile 'junit:junit:+' } {code} The exception: {code:java} Exception in thread "main" org.apache.flink.runtime.client.JobSubmissionException: Could not retrieve BlobServer address.
at org.apache.flink.runtime.client.JobSubmissionClientActor$1.call(JobSubmissionClientActor.java:166) at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:97) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.ExecutionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1662993273]] after [1 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.JobManagerMessages$RequestBlobManagerPort$". at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.runtime.client.JobSubmissionClientActor$1.call(JobSubmissionClientActor.java:160) ... 9 more Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1662993273]] after [1 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.JobManagerMessages$RequestBlobManagerPort$". 
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) at java.lang.Thread.run(Thread.java:748){code} The debug full logs are located in [https://gist.github.com/xuan/e6d4543c478c30d5747428589b03dd03] along with the test case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335116#comment-16335116 ] Eron Wright commented on FLINK-5479: - To elaborate on my earlier comment about `max.message.time.difference.ms`, let's consider the ideal watermark for the two types of timestamps supported by Kafka (as per KIP-32), CreateTime and LogAppendTime. In LogAppendTime, the timestamp is monotonically increasing with each message, and corresponds to the wall clock time of the broker at append time. The per-partition watermark could simply track the message time. The complication is how to advance the watermark when the partition is idle; an in-band heartbeat from the broker (informing the client about the progression of its wall clock) would be ideal. In CreateTime, the timestamp is supplied by the producer, but the broker may enforce an upper bound ("max difference") on the delta between the message timestamp and the broker's current time. The ideal per-partition watermark would be the broker's current time minus the max difference. > Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions > -- > > Key: FLINK-5479 > URL: https://issues.apache.org/jira/browse/FLINK-5479 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html > Similar to what's happening to idle sources blocking watermark progression in > downstream operators (see FLINK-5017), the per-partition watermark mechanism > in {{FlinkKafkaConsumer}} is also blocked from advancing watermarks > when a partition is idle. 
The watermark of idle partitions is always > {{Long.MIN_VALUE}}; therefore, the overall min watermark across all partitions > of a consumer subtask never advances. > It is not common for Kafka partitions to produce no > data, but it would probably be good to handle this case as well. I think we should > have a localized solution similar to FLINK-5017 for the per-partition > watermarks in {{AbstractFetcher}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
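The blocking effect described in the issue is easy to see once the per-subtask watermark is written as a minimum over partitions. A minimal sketch, assuming each partition tracks its latest watermark and a hypothetical `idle` flag. How idleness would be detected (for example via a timeout, or the broker-side heartbeat suggested in the comment above) is left open, and returning `Long.MIN_VALUE` when every partition is idle is a simplification.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionWatermarks {

    /** Per-partition state: the latest watermark and whether the partition is currently idle. */
    static final class PartitionState {
        final long watermark;
        final boolean idle;

        PartitionState(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /** Naive combination: an idle partition stuck at Long.MIN_VALUE holds everything back. */
    static long naiveWatermark(List<PartitionState> partitions) {
        long min = Long.MAX_VALUE;
        for (PartitionState p : partitions) {
            min = Math.min(min, p.watermark);
        }
        return min;
    }

    /** Idle-aware combination: idle partitions are skipped; if all are idle, no progress is reported. */
    static long idleAwareWatermark(List<PartitionState> partitions) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (PartitionState p : partitions) {
            if (!p.idle) {
                anyActive = true;
                min = Math.min(min, p.watermark);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        List<PartitionState> partitions = Arrays.asList(
                new PartitionState(1_000L, false),
                new PartitionState(1_500L, false),
                new PartitionState(Long.MIN_VALUE, true)); // partition that never received data

        System.out.println(naiveWatermark(partitions));     // -9223372036854775808 (Long.MIN_VALUE): stuck
        System.out.println(idleAwareWatermark(partitions)); // 1000
    }
}
```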
[jira] [Resolved] (FLINK-8120) Cannot access Web UI from YARN application overview in FLIP-6 mode
[ https://issues.apache.org/jira/browse/FLINK-8120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8120. -- Resolution: Fixed Fixed via 627bcda > Cannot access Web UI from YARN application overview in FLIP-6 mode > -- > > Key: FLINK-8120 > URL: https://issues.apache.org/jira/browse/FLINK-8120 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > The Web UI cannot be accessed through YARN's application overview (_Tracking > UI_ link). The proxy displays a stacktrace. > {noformat} > Caused by: > org.apache.http.client.ClientProtocolException > at > org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:888) > at > org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) > at > org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107) > at > org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.proxyLink(WebAppProxyServlet.java:242) > at > org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.methodAction(WebAppProxyServlet.java:461) > at > org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.doGet(WebAppProxyServlet.java:290) > at javax.servlet.http.HttpServlet.service(HttpServlet.java:707) > at javax.servlet.http.HttpServlet.service(HttpServlet.java:820) > at > org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221) > at > com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:66) > at > com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:900) > at > com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834) > at > 
org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppFilter.doFilter(RMWebAppFilter.java:178) > at > com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795) > at > com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163) > at > com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58) > at > com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118) > at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) > at > org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) > at > org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:636) > at > org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationFilter.doFilter(DelegationTokenAuthenticationFilter.java:294) > at > org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:588) > at > org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter.doFilter(RMAuthenticationFilter.java:82) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) > at > org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1353) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) > at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) > at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) > at > 
org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399) > at > org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216) > at > org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182) > at > org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766) > at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450) > at > org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230) > at > org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152) > at org.mortbay.jetty
[jira] [Commented] (FLINK-8365) Relax List type in HeapListState and HeapKeyedStateBackend
[ https://issues.apache.org/jira/browse/FLINK-8365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335028#comment-16335028 ] ASF GitHub Bot commented on FLINK-8365: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5326 Thanks for the feedback! I updated the PR and squashed all commits > Relax List type in HeapListState and HeapKeyedStateBackend > -- > > Key: FLINK-8365 > URL: https://issues.apache.org/jira/browse/FLINK-8365 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > {{stateTable}} in HeapListState and > {{HeapKeyedStateBackend#createListState()}} are both strongly typed to > {{ArrayList}} right now. > As discussed with [~StephanEwen] and [~stefanrichte...@gmail.com] in > https://github.com/apache/flink/pull/4963, we may want to relax the type to > {{List}}. > Problems discovered now: > 1. That may require changing serializer from {{ArrayListSerializer}} to > {{ListSerializer}} in the following code, and we need to discuss the pros and > cons > {code:java} > @Override > public InternalListState createListState( > TypeSerializer namespaceSerializer, > ListStateDescriptor stateDesc) throws Exception { > // the list state does some manual mapping, because the state > is typed to the generic > // 'List' interface, but we want to use an implementation typed > to ArrayList > // using a more specialized implementation opens up runtime > optimizations > StateTable> stateTable = > tryRegisterStateTable( > stateDesc.getName(), > stateDesc.getType(), > namespaceSerializer, > new > ArrayListSerializer(stateDesc.getElementSerializer())); > return new HeapListState<>(stateDesc, stateTable, > keySerializer, namespaceSerializer); > } > {code} > 2. 
for non-RocksDBStateBackend (AsyncFileStateBackendTest, > AsyncMemoryStateBackendTest, FileStateBackendTest, and > MemoryStateBackendTest), unit tests testListState and > testListStateAddUpdateAndGet fail -- This message was sent by Atlassian JIRA (v7.6.3#76005)
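The relaxation discussed in FLINK-8365 amounts to declaring the state container against the `List` interface while still letting the backend pick a concrete implementation such as `ArrayList`. The following is a minimal stdlib-only sketch of that idea; `ListStateSketch` and its methods are hypothetical and deliberately ignore what Flink's `HeapListState` also has to handle (namespaces, serializers, copy-on-write state tables).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch, not Flink's HeapListState: per-key list state whose
// storage is typed to the List interface. The concrete list type is chosen
// by a supplier, so nothing else depends on ArrayList.
final class ListStateSketch<K, V> {
    private final Map<K, List<V>> stateTable = new HashMap<>();
    private final Supplier<List<V>> listFactory;

    ListStateSketch(Supplier<List<V>> listFactory) {
        this.listFactory = listFactory;
    }

    // Default backing implementation, mirroring the current ArrayList choice.
    static <K, V> ListStateSketch<K, V> arrayBacked() {
        return new ListStateSketch<K, V>(ArrayList::new);
    }

    // Appends a value to the list stored under the given key.
    void add(K key, V value) {
        stateTable.computeIfAbsent(key, k -> listFactory.get()).add(value);
    }

    // Returns the list for the key, or null if nothing was added yet.
    List<V> get(K key) {
        return stateTable.get(key);
    }

    // Replaces the list for the key with a fresh copy of the given values.
    void update(K key, List<V> values) {
        List<V> copy = listFactory.get();
        copy.addAll(values);
        stateTable.put(key, copy);
    }
}
```

Because the field is typed to `List`, switching implementations only changes the supplier. The serializer question raised in point 1 of the issue (`ArrayListSerializer` vs. a `List` serializer) is not modelled here.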
[GitHub] flink issue #5326: [FLINK-8365] [State Backend] Relax List type in HeapListS...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5326 Thanks for the feedback! I updated the PR and squashed all commits ---
[jira] [Commented] (FLINK-8267) update Kinesis Producer example for setting Region key
[ https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335021#comment-16335021 ] ASF GitHub Bot commented on FLINK-8267: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5301 Makes sense. Updated the PR and squashed all commits (first time using squash!) > update Kinesis Producer example for setting Region key > -- > > Key: FLINK-8267 > URL: https://issues.apache.org/jira/browse/FLINK-8267 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Dyana Rose >Assignee: Bowen Li >Priority: Minor > > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer > In the example code for the kinesis producer the region key is set like: > {code:java} > producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); > {code} > However, the AWS Kinesis Producer Library requires that the region key be > Region > (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269) > so the setting at this point should be: > {code:java} > producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); > producerConfig.put("Region", "us-east-1"); > {code} > When you run the Kinesis Producer you can see the effect of not setting the > Region key in a log line > {noformat} > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - Started > Kinesis producer instance for region '' > {noformat} > The KPL also then assumes it's running on EC2 and attempts to determine its > own region, which fails. > {noformat} > (EC2MetadataClient)Http request to Ec2MetadataService failed. > [error] [main.cc:266] Could not configure the region.
It was not given in the > config and we were unable to retrieve it from EC2 metadata > {noformat} > At the least I'd say the documentation should mention the difference between > these two keys and when you are required to also set the Region key. > On the other hand, is this even the intended behaviour of this connector? Was > it intended that the AWSConfigConstants.AWS_REGION key also set the region of > the kinesis stream? The documentation for the example states > {noformat} > The example demonstrates producing a single Kinesis stream in the AWS region > “us-east-1”. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
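One way to read the last question in FLINK-8267 is whether the connector should forward its own region setting to the KPL. The sketch below is a hypothetical helper, not part of the Flink Kinesis connector. It assumes `AWSConfigConstants.AWS_REGION` resolves to the string `aws.region`, and it copies that value into the KPL's `Region` key unless the user already set `Region` explicitly.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Flink Kinesis connector: copies the
// Flink-level region setting into the key the KPL itself reads ("Region"),
// unless "Region" was already set explicitly.
final class KinesisRegionConfig {
    // Assumed value of AWSConfigConstants.AWS_REGION.
    static final String FLINK_AWS_REGION = "aws.region";
    // Key read by the Kinesis Producer Library (see its default_config.properties).
    static final String KPL_REGION = "Region";

    private KinesisRegionConfig() {}

    // Returns a copy of the config; the caller's Properties are not modified.
    static Properties withKplRegion(Properties producerConfig) {
        Properties result = new Properties();
        result.putAll(producerConfig);
        String region = result.getProperty(FLINK_AWS_REGION);
        if (region != null && result.getProperty(KPL_REGION) == null) {
            result.setProperty(KPL_REGION, region);
        }
        return result;
    }
}
```

An explicit `Region` entry always wins over the derived one, so existing configurations that already set both keys behave as before.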
[GitHub] flink issue #5323: [FLINK-8441] [State Backend] [RocksDB] change RocksDBList...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5323 Makes sense. Updated the PR and squashed all commits ---
[GitHub] flink issue #5301: [FLINK-8267] [Kinesis Connector] update Kinesis Producer ...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5301 Makes sense. Updated the PR and squashed all commits (first time using squash!) ---
[jira] [Commented] (FLINK-8441) serialize values and value separator directly to stream in RocksDBListState
[ https://issues.apache.org/jira/browse/FLINK-8441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335019#comment-16335019 ] ASF GitHub Bot commented on FLINK-8441: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5323 Makes sense. Updated the PR and squashed all commits > serialize values and value separator directly to stream in RocksDBListState > --- > > Key: FLINK-8441 > URL: https://issues.apache.org/jira/browse/FLINK-8441 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > In {{RocksDBListState#getPreMergedValue}}, we could probably serialize > values and value separator directly into {{keySerializationStream}}. > We tried once and it didn't work out. Let's try one more time -- This message was sent by Atlassian JIRA (v7.6.3#76005)
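FLINK-8441 proposes writing each serialized value and the separator into one stream instead of building intermediate byte arrays. A minimal stdlib sketch of that pattern follows. `PreMergeSketch` is hypothetical, `DataOutputStream.writeUTF` stands in for the element `TypeSerializer`, and the `,` delimiter is an assumption based on the separator that RocksDB's string-append merge operator uses by default.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical sketch: pre-merge a list of values into one byte array by
// writing each serialized value and the delimiter straight into a single
// stream, instead of serializing each value into its own byte[] first.
final class PreMergeSketch {
    // Assumed delimiter, matching RocksDB's default string-append separator.
    static final byte DELIMITER = ',';

    private PreMergeSketch() {}

    static byte[] preMerge(List<String> values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        try {
            boolean first = true;
            for (String value : values) {
                if (!first) {
                    out.writeByte(DELIMITER); // separator between values only
                }
                first = false;
                out.writeUTF(value); // stand-in for a TypeSerializer
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

With this layout, the two values `"a"` and `"b"` occupy 7 bytes: each takes a 2-byte length prefix plus one character, and one delimiter byte sits between them.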
[jira] [Closed] (FLINK-8463) Remove unnecessary thread blocking in RestClient#submitRequest
[ https://issues.apache.org/jira/browse/FLINK-8463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8463. Resolution: Fixed Fixed via 016118026ca96abb691b236ca7d08db94c93684a > Remove unnecessary thread blocking in RestClient#submitRequest > -- > > Key: FLINK-8463 > URL: https://issues.apache.org/jira/browse/FLINK-8463 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{RestClient}} unnecessarily blocks an IO executor thread when trying to > open a connection to a remote destination. This can be improved by > registering a {{ChannelFuture}} listener which continues the execution once > the connection has been established. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
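The change described in FLINK-8463 replaces a blocking wait on the connection with a listener that continues once the connection is established. The sketch below illustrates the difference with `java.util.concurrent.CompletableFuture` standing in for Netty's `ChannelFuture`. It is an analogy under that assumption, not the actual `RestClient` code, and the method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Analogy only: CompletableFuture stands in for Netty's ChannelFuture.
final class NonBlockingSubmitSketch {
    private NonBlockingSubmitSketch() {}

    // Blocking style: a task on ioExecutor waits in join() until the
    // connection future completes, occupying that thread meanwhile.
    static CompletableFuture<String> submitBlocking(
            CompletableFuture<String> connectFuture, ExecutorService ioExecutor) {
        return CompletableFuture.supplyAsync(() -> {
            String channel = connectFuture.join();
            return "request sent via " + channel;
        }, ioExecutor);
    }

    // Non-blocking style: register a continuation that runs once the
    // connection is established; no thread waits in the meantime.
    static CompletableFuture<String> submitNonBlocking(CompletableFuture<String> connectFuture) {
        return connectFuture.thenApply(channel -> "request sent via " + channel);
    }
}
```

Both variants produce the same result, but only the first ties up an executor thread while the connection is pending.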
[jira] [Closed] (FLINK-8462) TaskExecutor does not verify RM heartbeat timeouts
[ https://issues.apache.org/jira/browse/FLINK-8462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8462. Resolution: Fixed Fixed via 776af4a882c85926fc0764b702fec717c675e34c > TaskExecutor does not verify RM heartbeat timeouts > -- > > Key: FLINK-8462 > URL: https://issues.apache.org/jira/browse/FLINK-8462 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{TaskExecutor}} neither properly stops RM heartbeats nor > checks whether an RM heartbeat timeout is still valid. As a consequence, it can > happen that the {{TaskExecutor}} closes the connection to an active {{RM}} > due to an outdated heartbeat timeout. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
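FLINK-8462 is about ignoring heartbeat timeouts that are no longer current. A common way to do that is to tag every scheduled timeout with an id and to act on a firing timeout only if its id is still the latest one. The sketch below shows that idea; `HeartbeatTimeoutSketch` and its methods are hypothetical and may differ from how the `TaskExecutor` fix was actually implemented.

```java
import java.util.UUID;

// Hypothetical sketch of filtering outdated heartbeat timeouts: each
// (re)scheduled timeout gets a fresh id, and a firing timeout is acted
// upon only if its id is still the current one.
final class HeartbeatTimeoutSketch {
    private UUID currentTimeoutId;
    private boolean connected = true;

    // Called when a heartbeat arrives: any previously scheduled timeout
    // becomes outdated because its id no longer matches.
    synchronized UUID rescheduleTimeout() {
        currentTimeoutId = UUID.randomUUID();
        return currentTimeoutId;
    }

    // Called when heartbeats to this RM are stopped, e.g. on disconnect.
    synchronized void stopHeartbeats() {
        currentTimeoutId = null;
    }

    // Called by the timer when a scheduled timeout fires.
    // Returns true only if this timeout actually closed the connection.
    synchronized boolean onTimeout(UUID firedTimeoutId) {
        if (firedTimeoutId == null || !firedTimeoutId.equals(currentTimeoutId)) {
            return false; // outdated timeout: ignore it
        }
        connected = false;
        return true;
    }

    synchronized boolean isConnected() {
        return connected;
    }
}
```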
[jira] [Commented] (FLINK-8463) Remove unnecessary thread blocking in RestClient#submitRequest
[ https://issues.apache.org/jira/browse/FLINK-8463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334991#comment-16334991 ] ASF GitHub Bot commented on FLINK-8463: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5319 > Remove unnecessary thread blocking in RestClient#submitRequest > -- > > Key: FLINK-8463 > URL: https://issues.apache.org/jira/browse/FLINK-8463 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{RestClient}} unnecessarily blocks an IO executor thread when trying to > open a connection to a remote destination. This can be improved by > registering a {{ChannelFuture}} listener which continues the execution once > the connection has been established. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8462) TaskExecutor does not verify RM heartbeat timeouts
[ https://issues.apache.org/jira/browse/FLINK-8462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334990#comment-16334990 ] ASF GitHub Bot commented on FLINK-8462: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5318 > TaskExecutor does not verify RM heartbeat timeouts > -- > > Key: FLINK-8462 > URL: https://issues.apache.org/jira/browse/FLINK-8462 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{TaskExecutor}} neither properly stops RM heartbeats nor > checks whether an RM heartbeat timeout is still valid. As a consequence, it can > happen that the {{TaskExecutor}} closes the connection to an active {{RM}} > due to an outdated heartbeat timeout. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5319: [FLINK-8463] [rest] Remove blocking of IO executor...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5319 ---
[GitHub] flink pull request #5318: [FLINK-8462] [flip6] Filter invalid heartbeat time...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5318 ---
[jira] [Commented] (FLINK-8267) update Kinesis Producer example for setting Region key
[ https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334963#comment-16334963 ] ASF GitHub Bot commented on FLINK-8267: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5301#discussion_r163075196 --- Diff: docs/dev/connectors/kinesis.md --- @@ -271,9 +271,9 @@ For the monitoring to work, the user accessing the stream needs access to the Cl {% highlight java %} Properties producerConfig = new Properties(); // Required configs -producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +producerConfig.put("Region", "us-east-1"); --- End diff -- no, KPL doesn't have such configs. KPL takes a string like 'Region' and tries to find its setter using reflection. > update Kinesis Producer example for setting Region key > -- > > Key: FLINK-8267 > URL: https://issues.apache.org/jira/browse/FLINK-8267 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Dyana Rose >Assignee: Bowen Li >Priority: Minor > > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer > In the example code for the kinesis producer the region key is set like: > {code:java} > producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); > {code} > However, the AWS Kinesis Producer Library requires that the region key be > Region > (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269) > so the setting at this point should be: > {code:java} > producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); > producerConfig.put("Region", "us-east-1"); > {code} > When you run the Kinesis Producer you can see the 
effect of not setting the > Region key in a log line > {noformat} > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - Started > Kinesis producer instance for region '' > {noformat} > The KPL also then assumes it's running on EC2 and attempts to determine its > own region, which fails. > {noformat} > (EC2MetadataClient)Http request to Ec2MetadataService failed. > [error] [main.cc:266] Could not configure the region. It was not given in the > config and we were unable to retrieve it from EC2 metadata > {noformat} > At the least I'd say the documentation should mention the difference between > these two keys and when you are required to also set the Region key. > On the other hand, is this even the intended behaviour of this connector? Was > it intended that the AWSConfigConstants.AWS_REGION key also set the region of > the kinesis stream? The documentation for the example states > {noformat} > The example demonstrates producing a single Kinesis stream in the AWS region > “us-east-1”. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5301: [FLINK-8267] [Kinesis Connector] update Kinesis Pr...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5301#discussion_r163075196 --- Diff: docs/dev/connectors/kinesis.md --- @@ -271,9 +271,9 @@ For the monitoring to work, the user accessing the stream needs access to the Cl {% highlight java %} Properties producerConfig = new Properties(); // Required configs -producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +producerConfig.put("Region", "us-east-1"); --- End diff -- no, KPL doesn't have such configs. KPL takes a string like 'Region' and tries to find its setter using reflection. ---
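The review answer above says the KPL takes a key such as `Region` and looks up the matching setter via reflection. The sketch below models that mechanism with a hypothetical `ProducerConfigSketch` class. It is not the KPL's own configuration class and only handles `String` setters, but it shows why `aws.region` alone never reaches the KPL: there is no method called `setaws.region`.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical stand-in for a KPL-style configuration object.
// Only String-valued setters are modelled.
final class ProducerConfigSketch {
    private String region = "";
    private String metricsLevel = "detailed";

    public void setRegion(String region) { this.region = region; }
    public void setMetricsLevel(String level) { this.metricsLevel = level; }
    public String getRegion() { return region; }
    public String getMetricsLevel() { return metricsLevel; }

    // Maps each property key "Foo" to a public method "setFoo(String)".
    // Keys without a matching setter (e.g. "aws.region") are skipped here.
    static ProducerConfigSketch fromProperties(Properties props) {
        ProducerConfigSketch config = new ProducerConfigSketch();
        for (String key : props.stringPropertyNames()) {
            Method setter;
            try {
                setter = ProducerConfigSketch.class.getMethod("set" + key, String.class);
            } catch (NoSuchMethodException e) {
                continue; // not a recognised option
            }
            try {
                setter.invoke(config, props.getProperty(key));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Failed to apply option " + key, e);
            }
        }
        return config;
    }
}
```

Under this scheme the key must match the setter name exactly, so a lower-case `region` key would be skipped as well.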
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334945#comment-16334945 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163061338 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableSchema + +import scala.collection.mutable + +/** + * Describes a schema of a table. + */ +class Schema extends Descriptor { + + private val tableSchema: mutable.LinkedHashMap[String, String] = + mutable.LinkedHashMap[String, String]() + + /** +* Sets the schema with field names and the types. Required. +* +* This method overwrites existing fields added with [[field()]]. +* +* @param schema the table schema +*/ + def schema(schema: TableSchema): Schema = { --- End diff -- add a method `def schema(schema: String): Schema` that parses the schema string? 
> Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table sources are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generalize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
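The review comment above asks for a `schema(String)` overload that parses a schema string. No syntax is given, so the sketch below assumes a hypothetical `name: TYPE` list separated by commas and builds an insertion-ordered `LinkedHashMap`, like the `tableSchema` field in the diff. It is written in Java rather than Scala and is not Flink's parser.

```java
import java.util.LinkedHashMap;

// Hypothetical parser for a schema string such as "user: VARCHAR, amount: INT".
// The "name: TYPE" syntax is an assumption made for illustration only.
final class SchemaStringSketch {
    private SchemaStringSketch() {}

    static LinkedHashMap<String, String> parse(String schema) {
        // LinkedHashMap keeps fields in declaration order.
        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
        for (String part : schema.split(",")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing commas
            }
            int colon = trimmed.indexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Invalid field definition: '" + trimmed + "'");
            }
            String name = trimmed.substring(0, colon).trim();
            String type = trimmed.substring(colon + 1).trim();
            if (fields.put(name, type) != null) {
                throw new IllegalArgumentException("Duplicate field name: '" + name + "'");
            }
        }
        return fields;
    }
}
```

Splitting on `,` is deliberately naive; a type such as `DECIMAL(10, 2)` would need a real tokenizer.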
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334946#comment-16334946 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163012442 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException} +import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceFactoryService} + +/** + * Descriptor for specifying a table source in a streaming environment. + */ +class StreamTableSourceDescriptor( +tableEnv: StreamTableEnvironment, +schema: Schema) + extends TableSourceDescriptor { + + schemaDescriptor = Some(schema) + + /** +* Searches for the specified table source, configures it accordingly, and returns it. 
+*/ + def toTableSource: TableSource[_] = { +val source = TableSourceFactoryService.findTableSourceFactory(this) +source match { + case _: StreamTableSource[_] => source + case _ => throw new TableException( +s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + + s"in a streaming environment.") +} + } + + /** +* Searches for the specified table source, configures it accordingly, and returns it as a table. +*/ + def toTable: Table = { +tableEnv.fromTableSource(toTableSource) + } + + /** +* Searches for the specified table source, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + def register(name: String): Unit = { +tableEnv.registerTableSource(name, toTableSource) + } + + /** +* Specifies an connector for reading data from a connector. +*/ + def withConnector(connector: ConnectorDescriptor): StreamTableSourceDescriptor = { +connectorDescriptor = Some(connector) +this + } + + /** +* Specifies an encoding that defines how to read data from a connector. +*/ + def withEncoding(encoding: EncodingDescriptor): StreamTableSourceDescriptor = { +encodingDescriptor = Some(encoding) --- End diff -- check if the connector requires an encoding? > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table sources are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generalize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334949#comment-16334949 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163071117 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/DescriptorsTest.scala --- @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api + +import _root_.java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.plan.stats.{ColumnStats, TableStats} +import org.apache.flink.table.utils.TableTestBase +import org.junit.Assert.assertEquals +import org.junit.Test + +class DescriptorsTest extends TableTestBase { --- End diff -- I would move the tests to a separate class per descriptor. If we add a `validate` method to `Descriptor` this needs to be tested as well. 
> Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table sources are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generalize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
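The review comments ask whether the descriptor should check that an encoding is present when the connector needs one, and absent when it does not, and suggest a `validate` method on `Descriptor` with its own tests. A hypothetical, heavily simplified Java sketch of such a check follows. The `Connector` enum, and which connectors require an encoding, are invented for illustration and are not Flink's descriptor API.

```java
import java.util.Optional;

// Hypothetical sketch of descriptor validation; not Flink's Descriptor API.
final class SourceDescriptorSketch {
    // Invented connector kinds: one needs an encoding, one does not.
    enum Connector {
        KAFKA(true),
        CUSTOM(false);

        final boolean requiresEncoding;

        Connector(boolean requiresEncoding) {
            this.requiresEncoding = requiresEncoding;
        }
    }

    private Connector connector;
    private String encoding;

    SourceDescriptorSketch withConnector(Connector connector) {
        this.connector = connector;
        return this;
    }

    SourceDescriptorSketch withEncoding(String encoding) {
        this.encoding = encoding;
        return this;
    }

    // Returns an error message if the combination is invalid, empty otherwise.
    Optional<String> validate() {
        if (connector == null) {
            return Optional.of("A connector must be specified.");
        }
        if (connector.requiresEncoding && encoding == null) {
            return Optional.of("Connector " + connector + " requires an encoding.");
        }
        if (!connector.requiresEncoding && encoding != null) {
            return Optional.of("Connector " + connector + " does not use an encoding.");
        }
        return Optional.empty();
    }
}
```

Keeping the checks in one `validate` method makes it straightforward to cover each rule in a per-descriptor test class, as the review suggests.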
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334952#comment-16334952 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163012548 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException} +import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceFactoryService} + +/** + * Descriptor for specifying a table source in a streaming environment. + */ +class StreamTableSourceDescriptor( +tableEnv: StreamTableEnvironment, +schema: Schema) + extends TableSourceDescriptor { + + schemaDescriptor = Some(schema) + + /** +* Searches for the specified table source, configures it accordingly, and returns it. 
+*/ + def toTableSource: TableSource[_] = { +val source = TableSourceFactoryService.findTableSourceFactory(this) +source match { + case _: StreamTableSource[_] => source + case _ => throw new TableException( +s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + + s"in a streaming environment.") +} + } + + /** +* Searches for the specified table source, configures it accordingly, and returns it as a table. +*/ + def toTable: Table = { +tableEnv.fromTableSource(toTableSource) + } + + /** +* Searches for the specified table source, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + def register(name: String): Unit = { +tableEnv.registerTableSource(name, toTableSource) + } + + /** +* Specifies an connector for reading data from a connector. +*/ + def withConnector(connector: ConnectorDescriptor): StreamTableSourceDescriptor = { +connectorDescriptor = Some(connector) --- End diff -- check if an encoding was added that the connector does not need? > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table sources are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog.
We might generalize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discuss
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334933#comment-16334933 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162988639 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala --- @@ -23,22 +23,74 @@ import java.net.URL import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration} import org.apache.flink.annotation.VisibleForTesting import org.apache.flink.table.annotation.TableType -import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException} +import org.apache.flink.table.api._ import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable} import org.apache.flink.table.plan.stats.FlinkStatistic -import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource} +import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService} import org.apache.flink.table.util.Logging import org.apache.flink.util.InstantiationUtil import org.reflections.Reflections -import scala.collection.JavaConverters._ -import scala.collection.mutable +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable /** * The utility class is used to convert ExternalCatalogTable to TableSourceTable. 
*/ object ExternalTableSourceUtil extends Logging { + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceTable[_] = { + +// check for the legacy external catalog path +if (externalCatalogTable.isLegacyTableType) { + LOG.warn("External catalog tables based on TableType annotations are deprecated. " + +"Please consider updating them to TableSourceFactories.") + fromExternalCatalogTableType(externalCatalogTable) +} +// use the factory approach +else { + val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable) + tableEnv match { +// check for a batch table source in this batch environment +case _: BatchTableEnvironment => + source match { +case bts: BatchTableSource[_] => + new BatchTableSourceTable( +bts, +new FlinkStatistic(externalCatalogTable.getTableStats)) +case _ => throw new TableException( + s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + +s"in a batch environment.") + } +// check for a stream table source in this streaming environment +case _: StreamTableEnvironment => + source match { +case sts: StreamTableSource[_] => + new StreamTableSourceTable( +sts, +new FlinkStatistic(externalCatalogTable.getTableStats)) +case _ => throw new TableException( + s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + +s"in a streaming environment.") + } +case _ => throw new TableException("Unsupported table environment.") + } +} + } + + // -- + // NOTE: the following line can be removed once we drop support for TableType --- End diff -- line or lines? Create a JIRA and link it here as reference? 
> Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tail
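The dispatch in `fromExternalCatalogTable` first checks the environment type and then requires a matching source type. The sketch below shows that logic in isolation. It uses hypothetical stand-in types (`Env`, `BatchEnv`, `StreamEnv`, `Source`, `BatchSource`, `StreamSource`, `SourceTable`), not Flink's `TableEnvironment` or `TableSource` classes. It also assumes that a single source may implement both the batch and the stream trait.

```scala
// Hypothetical stand-ins; these are NOT Flink's TableEnvironment/TableSource classes.
sealed trait Env
case object BatchEnv extends Env
case object StreamEnv extends Env

trait Source { def name: String }
trait BatchSource extends Source
trait StreamSource extends Source

// Result of the conversion: which kind of table was built, and from which source.
final case class SourceTable(kind: String, source: Source)

object SourceDispatch {
  def toSourceTable(env: Env, source: Source): SourceTable = env match {
    // A batch environment accepts only sources that can be read as a batch.
    case BatchEnv =>
      source match {
        case bts: BatchSource => SourceTable("batch", bts)
        case _ => throw new IllegalArgumentException(
          s"Found table source '${source.name}' is not applicable in a batch environment.")
      }
    // A streaming environment accepts only sources that can be read as a stream.
    case StreamEnv =>
      source match {
        case sts: StreamSource => SourceTable("stream", sts)
        case _ => throw new IllegalArgumentException(
          s"Found table source '${source.name}' is not applicable in a streaming environment.")
      }
  }
}
```

Because `Env` is sealed in this sketch, the compiler checks the outer match for exhaustiveness, so no catch-all branch is needed. With a non-sealed environment hierarchy, the `Unsupported table environment` fallback from the diff is still required.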
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334927#comment-16334927 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162964475 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala --- @@ -136,12 +136,51 @@ case class CatalogAlreadyExistException( def this(catalog: String) = this(catalog, null) } +/** + * Exception for not finding a [[org.apache.flink.table.sources.TableSourceFactory]] for the + * given properties. + * + * @param properties properties that describe the table source + * @param cause the cause + */ +case class NoMatchingTableSourceException( +properties: Map[String, String], +cause: Throwable) +extends RuntimeException( + s"Could not find a table source factory in the classpath satisfying the " + +s"following properties: \n${properties.map(e => e._1 + "=" + e._2 ).mkString("\n")}", + cause) { + + def this(properties: Map[String, String]) = this(properties, null) +} + +/** + * Exception for finding more than one [[org.apache.flink.table.sources.TableSourceFactory]] for + * the given properties. + * + * @param properties properties that describe the table source + * @param cause the cause + */ +case class AmbiguousTableSourceException( +properties: Map[String, String], +cause: Throwable) +extends RuntimeException( + s"More than one table source factory in the classpath satisfying the " + +s"following properties: \n${properties.map(e => e._1 + "=" + e._2 ).mkString("\n")}", + cause) { + + def this(properties: Map[String, String]) = this(properties, null) +} + /** * Exception for not finding a [[TableSourceConverter]] for a given table type. * * @param tableType table type * @param cause the cause + * @deprecated Use table source factories instead. 
*/ +@Deprecated +@deprecated("Use table factories instead.") --- End diff -- Give a class name. > Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table source are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generialize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
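The two new exceptions correspond to the two failure modes of a factory lookup: no factory matches the given properties, or more than one does. The sketch below shows such a lookup under stated assumptions. `SourceFactory`, `requiredContext`, `FactoryLookup` and the two exception classes are hypothetical names, not Flink's `TableSourceFactory` API. The property keys such as `connector.type` are illustrative only. The message format (one `key=value` pair per line) follows the diff.

```scala
// Renders properties one "key=value" per line, like the exception messages in the diff.
object PropertyFormat {
  def describe(properties: Map[String, String]): String =
    properties.map { case (k, v) => s"$k=$v" }.mkString("\n")
}

// Hypothetical exceptions mirroring NoMatchingTableSourceException / AmbiguousTableSourceException.
final class NoMatchingFactoryException(properties: Map[String, String])
  extends RuntimeException(
    "Could not find a table source factory satisfying the following properties:\n" +
      PropertyFormat.describe(properties))

final class AmbiguousFactoryException(properties: Map[String, String])
  extends RuntimeException(
    "More than one table source factory satisfies the following properties:\n" +
      PropertyFormat.describe(properties))

// Hypothetical factory abstraction; NOT Flink's TableSourceFactory interface.
trait SourceFactory {
  // Properties that must be present, with exactly these values, for the factory to apply.
  def requiredContext: Map[String, String]
}

object FactoryLookup {
  def find(factories: Seq[SourceFactory], properties: Map[String, String]): SourceFactory = {
    val matching = factories.filter { f =>
      f.requiredContext.forall { case (k, v) => properties.get(k).contains(v) }
    }
    matching match {
      case Seq(single) => single
      case Seq() => throw new NoMatchingFactoryException(properties)
      case _ => throw new AmbiguousFactoryException(properties)
    }
  }
}
```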
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334953#comment-16334953 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163012882 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{BatchTableEnvironment, Table, TableException} +import org.apache.flink.table.sources.{BatchTableSource, TableSource, TableSourceFactoryService} + +class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, schema: Schema) --- End diff -- Add `RowtimeDescriptor`. Batch table sources support timestamp extraction as well. 
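One way to follow the suggestion that batch descriptors should accept a rowtime declaration too is to put the rowtime setting in a mixin shared by batch and streaming descriptors. The following is a minimal sketch under that assumption. `RowtimeSketch`, `SupportsRowtime`, `BatchDescriptorSketch` and the property keys are hypothetical names, not the classes in the pull request. Watermark settings would presumably remain specific to the streaming descriptor.

```scala
// Hypothetical descriptors; class names and property keys are illustrative, not Flink's.
final case class RowtimeSketch(attribute: String, timestampsFromField: String)

// Shared mixin, so batch and streaming descriptors declare rowtime the same way.
trait SupportsRowtime {
  protected var rowtimeDescriptor: Option[RowtimeSketch] = None

  def rowtime(r: RowtimeSketch): this.type = {
    rowtimeDescriptor = Some(r)
    this
  }

  // Normalized properties for the optional rowtime declaration (empty if none was set).
  protected def rowtimeProperties: Map[String, String] =
    rowtimeDescriptor.map { r =>
      Map("rowtime.attribute" -> r.attribute, "rowtime.timestamps.from" -> r.timestampsFromField)
    }.getOrElse(Map.empty)
}

class BatchDescriptorSketch(schemaFields: Seq[String]) extends SupportsRowtime {
  def toProperties: Map[String, String] = {
    val schema = schemaFields.zipWithIndex.map { case (f, i) => s"schema.$i.name" -> f }.toMap
    schema ++ rowtimeProperties
  }
}
```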
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334930#comment-16334930 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162965980 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -23,6 +23,7 @@ import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.table.expressions.ExpressionParser import org.apache.flink.table.api._ import org.apache.flink.table.functions.{AggregateFunction, TableFunction} +import org.apache.flink.table.descriptors.{BatchTableSourceDescriptor, ConnectorDescriptor} --- End diff -- remove
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334941#comment-16334941 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163010702 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +/** + * A class that adds a set of string-based, normalized properties for describing a + * table source or table sink. + */ +abstract class Descriptor { + + /** +* Internal method for properties conversion. +*/ + def addProperties(properties: NormalizedProperties): Unit --- End diff -- does this method have to be public or can we hide it? 
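One common Scala answer to "public or hidden?" is a qualified access modifier. It hides the method from user code while still letting other descriptors call it, for example when a composite source descriptor collects the properties of its connector. Plain `protected` would not allow calling the method on a different descriptor instance. The sketch below uses a hypothetical enclosing object `descriptorsSketch` to stand in for the package. In a real code base the equivalent would be a package-qualified modifier such as `private[flink]`. The property keys are illustrative.

```scala
import scala.collection.mutable

// Hypothetical object standing in for the descriptors package.
object descriptorsSketch {
  abstract class Descriptor {
    // Hidden from user code, but callable by other descriptors inside descriptorsSketch.
    private[descriptorsSketch] def addProperties(properties: mutable.Map[String, String]): Unit

    // The only public entry point: an immutable snapshot of the normalized properties.
    final def toProperties: Map[String, String] = {
      val properties = mutable.LinkedHashMap[String, String]()
      addProperties(properties)
      properties.toMap
    }
  }

  final class Connector(connectorType: String) extends Descriptor {
    override private[descriptorsSketch] def addProperties(
        properties: mutable.Map[String, String]): Unit =
      properties += ("connector.type" -> connectorType)
  }

  // A composite descriptor can still call addProperties on the descriptors it contains.
  final class SourceDescriptor(connector: Connector) extends Descriptor {
    override private[descriptorsSketch] def addProperties(
        properties: mutable.Map[String, String]): Unit = {
      connector.addProperties(properties)
      properties += ("descriptor.kind" -> "source")
    }
  }
}
```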
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334937#comment-16334937 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r16231 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.DescriptorUtils.statistics +import org.apache.flink.table.plan.stats.TableStats + +import scala.collection.JavaConverters._ + +/** + * Common class for all descriptors describing a table source. 
+ */ +abstract class TableSourceDescriptor extends Descriptor { + + protected var schemaDescriptor: Option[Schema] = None + protected var connectorDescriptor: Option[ConnectorDescriptor] = None + protected var encodingDescriptor: Option[EncodingDescriptor] = None + protected var proctimeDescriptor: Option[Proctime] = None + protected var rowtimeDescriptor: Option[Rowtime] = None + protected var statisticsDescriptor: Option[Statistics] = None + protected var metaDescriptor: Option[Metadata] = None + --- End diff -- We might need another descriptor for mapping fields of the encoding (or connector) to fields in the table schema. This can be used to rename or select fields from the encoding to the table schema. This would be the configuration for the `DefinedFieldMapping` interface.
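A field-mapping descriptor that is independent of the encoding could look roughly like the sketch below. It both renames fields and selects them, because unmapped physical fields are dropped. `FieldMappingSketch` and its property keys are hypothetical. The mapping direction (key = table field, value = physical field) is assumed to match `tableToJsonMapping` in the JSON descriptor of this pull request.

```scala
import scala.collection.mutable

// Hypothetical field-mapping descriptor, independent of any encoding or connector.
// Keys are table-schema field names; values are physical (encoding/connector) field names.
final class FieldMappingSketch {
  private val mapping = mutable.LinkedHashMap[String, String]()

  def field(tableField: String, physicalField: String): FieldMappingSketch = {
    require(!mapping.contains(tableField), s"Duplicate table field $tableField.")
    mapping += (tableField -> physicalField)
    this
  }

  // Normalized, string-based properties; the key layout is illustrative only.
  def toProperties: Map[String, String] =
    mapping.toSeq.zipWithIndex.flatMap { case ((table, physical), i) =>
      Seq(s"mapping.$i.table" -> table, s"mapping.$i.physical" -> physical)
    }.toMap

  // Applies the mapping to one record: renames mapped fields and drops unmapped ones.
  // Throws NoSuchElementException if a mapped physical field is missing from the record.
  def apply(record: Map[String, Any]): Map[String, Any] =
    mapping.map { case (table, physical) => table -> record(physical) }.toMap
}
```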
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334942#comment-16334942 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163007886 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableSchema + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +/** + * Encoding descriptor for JSON. + */ +class JSON extends EncodingDescriptor("json") { + + private val encodingSchema: mutable.LinkedHashMap[String, String] = + mutable.LinkedHashMap[String, String]() + private var fieldMapping: Option[util.Map[String, String]] = None + private var failOnMissingField: Option[Boolean] = None + + /** +* Sets the JSON schema with field names and the types for the JSON-encoded input. 
+* The JSON schema must not contain nested fields. +* +* This method overwrites existing fields added with [[field()]]. +* +* @param schema the table schema +*/ + def schema(schema: TableSchema): JSON = { +this.encodingSchema.clear() +NormalizedProperties.normalizeTableSchema(schema).foreach { + case (n, t) => field(n, t) +} +this + } + + /** +* Adds a JSON field with the field name and the type information for the JSON-encoding. +* This method can be called multiple times. The call order of this method defines +* also the order of the fields in the JSON-encoding. +* +* @param fieldName the field name +* @param fieldType the type information of the field +*/ + def field(fieldName: String, fieldType: TypeInformation[_]): JSON = { +field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType)) +this + } + + /** +* Adds a JSON field with the field name and the type string for the JSON-encoding. +* This method can be called multiple times. The call order of this method defines +* also the order of the fields in the JSON-encoding. +* +* @param fieldName the field name +* @param fieldType the type string of the field +*/ + def field(fieldName: String, fieldType: String): JSON = { +if (encodingSchema.contains(fieldName)) { + throw new IllegalArgumentException(s"Duplicate field name $fieldName.") +} +encodingSchema += (fieldName -> fieldType) +this + } + + /** +* Sets a mapping from schema fields to fields of the JSON schema. +* +* A field mapping is required if the fields of produced tables should be named different than +* the fields of the JSON records. +* The key of the provided Map refers to the field of the table schema, +* the value to the field in the JSON schema. +* +* @param tableToJsonMapping A mapping from table schema fields to JSON schema fields. +* @return The builder. 
+*/ + def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = { +this.fieldMapping = Some(tableToJsonMapping) +this + } + + /** +* Sets flag whether to fail if a field is missing or not. +* +* @param failOnMissingField If set to true, the operation fails if there is a missing field. +* If set to false, a missing field is set to null. +* @return The builder. +*/ + def failOnMissingField(failOnMissingField: Boolean): JSON = { +this.failOnMi
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334944#comment-16334944 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163011304 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +/** + * A class that adds a set of string-based, normalized properties for describing a + * table source or table sink. + */ +abstract class Descriptor { --- End diff -- Should we add a validation method that checks if the descriptor is valid?
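A validation hook could be added to the descriptor base class and invoked whenever properties are produced, so that invalid descriptors fail early. The sketch below is one possible shape, not the design adopted in the pull request. `ValidatingDescriptor`, `TopicConnectorSketch` and the property keys are hypothetical. Validating the normalized property map, rather than builder state, is a deliberate choice here: the same check could then also run on properties read back from an external catalog.

```scala
import scala.collection.mutable

// Hypothetical descriptor base with a validation hook; NOT Flink's Descriptor class.
abstract class ValidatingDescriptor {
  protected def addProperties(properties: mutable.Map[String, String]): Unit

  // Checks the normalized properties; throws if they do not describe a valid descriptor.
  def validate(properties: Map[String, String]): Unit

  final def toProperties: Map[String, String] = {
    val properties = mutable.LinkedHashMap[String, String]()
    addProperties(properties)
    val result = properties.toMap
    validate(result) // fail fast before the properties reach a factory
    result
  }
}

final class TopicConnectorSketch(topic: String) extends ValidatingDescriptor {
  override protected def addProperties(properties: mutable.Map[String, String]): Unit = {
    properties += ("connector.type" -> "kafka")
    properties += ("connector.topic" -> topic)
  }

  override def validate(properties: Map[String, String]): Unit = {
    require(properties.get("connector.type").contains("kafka"), "Unexpected connector type.")
    require(properties.get("connector.topic").exists(_.nonEmpty), "A non-empty topic is required.")
  }
}
```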
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334939#comment-16334939 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163002001 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableSchema + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +/** + * Encoding descriptor for JSON. + */ +class JSON extends EncodingDescriptor("json") { + + private val encodingSchema: mutable.LinkedHashMap[String, String] = + mutable.LinkedHashMap[String, String]() + private var fieldMapping: Option[util.Map[String, String]] = None + private var failOnMissingField: Option[Boolean] = None + + /** +* Sets the JSON schema with field names and the types for the JSON-encoded input. 
+* The JSON schema must not contain nested fields. +* +* This method overwrites existing fields added with [[field()]]. +* +* @param schema the table schema +*/ + def schema(schema: TableSchema): JSON = { +this.encodingSchema.clear() +NormalizedProperties.normalizeTableSchema(schema).foreach { + case (n, t) => field(n, t) +} +this + } + + /** +* Adds a JSON field with the field name and the type information for the JSON-encoding. +* This method can be called multiple times. The call order of this method defines +* also the order of the fields in the JSON-encoding. +* +* @param fieldName the field name +* @param fieldType the type information of the field +*/ + def field(fieldName: String, fieldType: TypeInformation[_]): JSON = { +field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType)) +this + } + + /** +* Adds a JSON field with the field name and the type string for the JSON-encoding. +* This method can be called multiple times. The call order of this method defines +* also the order of the fields in the JSON-encoding. +* +* @param fieldName the field name +* @param fieldType the type string of the field +*/ + def field(fieldName: String, fieldType: String): JSON = { +if (encodingSchema.contains(fieldName)) { + throw new IllegalArgumentException(s"Duplicate field name $fieldName.") +} +encodingSchema += (fieldName -> fieldType) +this + } + + /** +* Sets a mapping from schema fields to fields of the JSON schema. +* +* A field mapping is required if the fields of produced tables should be named different than +* the fields of the JSON records. +* The key of the provided Map refers to the field of the table schema, +* the value to the field in the JSON schema. +* +* @param tableToJsonMapping A mapping from table schema fields to JSON schema fields. +* @return The builder. +*/ + def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = { --- End diff -- We might want to make field mappings independent of the encoding. 
For example field mappings could also be used for JDBC connectors which do not have an encoding.
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334951#comment-16334951 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163071426 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala --- @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.typeutils + +import java.io.Serializable + +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.lang3.StringEscapeUtils +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.typeutils._ +import org.apache.flink.table.api.{TableException, Types, ValidationException} +import org.apache.flink.table.descriptors.NormalizedProperties.normalizeTypeInfo +import org.apache.flink.util.InstantiationUtil + +import _root_.scala.language.implicitConversions +import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers} + +/** + * Utilities to convert [[org.apache.flink.api.common.typeinfo.TypeInformation]] into a + * string representation and back. + */ +object TypeStringUtils extends JavaTokenParsers with PackratParsers { --- End diff -- We need unit tests for the parser.
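Parser unit tests usually cover three things: round trips (write, then parse back to an equal value), tolerance of whitespace, and rejection of malformed input. The sketch below shows that pattern on a deliberately tiny, hypothetical type model and grammar (`SimpleType`, `ROW(name TYPE, ...)`). It is a stand-in for illustration, not Flink's `TypeInformation` or the exact syntax that `TypeStringUtils` accepts. It uses a small hand-written recursive-descent parser instead of the combinator library, so the example has no dependencies.

```scala
// Hypothetical, simplified type model and grammar; NOT Flink's TypeInformation or
// the exact syntax accepted by TypeStringUtils.
sealed trait SimpleType
final case class Basic(name: String) extends SimpleType
final case class RowType(fields: List[(String, SimpleType)]) extends SimpleType

object SimpleTypeStrings {
  private val basics = Set("BOOLEAN", "INT", "BIGINT", "DOUBLE", "VARCHAR")

  def write(t: SimpleType): String = t match {
    case Basic(name) => name
    case RowType(fields) =>
      fields.map { case (n, ft) => s"$n ${write(ft)}" }.mkString("ROW(", ", ", ")")
  }

  def parse(s: String): SimpleType = {
    val (t, rest) = parseType(s.trim)
    if (rest.trim.nonEmpty) throw new IllegalArgumentException(s"Unexpected trailing input: '$rest'")
    t
  }

  // Parses one type from the front of `s`; returns the type and the unconsumed input.
  private def parseType(s: String): (SimpleType, String) =
    if (s.startsWith("ROW(")) parseFields(s.substring(4), Nil)
    else {
      val name = s.takeWhile(_.isLetter)
      if (!basics.contains(name)) throw new IllegalArgumentException(s"Unknown type: '$name'")
      (Basic(name), s.substring(name.length))
    }

  // Parses comma-separated "name TYPE" pairs up to and including the closing ')'.
  private def parseFields(s: String, acc: List[(String, SimpleType)]): (SimpleType, String) = {
    val input = s.dropWhile(_.isWhitespace)
    val name = input.takeWhile(c => c.isLetterOrDigit || c == '_')
    if (name.isEmpty) throw new IllegalArgumentException(s"Expected a field name at: '$input'")
    val (fieldType, rest) = parseType(input.substring(name.length).dropWhile(_.isWhitespace))
    val fields = (name, fieldType) :: acc
    val next = rest.dropWhile(_.isWhitespace)
    if (next.startsWith(",")) parseFields(next.substring(1), fields)
    else if (next.startsWith(")")) (RowType(fields.reverse), next.substring(1))
    else throw new IllegalArgumentException(s"Expected ',' or ')' at: '$next'")
  }
}
```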
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334932#comment-16334932 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162967648 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -18,28 +18,282 @@ package org.apache.flink.table.catalog -import java.util.{HashMap => JHashMap, Map => JMap} import java.lang.{Long => JLong} +import java.util.{HashMap => JHashMap, Map => JMap} -import org.apache.flink.table.api.TableSchema +import org.apache.flink.table.api.{TableException, TableSchema} +import org.apache.flink.table.catalog.ExternalCatalogTable.{TableTypeConnector, toConnectorDescriptor, toMetadataDescriptor, toStatisticsDescriptor} +import org.apache.flink.table.descriptors.DescriptorUtils.{connector, metadata} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.plan.stats.TableStats +import scala.collection.JavaConverters._ + /** * Defines a table in an [[ExternalCatalog]]. 
- * - * @param tableTypeTable type, e.g csv, hbase, kafka - * @param schema Schema of the table (column names and types) - * @param properties Properties of the table - * @param statsStatistics of the table - * @param comment Comment of the table - * @param createTime Create timestamp of the table - * @param lastAccessTime Timestamp of last access of the table */ -case class ExternalCatalogTable( +class ExternalCatalogTable( --- End diff -- Add descriptions for constructor arguments
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334948#comment-16334948 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163011691 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. 
+ */ +class Rowtime extends Descriptor { + + private var rowtimeName: Option[String] = None + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Declares a field of the schema to be the rowtime attribute. Required. +* +* @param fieldName The name of the field that becomes the processing time field. +*/ + def field(fieldName: String): Rowtime = { +rowtimeName = Some(fieldName) +this + } + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { +timestampExtractor = Some(new StreamRecordTimestamp) +this + } + + /** +* Sets a custom timestamp extractor to be used for the rowtime attribute. +* +* @param extractor The [[TimestampExtractor]] to extract the rowtime attribute +* from the physical type. +*/ + def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = { +timestampExtractor = Some(extractor) +this + } + + /** +* Sets a built-in watermark strategy for ascending rowtime attributes. +* +* Emits a watermark of the maximum observed timestamp so far minus 1. +* Rows that have a timestamp equal to the max timestamp are not late. +*/ + def watermarkPeriodicAscending(): Rowtime = { +watermarkStrategy = Some(new AscendingTimestamps) +this + } + + /** +* Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded +* time interval. 
+* +* Emits watermarks which are the maximum observed timestamp minus the specified delay. +*/ + def watermarkPeriodicBounding(delay: Long): Rowtime = { +watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay)) +this + } + + /** +* Sets a built-in watermark strategy which indicates the watermarks should be preserved from the +* underlying DataStream API. +*/ + def watermarkFromDataStream(): Rowtime = { +watermarkStrategy = Some(PreserveWatermarks.INSTANCE) +thi
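The doc comments above define the two periodic watermark rules precisely enough to sketch them. This is a hypothetical simplification, not Flink's `AscendingTimestamps` or `BoundedOutOfOrderTimestamps`. Both rules reduce to "maximum observed timestamp minus a delay", with a delay of 1 for the ascending case, and a row counts as late when its timestamp is at most the current watermark.

```scala
// Hypothetical sketch; not Flink's watermark strategy classes.
abstract class WatermarkSketch(delay: Long) {
  private var maxTimestamp: Long = Long.MinValue

  // Track the largest timestamp observed so far.
  def onTimestamp(ts: Long): Unit = maxTimestamp = math.max(maxTimestamp, ts)

  // Watermark = max observed timestamp minus the delay; before any row
  // arrives, stay at Long.MinValue instead of underflowing.
  def currentWatermark: Long =
    if (maxTimestamp == Long.MinValue) Long.MinValue else maxTimestamp - delay

  // A row is late if its timestamp does not exceed the current watermark.
  def isLate(ts: Long): Boolean = ts <= currentWatermark
}

// Ascending: delay of 1, so rows whose timestamp equals the max are not late.
final class AscendingSketch extends WatermarkSketch(1L)

// Out-of-order by at most a fixed delay (same unit as the timestamps).
final class BoundedSketch(delay: Long) extends WatermarkSketch(delay)
```

With timestamps 5, 3, 9 the ascending rule yields a watermark of 8, so a row at 9 is on time while a row at 8 is late, matching the "equal to the max timestamp are not late" note.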
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334950#comment-16334950 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163067483 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala --- @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.typeutils + +import java.io.Serializable + +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.lang3.StringEscapeUtils +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.typeutils._ +import org.apache.flink.table.api.{TableException, Types, ValidationException} +import org.apache.flink.table.descriptors.NormalizedProperties.normalizeTypeInfo +import org.apache.flink.util.InstantiationUtil + +import _root_.scala.language.implicitConversions +import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers} + +/** + * Utilities to convert [[org.apache.flink.api.common.typeinfo.TypeInformation]] into a + * string representation and back. + */ +object TypeStringUtils extends JavaTokenParsers with PackratParsers { --- End diff -- Some examples of the supported syntax would be good. It would also be good to add these examples to the docs of the methods that accept type strings.
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334947#comment-16334947 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163065450 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util + +/** + * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider + * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that + * describe the desired table source. The factory allows for matching to the given set of + * properties and creating a configured [[TableSource]] accordingly. + * + * Classes that implement this interface need to be added to the + * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in --- End diff -- do all need to be added to the same file? Or can we have separate files for different modules. 
For instance, a `Kafka011JsonTableFactory` would be in the Kafka connectors module. Would a user have to change the service file to use the Kafka factory, or can we build it in a way that it is sufficient to include the Kafka connectors JAR?
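On the service-file question: `java.util.ServiceLoader` reads every `META-INF/services/<fully-qualified-interface-name>` resource visible to the class loader (note the hyphen in `META-INF`, unlike the `META_INF` spelling in the diff's doc comment), so each connector JAR can ship its own file, and putting the JAR on the classpath should be enough for discovery. What remains is choosing among the discovered factories. The sketch below assumes the candidates have already been loaded and matches them against normalized properties; `FactorySketch`, `requiredProperties`, and `findFactory` are hypothetical names, not the PR's API.

```scala
// Hypothetical stand-in for a discovered table source factory.
trait FactorySketch {
  def name: String
  // Keys that must be present with exactly these values for the factory to apply.
  def requiredProperties: Map[String, String]
}

object FactoryMatching {
  // Picks the single factory whose required properties are all satisfied;
  // fails loudly on zero or on ambiguous matches.
  def findFactory(candidates: Seq[FactorySketch], properties: Map[String, String]): FactorySketch = {
    val matching = candidates.filter(_.requiredProperties.forall {
      case (k, v) => properties.get(k).contains(v)
    })
    matching match {
      case Seq(single) => single
      case Seq()       => throw new IllegalArgumentException(s"No factory matches $properties")
      case many =>
        throw new IllegalArgumentException(s"Ambiguous factories: ${many.map(_.name).mkString(", ")}")
    }
  }
}
```

Keeping discovery and matching separate makes the matching rule unit-testable without any service files on the classpath.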
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334943#comment-16334943 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162995916 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.DescriptorUtils.connector + +/** + * Describes a connector to an other system. + * + * @param tpe string identifier for the connector + */ +abstract class ConnectorDescriptor(private val tpe: String) extends Descriptor { + + /** +* Internal method for properties conversion. +*/ + final def addProperties(properties: NormalizedProperties): Unit = { +properties.putString(connector("type"), tpe) +val connectorProperties = new NormalizedProperties() +addConnectorProperties(connectorProperties) +connectorProperties.getProperties.foreach { case (k, v) => --- End diff -- why do we need to go over the properties again? 
Couldn't we implement `addConnectorProperties` to properly add the properties directly into `properties`?
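One way to address this question, sketched under assumptions: the diff is cut off inside the copy loop, so this assumes the loop re-adds each entry under the `connector.` prefix. Handing subclasses a view that prefixes keys as they are written lets `addConnectorProperties` write straight into the shared properties, with no temporary object and no second pass. `Props`, `PrefixedProps`, `ConnectorSketch`, and `KafkaSketch` are hypothetical simplified stand-ins, not `NormalizedProperties` or the PR's descriptor classes.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for NormalizedProperties: a flat ordered map.
class Props {
  val entries: mutable.LinkedHashMap[String, String] = mutable.LinkedHashMap()
  def putString(key: String, value: String): Unit = entries(key) = value
}

// Prefixing view: every key written through it lands in the shared Props
// under "<prefix>.<key>", so no copy loop is needed afterwards.
class PrefixedProps(underlying: Props, prefix: String) {
  def putString(key: String, value: String): Unit =
    underlying.putString(s"$prefix.$key", value)
}

abstract class ConnectorSketch(tpe: String) {
  final def addProperties(properties: Props): Unit = {
    properties.putString("connector.type", tpe)
    addConnectorProperties(new PrefixedProps(properties, "connector"))
  }

  // Subclasses only see the prefixed view and cannot write outside "connector.".
  protected def addConnectorProperties(properties: PrefixedProps): Unit
}

class KafkaSketch(topic: String) extends ConnectorSketch("kafka") {
  override protected def addConnectorProperties(properties: PrefixedProps): Unit =
    properties.putString("topic", topic)
}
```

A side benefit of the view is that subclasses cannot accidentally write keys outside their own namespace.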
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334940#comment-16334940 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162996455 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorUtils.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +/** + * Utilities for working with a [[org.apache.flink.table.descriptors.Descriptor]]. + */ +object DescriptorUtils { + + def hasConnector(properties: util.Map[String, String], connector: String): Boolean = { +val tpe = properties.get("connector.type") +tpe != null || tpe == connector --- End diff -- should be `tpe != null && tpe == connector`? 
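The reviewer's point about `hasConnector` can be checked directly: with `||`, once `tpe` is non-null the first operand is already true, so the check reports a match for any connector type. A small sketch contrasting the diff's version with the suggested `&&` version (the object name `ConnectorCheck` is hypothetical):

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

object ConnectorCheck {
  // Version from the diff: true whenever *any* connector type is set.
  def hasConnectorBuggy(properties: JMap[String, String], connector: String): Boolean = {
    val tpe = properties.get("connector.type")
    tpe != null || tpe == connector
  }

  // Reviewer's suggested fix: the type must be present and equal.
  def hasConnector(properties: JMap[String, String], connector: String): Boolean = {
    val tpe = properties.get("connector.type")
    tpe != null && tpe == connector
  }
}
```

Since Scala's `==` is null-safe, `tpe == connector` alone would also behave correctly whenever `connector` itself is non-null; the explicit `tpe != null` guard additionally rejects a `null` expected value.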
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334938#comment-16334938 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163005744 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableSchema + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +/** + * Encoding descriptor for JSON. + */ +class JSON extends EncodingDescriptor("json") { --- End diff -- Should we add a method that defines the schema with a JSON Schema string? We would need a parser, but have immediate support for nested schema. Alternatively, we could use the nested schema parser of `TypeStringUtils` but this would not be JSON Schema. 
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334936#comment-16334936 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163007563 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableSchema + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +/** + * Encoding descriptor for JSON. + */ +class JSON extends EncodingDescriptor("json") { + + private val encodingSchema: mutable.LinkedHashMap[String, String] = + mutable.LinkedHashMap[String, String]() + private var fieldMapping: Option[util.Map[String, String]] = None + private var failOnMissingField: Option[Boolean] = None + + /** +* Sets the JSON schema with field names and the types for the JSON-encoded input. 
+* The JSON schema must not contain nested fields. +* +* This method overwrites existing fields added with [[field()]]. +* +* @param schema the table schema +*/ + def schema(schema: TableSchema): JSON = { +this.encodingSchema.clear() +NormalizedProperties.normalizeTableSchema(schema).foreach { + case (n, t) => field(n, t) +} +this + } + + /** +* Adds a JSON field with the field name and the type information for the JSON-encoding. +* This method can be called multiple times. The call order of this method defines +* also the order of the fields in the JSON-encoding. +* +* @param fieldName the field name +* @param fieldType the type information of the field +*/ + def field(fieldName: String, fieldType: TypeInformation[_]): JSON = { +field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType)) +this + } + + /** +* Adds a JSON field with the field name and the type string for the JSON-encoding. +* This method can be called multiple times. The call order of this method defines +* also the order of the fields in the JSON-encoding. +* +* @param fieldName the field name +* @param fieldType the type string of the field +*/ + def field(fieldName: String, fieldType: String): JSON = { +if (encodingSchema.contains(fieldName)) { + throw new IllegalArgumentException(s"Duplicate field name $fieldName.") +} +encodingSchema += (fieldName -> fieldType) +this + } + + /** +* Sets a mapping from schema fields to fields of the JSON schema. +* +* A field mapping is required if the fields of produced tables should be named different than +* the fields of the JSON records. +* The key of the provided Map refers to the field of the table schema, +* the value to the field in the JSON schema. +* +* @param tableToJsonMapping A mapping from table schema fields to JSON schema fields. +* @return The builder. 
+*/ + def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = { +this.fieldMapping = Some(tableToJsonMapping) +this + } + + /** +* Sets flag whether to fail if a field is missing or not. +* +* @param failOnMissingField If set to true, the operation fails if there is a missing field. +* If set to false, a missing field is set to null. +* @return The builder. +*/ + def failOnMissingField(failOnMissingField: Boolean): JSON = { +this.failOnMi
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334931#comment-16334931 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162990874 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala --- @@ -23,22 +23,74 @@ import java.net.URL import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration} import org.apache.flink.annotation.VisibleForTesting import org.apache.flink.table.annotation.TableType -import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException} +import org.apache.flink.table.api._ import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable} import org.apache.flink.table.plan.stats.FlinkStatistic -import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource} +import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService} import org.apache.flink.table.util.Logging import org.apache.flink.util.InstantiationUtil import org.reflections.Reflections -import scala.collection.JavaConverters._ -import scala.collection.mutable +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable /** * The utility class is used to convert ExternalCatalogTable to TableSourceTable. 
*/ object ExternalTableSourceUtil extends Logging { + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceTable[_] = { + +// check for the legacy external catalog path +if (externalCatalogTable.isLegacyTableType) { + LOG.warn("External catalog tables based on TableType annotations are deprecated. " + +"Please consider updating them to TableSourceFactories.") + fromExternalCatalogTableType(externalCatalogTable) +} +// use the factory approach +else { + val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable) + tableEnv match { +// check for a batch table source in this batch environment +case _: BatchTableEnvironment => + source match { +case bts: BatchTableSource[_] => + new BatchTableSourceTable( +bts, +new FlinkStatistic(externalCatalogTable.getTableStats)) +case _ => throw new TableException( + s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + +s"in a batch environment.") + } +// check for a stream table source in this streaming environment +case _: StreamTableEnvironment => + source match { +case sts: StreamTableSource[_] => + new StreamTableSourceTable( +sts, +new FlinkStatistic(externalCatalogTable.getTableStats)) +case _ => throw new TableException( + s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + +s"in a streaming environment.") + } +case _ => throw new TableException("Unsupported table environment.") + } +} + } + + // -- + // NOTE: the following line can be removed once we drop support for TableType --- End diff -- I think we can also remove the `org.reflections:reflections` dependency once we removed this. 
> Create unified interfaces to configure and instatiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiati
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162947812 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala --- @@ -107,6 +111,16 @@ abstract class BatchTableEnvironment( } } + /** +* Creates a table from a descriptor that describes the resulting table schema, the source +* connector, source encoding, and other properties. +* +* @param schema schema descriptor describing the table to create +*/ + def createTable(schema: Schema): BatchTableSourceDescriptor = { --- End diff --
I'm not sure about the approach of returning a `TableSourceDescriptor`. I think it would be better if table creation and registration were completed within this method, i.e., the table should be completely defined by the method's argument. For example:
```
tEnv.registerTableSource(
  "MyTable",
  TableSource.create(tEnv)
    .withSchema(
      Schema()
        .field(...)
        .field(...))
    .withConnector()
    ...
    .toTableSource()
)
```
In this design, we would reuse the existing `registerTableSource` method, and `TableSource.create` would be a static method that returns a `TableSourceDescriptor`. I'm not sure this is the best approach, but I like that the table is completely defined within the method call. ---
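A minimal sketch of the single-call style proposed in this comment, assuming heavily simplified stand-in types: `Schema`, `SourceDescriptor`, `TableSourceStub`, `TableSources`, and `TableEnvStub` are hypothetical and are not Flink's classes. It only illustrates the property the comment asks for: the table is fully defined by the argument passed to `registerTableSource`, and an incomplete definition fails before anything is registered.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins; these are NOT Flink's classes.
final case class Schema(fields: Vector[(String, String)] = Vector.empty) {
  // Appends a field; call order defines field order.
  def field(name: String, tpe: String): Schema = copy(fields = fields :+ (name -> tpe))
}

final case class TableSourceStub(schema: Schema, connector: String)

// Immutable builder: each `with*` call returns a new descriptor.
final case class SourceDescriptor(
    schema: Option[Schema] = None,
    connector: Option[String] = None) {

  def withSchema(s: Schema): SourceDescriptor = copy(schema = Some(s))
  def withConnector(c: String): SourceDescriptor = copy(connector = Some(c))

  // Completes the definition; fails early if a required part is missing.
  def toTableSource: TableSourceStub = (schema, connector) match {
    case (Some(s), Some(c)) => TableSourceStub(s, c)
    case _ => throw new IllegalStateException("Schema and connector must both be set.")
  }
}

// Plays the role of the static `TableSource.create(tEnv)` entry point from the comment.
object TableSources {
  def create(): SourceDescriptor = SourceDescriptor()
}

// Stand-in for a table environment that only knows how to register finished sources.
final class TableEnvStub {
  val registered: mutable.LinkedHashMap[String, TableSourceStub] =
    mutable.LinkedHashMap[String, TableSourceStub]()

  def registerTableSource(name: String, source: TableSourceStub): Unit = {
    require(!registered.contains(name), s"Table $name is already registered.")
    registered.update(name, source)
  }
}
```

Because `toTableSource` is the only way to obtain a source from the builder, validation happens in one place and `registerTableSource` needs no descriptor-specific overload.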
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334934#comment-16334934 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162996633 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorUtils.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +/** + * Utilities for working with a [[org.apache.flink.table.descriptors.Descriptor]]. + */ +object DescriptorUtils { + + def hasConnector(properties: util.Map[String, String], connector: String): Boolean = { +val tpe = properties.get("connector.type") +tpe != null || tpe == connector + } + + def hasEncoding(properties: util.Map[String, String], encoding: String): Boolean = { +val tpe = properties.get("encoding.type") +tpe != null || tpe == encoding --- End diff -- should be `tpe != null && tpe == encoding`? 
> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide a
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is
> very welcome.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
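The fix suggested in the `hasEncoding` comment above (and the same fix for `hasConnector`, which has the identical `||` pattern) can be sketched as follows. With `||`, any non-null `connector.type` or `encoding.type` makes the check succeed regardless of its value. The object name `DescriptorUtilsSketch` is hypothetical; the property keys are taken from the diff, and the values such as `"kafka"` are only examples.

```scala
import java.util

// Hypothetical name; mirrors the two predicates from the reviewed DescriptorUtils
// with `&&` instead of `||`, so a property only matches when its value is equal.
object DescriptorUtilsSketch {

  def hasConnector(properties: util.Map[String, String], connector: String): Boolean = {
    val tpe = properties.get("connector.type")
    tpe != null && tpe == connector
  }

  def hasEncoding(properties: util.Map[String, String], encoding: String): Boolean = {
    val tpe = properties.get("encoding.type")
    tpe != null && tpe == encoding
  }
}
```

Since Scala's `==` is null-safe, `tpe == connector` alone would already return `false` for a missing key as long as `connector` is non-null; the explicit null check just keeps the intent obvious.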
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162964487 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala --- @@ -156,7 +195,10 @@ case class NoMatchedTableSourceConverterException( * * @param tableType table type * @param cause the cause + * @deprecated Use table source factories instead. */ +@Deprecated +@deprecated("Use table source factories instead.") --- End diff -- Give a class name. ---
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334928#comment-16334928 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162947784 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -125,6 +129,16 @@ abstract class StreamTableEnvironment( } } + /** +* Creates a table from a descriptor that describes the resulting table schema, the source +* connector, the source encoding, and other properties. +* +* @param schema schema descriptor describing the table to create +*/ + def createTable(schema: Schema): StreamTableSourceDescriptor = { --- End diff -- See comment on `BatchTableEnvironment`.
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162988639 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala --- @@ -23,22 +23,74 @@ import java.net.URL import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration} import org.apache.flink.annotation.VisibleForTesting import org.apache.flink.table.annotation.TableType -import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException} +import org.apache.flink.table.api._ import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable} import org.apache.flink.table.plan.stats.FlinkStatistic -import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource} +import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService} import org.apache.flink.table.util.Logging import org.apache.flink.util.InstantiationUtil import org.reflections.Reflections -import scala.collection.JavaConverters._ -import scala.collection.mutable +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable /** * The utility class is used to convert ExternalCatalogTable to TableSourceTable. 
*/ object ExternalTableSourceUtil extends Logging { + /** +* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance +* +* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert +* @return converted [[TableSourceTable]] instance from the input catalog table +*/ + def fromExternalCatalogTable( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) +: TableSourceTable[_] = { + +// check for the legacy external catalog path +if (externalCatalogTable.isLegacyTableType) { + LOG.warn("External catalog tables based on TableType annotations are deprecated. " + +"Please consider updating them to TableSourceFactories.") + fromExternalCatalogTableType(externalCatalogTable) +} +// use the factory approach +else { + val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable) + tableEnv match { +// check for a batch table source in this batch environment +case _: BatchTableEnvironment => + source match { +case bts: BatchTableSource[_] => + new BatchTableSourceTable( +bts, +new FlinkStatistic(externalCatalogTable.getTableStats)) +case _ => throw new TableException( + s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + +s"in a batch environment.") + } +// check for a stream table source in this streaming environment +case _: StreamTableEnvironment => + source match { +case sts: StreamTableSource[_] => + new StreamTableSourceTable( +sts, +new FlinkStatistic(externalCatalogTable.getTableStats)) +case _ => throw new TableException( + s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + +s"in a streaming environment.") + } +case _ => throw new TableException("Unsupported table environment.") + } +} + } + + // -- + // NOTE: the following line can be removed once we drop support for TableType --- End diff -- line or lines? Create a JIRA and link it here as reference? ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163012548 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException} +import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceFactoryService} + +/** + * Descriptor for specifying a table source in a streaming environment. + */ +class StreamTableSourceDescriptor( +tableEnv: StreamTableEnvironment, +schema: Schema) + extends TableSourceDescriptor { + + schemaDescriptor = Some(schema) + + /** +* Searches for the specified table source, configures it accordingly, and returns it. 
+*/ + def toTableSource: TableSource[_] = { +val source = TableSourceFactoryService.findTableSourceFactory(this) +source match { + case _: StreamTableSource[_] => source + case _ => throw new TableException( +s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + + s"in a streaming environment.") +} + } + + /** +* Searches for the specified table source, configures it accordingly, and returns it as a table. +*/ + def toTable: Table = { +tableEnv.fromTableSource(toTableSource) + } + + /** +* Searches for the specified table source, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + def register(name: String): Unit = { +tableEnv.registerTableSource(name, toTableSource) + } + + /** +* Specifies an connector for reading data from a connector. +*/ + def withConnector(connector: ConnectorDescriptor): StreamTableSourceDescriptor = { +connectorDescriptor = Some(connector) --- End diff -- check if an encoding was added that the connector does not need? ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162995916 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.DescriptorUtils.connector + +/** + * Describes a connector to an other system. + * + * @param tpe string identifier for the connector + */ +abstract class ConnectorDescriptor(private val tpe: String) extends Descriptor { + + /** +* Internal method for properties conversion. +*/ + final def addProperties(properties: NormalizedProperties): Unit = { +properties.putString(connector("type"), tpe) +val connectorProperties = new NormalizedProperties() +addConnectorProperties(connectorProperties) +connectorProperties.getProperties.foreach { case (k, v) => --- End diff -- why do we need to go over the properties again? Couldn't we implement `addConnectorProperties` to properly add the properties directly into `properties`? ---
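One way to do what this comment suggests, sketched with hypothetical stand-ins: `Props` replaces `NormalizedProperties`, and `ConnectorDescriptorSketch` and `KafkaSketch` are illustrative names, not the PR's classes. It assumes connector keys live under a `connector.` prefix, as the `connector.type` key read by `DescriptorUtils.hasConnector` suggests. Subclasses write straight into the shared properties through a prefixing writer, so no temporary map has to be copied afterwards.

```scala
import scala.collection.mutable

// Hypothetical stand-in for NormalizedProperties: a flat, insertion-ordered key/value map.
final class Props {
  private val underlying = mutable.LinkedHashMap[String, String]()
  def putString(key: String, value: String): Unit = underlying.update(key, value)
  def toMap: Map[String, String] = underlying.toMap
}

abstract class ConnectorDescriptorSketch(tpe: String) {

  // Writes the connector type, then lets the subclass add its properties directly
  // into `properties`; the writer adds the "connector." prefix to every key.
  final def addProperties(properties: Props): Unit = {
    properties.putString("connector.type", tpe)
    addConnectorProperties((key, value) => properties.putString(s"connector.$key", value))
  }

  // Subclasses only supply unprefixed keys.
  protected def addConnectorProperties(put: (String, String) => Unit): Unit
}

// Hypothetical example connector with a single property.
final class KafkaSketch(topic: String) extends ConnectorDescriptorSketch("kafka") {
  override protected def addConnectorProperties(put: (String, String) => Unit): Unit =
    put("topic", topic)
}
```

The writer keeps subclasses inside the `connector.` namespace without a second pass, which may be what the truncated copy loop in the diff is meant to guarantee.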
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162994182 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.DescriptorUtils.connector + +/** + * Describes a connector to an other system. + * + * @param tpe string identifier for the connector + */ +abstract class ConnectorDescriptor(private val tpe: String) extends Descriptor { --- End diff -- Should a `ConnectorDescriptor` know whether it requires an encoding? For example, a file descriptor needs an encoding but a JDBC connector doesn't. This property would then be used to validate the configuration. ---
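The idea in this comment, combined with the two `withConnector`/`withEncoding` questions above, can be sketched as a connector that declares whether it needs an encoding plus one validation step over the combination. All names here (`Connector`, `FileConnector`, `JdbcConnector`, `Encoding`, `SourceSpec`, the `needsEncoding` flag) are hypothetical and only illustrate the suggestion; they are not the PR's API.

```scala
// Hypothetical: each connector declares whether it needs an encoding.
abstract class Connector(val tpe: String, val needsEncoding: Boolean)
final class FileConnector extends Connector("filesystem", needsEncoding = true)
final class JdbcConnector extends Connector("jdbc", needsEncoding = false)

final case class Encoding(tpe: String)

// Hypothetical descriptor that validates the connector/encoding combination.
final case class SourceSpec(connector: Connector, encoding: Option[Encoding]) {
  def validate(): Unit = (connector.needsEncoding, encoding) match {
    case (true, None) =>
      throw new IllegalArgumentException(s"Connector '${connector.tpe}' requires an encoding.")
    case (false, Some(e)) =>
      throw new IllegalArgumentException(
        s"Connector '${connector.tpe}' does not support encoding '${e.tpe}'.")
    case _ => () // valid combination
  }
}
```

Validating once over the complete descriptor, rather than inside each `with*` call, avoids rejecting a valid configuration just because the encoding was set before the connector.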
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163012442 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException} +import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceFactoryService} + +/** + * Descriptor for specifying a table source in a streaming environment. + */ +class StreamTableSourceDescriptor( +tableEnv: StreamTableEnvironment, +schema: Schema) + extends TableSourceDescriptor { + + schemaDescriptor = Some(schema) + + /** +* Searches for the specified table source, configures it accordingly, and returns it. 
+*/ + def toTableSource: TableSource[_] = { +val source = TableSourceFactoryService.findTableSourceFactory(this) +source match { + case _: StreamTableSource[_] => source + case _ => throw new TableException( +s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + + s"in a streaming environment.") +} + } + + /** +* Searches for the specified table source, configures it accordingly, and returns it as a table. +*/ + def toTable: Table = { +tableEnv.fromTableSource(toTableSource) + } + + /** +* Searches for the specified table source, configures it accordingly, and registers it as +* a table under the given name. +* +* @param name table name to be registered in the table environment +*/ + def register(name: String): Unit = { +tableEnv.registerTableSource(name, toTableSource) + } + + /** +* Specifies an connector for reading data from a connector. +*/ + def withConnector(connector: ConnectorDescriptor): StreamTableSourceDescriptor = { +connectorDescriptor = Some(connector) +this + } + + /** +* Specifies an encoding that defines how to read data from a connector. +*/ + def withEncoding(encoding: EncodingDescriptor): StreamTableSourceDescriptor = { +encodingDescriptor = Some(encoding) --- End diff -- check if the connector requires an encoding? ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162965980 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -23,6 +23,7 @@ import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.table.expressions.ExpressionParser import org.apache.flink.table.api._ import org.apache.flink.table.functions.{AggregateFunction, TableFunction} +import org.apache.flink.table.descriptors.{BatchTableSourceDescriptor, ConnectorDescriptor} --- End diff -- remove ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162967648 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -18,28 +18,282 @@ package org.apache.flink.table.catalog -import java.util.{HashMap => JHashMap, Map => JMap} import java.lang.{Long => JLong} +import java.util.{HashMap => JHashMap, Map => JMap} -import org.apache.flink.table.api.TableSchema +import org.apache.flink.table.api.{TableException, TableSchema} +import org.apache.flink.table.catalog.ExternalCatalogTable.{TableTypeConnector, toConnectorDescriptor, toMetadataDescriptor, toStatisticsDescriptor} +import org.apache.flink.table.descriptors.DescriptorUtils.{connector, metadata} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.plan.stats.TableStats +import scala.collection.JavaConverters._ + /** * Defines a table in an [[ExternalCatalog]]. - * - * @param tableTypeTable type, e.g csv, hbase, kafka - * @param schema Schema of the table (column names and types) - * @param properties Properties of the table - * @param statsStatistics of the table - * @param comment Comment of the table - * @param createTime Create timestamp of the table - * @param lastAccessTime Timestamp of last access of the table */ -case class ExternalCatalogTable( +class ExternalCatalogTable( --- End diff -- Add descriptions for constructor arguments ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163007886 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableSchema + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +/** + * Encoding descriptor for JSON. + */ +class JSON extends EncodingDescriptor("json") { + + private val encodingSchema: mutable.LinkedHashMap[String, String] = + mutable.LinkedHashMap[String, String]() + private var fieldMapping: Option[util.Map[String, String]] = None + private var failOnMissingField: Option[Boolean] = None + + /** +* Sets the JSON schema with field names and the types for the JSON-encoded input. +* The JSON schema must not contain nested fields. +* +* This method overwrites existing fields added with [[field()]]. 
+* +* @param schema the table schema +*/ + def schema(schema: TableSchema): JSON = { +this.encodingSchema.clear() +NormalizedProperties.normalizeTableSchema(schema).foreach { + case (n, t) => field(n, t) +} +this + } + + /** +* Adds a JSON field with the field name and the type information for the JSON-encoding. +* This method can be called multiple times. The call order of this method defines +* also the order of the fields in the JSON-encoding. +* +* @param fieldName the field name +* @param fieldType the type information of the field +*/ + def field(fieldName: String, fieldType: TypeInformation[_]): JSON = { +field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType)) +this + } + + /** +* Adds a JSON field with the field name and the type string for the JSON-encoding. +* This method can be called multiple times. The call order of this method defines +* also the order of the fields in the JSON-encoding. +* +* @param fieldName the field name +* @param fieldType the type string of the field +*/ + def field(fieldName: String, fieldType: String): JSON = { +if (encodingSchema.contains(fieldName)) { + throw new IllegalArgumentException(s"Duplicate field name $fieldName.") +} +encodingSchema += (fieldName -> fieldType) +this + } + + /** +* Sets a mapping from schema fields to fields of the JSON schema. +* +* A field mapping is required if the fields of produced tables should be named different than +* the fields of the JSON records. +* The key of the provided Map refers to the field of the table schema, +* the value to the field in the JSON schema. +* +* @param tableToJsonMapping A mapping from table schema fields to JSON schema fields. +* @return The builder. +*/ + def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = { +this.fieldMapping = Some(tableToJsonMapping) +this + } + + /** +* Sets flag whether to fail if a field is missing or not. +* +* @param failOnMissingField If set to true, the operation fails if there is a missing field. 
+* If set to false, a missing field is set to null. +* @return The builder. +*/ + def failOnMissingField(failOnMissingField: Boolean): JSON = { +this.failOnMissingField = Some(failOnMissingField) +this + } + + /** +* Internal method for encoding properties conversion. +*/ + override protected def addEncodingProperties(properties: NormalizedProperties): Unit = { +
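The `tableToJsonMapping` and `failOnMissingField` docs in the diff above describe how table fields are resolved against a JSON record: keys of the mapping are table fields, values are JSON fields, and a missing field either fails or becomes null. A minimal sketch of those documented semantics, assuming an already parsed record represented as a `Map`; `JsonFieldMappingSketch.readRow` is hypothetical and not part of the PR.

```scala
// Hypothetical helper illustrating the documented mapping semantics.
object JsonFieldMappingSketch {

  def readRow(
      tableFields: Seq[String],
      record: Map[String, Any],
      tableToJson: Map[String, String],
      failOnMissingField: Boolean): Seq[Any] =
    tableFields.map { tableField =>
      // Unmapped table fields are assumed to use the same name in the JSON record.
      val jsonField = tableToJson.getOrElse(tableField, tableField)
      record.get(jsonField) match {
        case Some(v) => v
        case None if failOnMissingField =>
          throw new IllegalStateException(s"Missing field '$jsonField' in JSON record.")
        case None => null // failOnMissingField = false: missing fields become null
      }
    }
}
```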
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163005744 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableSchema + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +/** + * Encoding descriptor for JSON. + */ +class JSON extends EncodingDescriptor("json") { --- End diff -- Should we add a method that defines the schema with a JSON Schema string? We would need a parser, but we would get immediate support for nested schemas. Alternatively, we could use the nested schema parser of `TypeStringUtils`, but this would not be JSON Schema. ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163067483 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala --- @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.typeutils + +import java.io.Serializable + +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.lang3.StringEscapeUtils +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.typeutils._ +import org.apache.flink.table.api.{TableException, Types, ValidationException} +import org.apache.flink.table.descriptors.NormalizedProperties.normalizeTypeInfo +import org.apache.flink.util.InstantiationUtil + +import _root_.scala.language.implicitConversions +import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers} + +/** + * Utilities to convert [[org.apache.flink.api.common.typeinfo.TypeInformation]] into a + * string representation and back. 
+ */ +object TypeStringUtils extends JavaTokenParsers with PackratParsers { --- End diff -- Some examples of the supported syntax would be helpful. It would also be good to add these examples to the docs of the methods that accept type strings. ---
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334935#comment-16334935 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162994182 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.DescriptorUtils.connector + +/** + * Describes a connector to an other system. + * + * @param tpe string identifier for the connector + */ +abstract class ConnectorDescriptor(private val tpe: String) extends Descriptor { --- End diff -- Should a `ConnectorDescriptor` know whether it requires an encoding? For example a file descriptor needs an encoding but a JDBC connector doesn't. 
This property would then be used to validate the configuration.
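A minimal Scala sketch of the idea in the comment above: each connector declares whether it needs an encoding, and validation checks the configured encoding against that flag. All names (`ConnectorSketch`, `needsEncoding`, `SourceValidationSketch`) are hypothetical and not Flink API; rejecting an encoding for a connector that does not need one is an additional assumption.

```scala
// Hypothetical sketch; these are not Flink classes.
abstract class ConnectorSketch(val tpe: String) {
  /** Whether this connector needs an encoding descriptor (e.g. files do, JDBC does not). */
  def needsEncoding: Boolean
}

final class FileConnectorSketch extends ConnectorSketch("filesystem") {
  override def needsEncoding: Boolean = true
}

final class JdbcConnectorSketch extends ConnectorSketch("jdbc") {
  override def needsEncoding: Boolean = false
}

object SourceValidationSketch {
  /** Returns an error message if the connector and the optional encoding do not fit together. */
  def validate(connector: ConnectorSketch, encoding: Option[String]): Option[String] =
    (connector.needsEncoding, encoding) match {
      case (true, None) =>
        Some(s"Connector '${connector.tpe}' requires an encoding.")
      // Assumption: a connector that needs no encoding also rejects one.
      case (false, Some(e)) =>
        Some(s"Connector '${connector.tpe}' does not support encoding '$e'.")
      case _ => None
    }
}
```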
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334929#comment-16334929 ] ASF GitHub Bot commented on FLINK-8240: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162947812 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala --- @@ -107,6 +111,16 @@ abstract class BatchTableEnvironment( } } + /** +* Creates a table from a descriptor that describes the resulting table schema, the source +* connector, source encoding, and other properties. +* +* @param schema schema descriptor describing the table to create +*/ + def createTable(schema: Schema): BatchTableSourceDescriptor = { --- End diff -- I'm not sure about the approach of returning a `TableSourceDescriptor`. I think it would be better if the table creation and registration were completed within this method, i.e., the table should be completely defined by the argument of the method. For example ``` tEnv.registerTableSource( "MyTable", TableSource.create(tEnv) .withSchema( Schema() .field(...) .field(...)) .withConnector() ... .toTableSource() ) ``` In this design, we would reuse the existing `registerTableSource` method, and `TableSource.create` is a static method that returns a `TableSourceDescriptor`. I'm not sure if this is the best approach, but I like that the table is completely defined within the method call.
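A self-contained sketch of the proposed shape, in which the table is fully defined by the argument of a single registration call. `SourceSpec`, `SourceSpecBuilder`, and `TableEnvSketch` are hypothetical stand-ins, not Flink classes; the sketch only illustrates that the builder is completed with `toTableSource()` before `registerTableSource` ever sees it.

```scala
import scala.collection.mutable

// Hypothetical, self-contained stand-ins; none of these are Flink classes.
final case class SourceSpec(fields: Vector[(String, String)], connector: String)

/** Immutable builder: every call returns a new builder, and toTableSource() completes it. */
final class SourceSpecBuilder private (fields: Vector[(String, String)], connector: Option[String]) {
  def field(name: String, tpe: String): SourceSpecBuilder =
    new SourceSpecBuilder(fields :+ (name -> tpe), connector)

  def withConnector(tpe: String): SourceSpecBuilder =
    new SourceSpecBuilder(fields, Some(tpe))

  def toTableSource(): SourceSpec = {
    require(fields.nonEmpty, "the schema needs at least one field")
    SourceSpec(fields, connector.getOrElse(throw new IllegalStateException("no connector given")))
  }
}

object SourceSpecBuilder {
  def create(): SourceSpecBuilder = new SourceSpecBuilder(Vector.empty, None)
}

/** Stand-in for a table environment that only accepts fully defined sources. */
final class TableEnvSketch {
  private val registry = mutable.LinkedHashMap[String, SourceSpec]()

  def registerTableSource(name: String, source: SourceSpec): Unit = {
    require(!registry.contains(name), s"Table '$name' is already registered.")
    registry += name -> source
  }

  def lookup(name: String): Option[SourceSpec] = registry.get(name)
}
```

In this shape, an incomplete definition (no fields or no connector) fails inside `toTableSource()`, before anything is registered.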
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163065450 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util + +/** + * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider + * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that + * describe the desired table source. The factory allows for matching to the given set of + * properties and creating a configured [[TableSource]] accordingly. + * + * Classes that implement this interface need to be added to the + * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in --- End diff -- Do all factories need to be added to the same file, or can we have separate files for different modules? For instance, a `Kafka011JsonTableFactory` would be in the Kafka connectors module.
Would a user have to change the service file to use the Kafka factory, or can we build it in a way that it is sufficient to include the Kafka connectors JAR? ---
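On the SPI question: `java.util.ServiceLoader` locates provider-configuration files through `ClassLoader.getResources`, so every JAR on the classpath can ship its own `META-INF/services/<fully qualified interface name>` file (the standard directory is `META-INF`, with a hyphen), and the providers from all of them are returned together. A minimal sketch, with `DemoSourceFactory` as a hypothetical stand-in for `TableSourceFactory`:

```scala
import java.net.URL
import java.util.ServiceLoader
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for TableSourceFactory.
trait DemoSourceFactory {
  def connectorType: String
}

object FactoryDiscoverySketch {
  /** All providers listed in any META-INF/services file for DemoSourceFactory visible to `loader`. */
  def discover(loader: ClassLoader): List[DemoSourceFactory] = {
    val found = ListBuffer[DemoSourceFactory]()
    val it = ServiceLoader.load(classOf[DemoSourceFactory], loader).iterator()
    while (it.hasNext) found += it.next()
    found.toList
  }

  /** The provider-configuration files themselves: one URL per JAR (or directory) that ships one. */
  def providerFiles(loader: ClassLoader): List[URL] = {
    val urls = loader.getResources("META-INF/services/" + classOf[DemoSourceFactory].getName)
    val out = ListBuffer[URL]()
    while (urls.hasMoreElements) out += urls.nextElement()
    out.toList
  }
}
```

Under this mechanism, a Kafka connector JAR containing its own provider file would be picked up just by being on the classpath. When modules are packaged into a single fat JAR, the service files have to be merged (for example with the Maven Shade plugin's `ServicesResourceTransformer`), otherwise one file can overwrite another.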
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163011691 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} + +import scala.collection.mutable + +/** + * Rowtime descriptor for describing an event time attribute in the schema. + */ +class Rowtime extends Descriptor { + + private var rowtimeName: Option[String] = None + private var timestampExtractor: Option[TimestampExtractor] = None + private var watermarkStrategy: Option[WatermarkStrategy] = None + + /** +* Declares a field of the schema to be the rowtime attribute. Required. 
+* +* @param fieldName The name of the field that becomes the processing time field. +*/ + def field(fieldName: String): Rowtime = { +rowtimeName = Some(fieldName) +this + } + + /** +* Sets a built-in timestamp extractor that converts an existing [[Long]] or +* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute. +* +* @param fieldName The field to convert into a rowtime attribute. +*/ + def timestampFromField(fieldName: String): Rowtime = { +timestampExtractor = Some(new ExistingField(fieldName)) +this + } + + /** +* Sets a built-in timestamp extractor that converts the assigned timestamp from +* a DataStream API record into the rowtime attribute. +* +* Note: This extractor only works in streaming environments. +*/ + def timestampFromDataStream(): Rowtime = { +timestampExtractor = Some(new StreamRecordTimestamp) +this + } + + /** +* Sets a custom timestamp extractor to be used for the rowtime attribute. +* +* @param extractor The [[TimestampExtractor]] to extract the rowtime attribute +* from the physical type. +*/ + def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = { +timestampExtractor = Some(extractor) +this + } + + /** +* Sets a built-in watermark strategy for ascending rowtime attributes. +* +* Emits a watermark of the maximum observed timestamp so far minus 1. +* Rows that have a timestamp equal to the max timestamp are not late. +*/ + def watermarkPeriodicAscending(): Rowtime = { +watermarkStrategy = Some(new AscendingTimestamps) +this + } + + /** +* Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded +* time interval. +* +* Emits watermarks which are the maximum observed timestamp minus the specified delay. +*/ + def watermarkPeriodicBounding(delay: Long): Rowtime = { +watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay)) +this + } + + /** +* Sets a built-in watermark strategy which indicates the watermarks should be preserved from the +* underlying DataStream API. 
+*/ + def watermarkFromDataStream(): Rowtime = { +watermarkStrategy = Some(PreserveWatermarks.INSTANCE) +this + } + + /** +* Sets a custom watermark strategy to be used for the rowtime attribute. +*/ + def watermarkFromStrategy(strategy: WatermarkStrategy): Rowtime = { +watermarkStrategy = Some(strategy) +this
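The two built-in periodic strategies documented in the diff above reduce to simple arithmetic on the maximum observed timestamp. A minimal sketch that mirrors those doc comments; the classes are illustrative, not Flink's `AscendingTimestamps` or `BoundedOutOfOrderTimestamps`, and returning `Long.MinValue` before the first element is an assumption of this sketch.

```scala
// Illustrative periodic watermark generators that follow the doc comments above;
// they are not Flink's AscendingTimestamps / BoundedOutOfOrderTimestamps classes.
trait PeriodicWatermarksSketch {
  def observe(timestamp: Long): Unit
  def currentWatermark: Long
}

/** Watermark = max observed timestamp - 1, so rows equal to the max are not late. */
final class AscendingSketch extends PeriodicWatermarksSketch {
  private var maxTs: Long = Long.MinValue
  def observe(timestamp: Long): Unit = if (timestamp > maxTs) maxTs = timestamp
  def currentWatermark: Long = if (maxTs == Long.MinValue) Long.MinValue else maxTs - 1
}

/** Watermark = max observed timestamp - delay. */
final class BoundedSketch(delay: Long) extends PeriodicWatermarksSketch {
  require(delay >= 0, "delay must be non-negative")
  private var maxTs: Long = Long.MinValue
  def observe(timestamp: Long): Unit = if (timestamp > maxTs) maxTs = timestamp
  // Guard against Long underflow before the first element and for very small timestamps.
  def currentWatermark: Long =
    if (maxTs < Long.MinValue + delay) Long.MinValue else maxTs - delay
}
```

For the timestamps 1000, 3000, 2000, the ascending strategy yields 2999 and a bounded strategy with a delay of 500 yields 2500; a row with timestamp 3000 is therefore not late under the ascending strategy.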
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163011304 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +/** + * A class that adds a set of string-based, normalized properties for describing a + * table source or table sink. + */ +abstract class Descriptor { --- End diff -- Should we add a validation method that checks if the descriptor is valid? ---
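One possible shape for such a validation hook, sketched with hypothetical names (`DescriptorSketch`, `RowtimeSketch`, and the property keys are assumptions, not the PR's API): each descriptor writes its normalized string properties and then checks them, reporting all errors at once.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical validation hook; not the Descriptor API from the PR.
abstract class DescriptorSketch {
  /** Writes this descriptor's normalized string properties. */
  def addProperties(props: mutable.Map[String, String]): Unit

  /** Returns human-readable validation errors; an empty result means the properties are valid. */
  def validate(props: collection.Map[String, String]): Seq[String]

  /** Collects the properties and fails with all errors if they are invalid. */
  final def validatedProperties(): Map[String, String] = {
    val props = mutable.LinkedHashMap[String, String]()
    addProperties(props)
    val errors = validate(props)
    if (errors.nonEmpty) throw new IllegalArgumentException(errors.mkString("; "))
    props.toMap
  }
}

final class RowtimeSketch(field: Option[String], watermarkDelay: Option[Long]) extends DescriptorSketch {
  def addProperties(props: mutable.Map[String, String]): Unit = {
    field.foreach(f => props += "rowtime.field" -> f)
    watermarkDelay.foreach(d => props += "rowtime.watermark.delay" -> d.toString)
  }

  def validate(props: collection.Map[String, String]): Seq[String] = {
    val errors = Seq.newBuilder[String]
    if (!props.contains("rowtime.field")) errors += "rowtime.field is required"
    props.get("rowtime.watermark.delay").foreach { d =>
      if (!Try(d.toLong).toOption.exists(_ >= 0))
        errors += "rowtime.watermark.delay must be a non-negative number"
    }
    errors.result()
  }
}
```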
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163071426 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala --- @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.typeutils + +import java.io.Serializable + +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.lang3.StringEscapeUtils +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.typeutils._ +import org.apache.flink.table.api.{TableException, Types, ValidationException} +import org.apache.flink.table.descriptors.NormalizedProperties.normalizeTypeInfo +import org.apache.flink.util.InstantiationUtil + +import _root_.scala.language.implicitConversions +import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers} + +/** + * Utilities to convert [[org.apache.flink.api.common.typeinfo.TypeInformation]] into a + * string representation and back. 
+ */ +object TypeStringUtils extends JavaTokenParsers with PackratParsers { --- End diff -- We need unit tests for the parser. ---
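A round-trip test is one cheap way to cover such a parser: write a type to a string, parse it back, and compare. The sketch below uses a tiny hypothetical grammar (`ROW(name TYPE, ...)` over atomic type names) and a hand-written recursive-descent parser instead of Scala's parser combinators, so it runs without extra dependencies; it does not reproduce the syntax that `TypeStringUtils` actually accepts.

```scala
// Hypothetical grammar, for illustrating round-trip tests only:
//   type  := IDENT | "ROW(" field ("," field)* ")"
//   field := IDENT type
sealed trait TypeSketch
final case class Atomic(name: String) extends TypeSketch
final case class RowSketch(fields: List[(String, TypeSketch)]) extends TypeSketch

object TypeStringSketch {
  def write(t: TypeSketch): String = t match {
    case Atomic(n)     => n
    case RowSketch(fs) => fs.map { case (n, ft) => s"$n ${write(ft)}" }.mkString("ROW(", ", ", ")")
  }

  def parse(s: String): TypeSketch = {
    val p = new Parser(s)
    val t = p.parseType()
    p.skipWs()
    if (!p.atEnd) throw new IllegalArgumentException(s"Unexpected input at ${p.pos}: $s")
    t
  }

  private final class Parser(in: String) {
    var pos = 0
    def atEnd: Boolean = pos >= in.length
    def skipWs(): Unit = while (!atEnd && in(pos).isWhitespace) pos += 1

    def expect(c: Char): Unit = {
      skipWs()
      if (atEnd || in(pos) != c) throw new IllegalArgumentException(s"Expected '$c' at $pos")
      pos += 1
    }

    def ident(): String = {
      skipWs()
      val start = pos
      while (!atEnd && (in(pos).isLetterOrDigit || in(pos) == '_')) pos += 1
      if (start == pos) throw new IllegalArgumentException(s"Expected identifier at $pos")
      in.substring(start, pos)
    }

    def parseType(): TypeSketch = {
      val name = ident()
      skipWs()
      if (name == "ROW" && !atEnd && in(pos) == '(') {
        expect('(')
        val fields = List.newBuilder[(String, TypeSketch)]
        fields += (ident() -> parseType())
        skipWs()
        while (!atEnd && in(pos) == ',') {
          pos += 1
          fields += (ident() -> parseType())
          skipWs()
        }
        expect(')')
        RowSketch(fields.result())
      } else Atomic(name)
    }
  }
}
```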
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163012882 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{BatchTableEnvironment, Table, TableException} +import org.apache.flink.table.sources.{BatchTableSource, TableSource, TableSourceFactoryService} + +class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, schema: Schema) --- End diff -- Add `RowtimeDescriptor`. Batch table sources support timestamp extraction as well. ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163061338 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableSchema + +import scala.collection.mutable + +/** + * Describes a schema of a table. + */ +class Schema extends Descriptor { + + private val tableSchema: mutable.LinkedHashMap[String, String] = + mutable.LinkedHashMap[String, String]() + + /** +* Sets the schema with field names and the types. Required. +* +* This method overwrites existing fields added with [[field()]]. +* +* @param schema the table schema +*/ + def schema(schema: TableSchema): Schema = { --- End diff -- add a method `def schema(schema: String): Schema` that parses the schema string? ---
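A sketch of what such a `schema(schema: String)` overload could do internally, assuming a hypothetical `name TYPE, name TYPE, ...` format (not Flink syntax). It splits on top-level commas so that commas inside parentheses stay part of the type, keeps the field order in a `LinkedHashMap` like the descriptor's `tableSchema`, and rejects duplicate names as `JSON.field` does in this PR.

```scala
import scala.collection.mutable

// Hypothetical helper; the "name TYPE, name TYPE" format is an assumption, not Flink syntax.
object SchemaStringSketch {
  /** Parses "name TYPE, name TYPE, ..." into an ordered map from field name to type string. */
  def parse(schema: String): mutable.LinkedHashMap[String, String] = {
    val result = mutable.LinkedHashMap[String, String]()
    splitTopLevel(schema).foreach { entry =>
      val trimmed = entry.trim
      val space = trimmed.indexWhere(_.isWhitespace)
      if (space <= 0) {
        throw new IllegalArgumentException(s"Expected 'name TYPE' but got '$trimmed'.")
      }
      val name = trimmed.substring(0, space)
      val tpe = trimmed.substring(space).trim
      if (result.contains(name)) {
        throw new IllegalArgumentException(s"Duplicate field name $name.")
      }
      result += name -> tpe
    }
    result
  }

  /** Splits on commas outside parentheses, so types like ROW(a INT, b INT) stay intact. */
  private def splitTopLevel(s: String): List[String] = {
    val parts = mutable.ListBuffer[String]()
    val current = new StringBuilder
    var depth = 0
    s.foreach {
      case '(' => depth += 1; current += '('
      case ')' => depth -= 1; current += ')'
      case ',' if depth == 0 => parts += current.toString; current.clear()
      case c => current += c
    }
    parts += current.toString
    parts.toList
  }
}
```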
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163071117 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/DescriptorsTest.scala --- @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api + +import _root_.java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.plan.stats.{ColumnStats, TableStats} +import org.apache.flink.table.utils.TableTestBase +import org.junit.Assert.assertEquals +import org.junit.Test + +class DescriptorsTest extends TableTestBase { --- End diff -- I would move the tests to a separate class per descriptor. If we add a `validate` method to `Descriptor` this needs to be tested as well. ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162947784 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -125,6 +129,16 @@ abstract class StreamTableEnvironment( } } + /** +* Creates a table from a descriptor that describes the resulting table schema, the source +* connector, the source encoding, and other properties. +* +* @param schema schema descriptor describing the table to create +*/ + def createTable(schema: Schema): StreamTableSourceDescriptor = { --- End diff -- See comment on `BatchTableEnvironment` ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r163002001 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableSchema + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +/** + * Encoding descriptor for JSON. + */ +class JSON extends EncodingDescriptor("json") { + + private val encodingSchema: mutable.LinkedHashMap[String, String] = + mutable.LinkedHashMap[String, String]() + private var fieldMapping: Option[util.Map[String, String]] = None + private var failOnMissingField: Option[Boolean] = None + + /** +* Sets the JSON schema with field names and the types for the JSON-encoded input. +* The JSON schema must not contain nested fields. +* +* This method overwrites existing fields added with [[field()]]. 
+* +* @param schema the table schema +*/ + def schema(schema: TableSchema): JSON = { +this.encodingSchema.clear() +NormalizedProperties.normalizeTableSchema(schema).foreach { + case (n, t) => field(n, t) +} +this + } + + /** +* Adds a JSON field with the field name and the type information for the JSON-encoding. +* This method can be called multiple times. The call order of this method defines +* also the order of the fields in the JSON-encoding. +* +* @param fieldName the field name +* @param fieldType the type information of the field +*/ + def field(fieldName: String, fieldType: TypeInformation[_]): JSON = { +field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType)) +this + } + + /** +* Adds a JSON field with the field name and the type string for the JSON-encoding. +* This method can be called multiple times. The call order of this method defines +* also the order of the fields in the JSON-encoding. +* +* @param fieldName the field name +* @param fieldType the type string of the field +*/ + def field(fieldName: String, fieldType: String): JSON = { +if (encodingSchema.contains(fieldName)) { + throw new IllegalArgumentException(s"Duplicate field name $fieldName.") +} +encodingSchema += (fieldName -> fieldType) +this + } + + /** +* Sets a mapping from schema fields to fields of the JSON schema. +* +* A field mapping is required if the fields of produced tables should be named different than +* the fields of the JSON records. +* The key of the provided Map refers to the field of the table schema, +* the value to the field in the JSON schema. +* +* @param tableToJsonMapping A mapping from table schema fields to JSON schema fields. +* @return The builder. +*/ + def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = { --- End diff -- We might want to make field mappings independent of the encoding. For example field mappings could also be used for JDBC connectors which do not have an encoding. ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162996633 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorUtils.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +/** + * Utilities for working with a [[org.apache.flink.table.descriptors.Descriptor]]. + */ +object DescriptorUtils { + + def hasConnector(properties: util.Map[String, String], connector: String): Boolean = { +val tpe = properties.get("connector.type") +tpe != null || tpe == connector + } + + def hasEncoding(properties: util.Map[String, String], encoding: String): Boolean = { +val tpe = properties.get("encoding.type") +tpe != null || tpe == encoding --- End diff -- should be `tpe != null && tpe == encoding`? ---
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r16231 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.DescriptorUtils.statistics +import org.apache.flink.table.plan.stats.TableStats + +import scala.collection.JavaConverters._ + +/** + * Common class for all descriptors describing a table source. + */ +abstract class TableSourceDescriptor extends Descriptor { + + protected var schemaDescriptor: Option[Schema] = None + protected var connectorDescriptor: Option[ConnectorDescriptor] = None + protected var encodingDescriptor: Option[EncodingDescriptor] = None + protected var proctimeDescriptor: Option[Proctime] = None + protected var rowtimeDescriptor: Option[Rowtime] = None + protected var statisticsDescriptor: Option[Statistics] = None + protected var metaDescriptor: Option[Metadata] = None + --- End diff -- We might need another descriptor for mapping fields of the encoding (or connector) to fields in the table schema. 
This can be used to rename or select fields from the encoding to the table schema. This would be the configuration for the `DefinedFieldMapping` interface. ---
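A sketch of a field mapping kept separate from any encoding, so that, for example, a JSON source and a JDBC source could share it. `FieldMappingSketch` is hypothetical; it only models the renaming and selection that `DefinedFieldMapping` would be configured with, using a plain `Map` as the physical record.

```scala
// Hypothetical descriptor-style field mapping, independent of any encoding.
final case class FieldMappingSketch(tableToPhysical: Map[String, String]) {
  /** Physical field that backs a table field; identity if no mapping is given. */
  def physicalField(tableField: String): String =
    tableToPhysical.getOrElse(tableField, tableField)

  /** Builds a table row (in schema order) from a physical record, renaming and selecting fields. */
  def project(tableFields: Seq[String], record: Map[String, Any]): Seq[Any] =
    tableFields.map { f =>
      val physical = physicalField(f)
      record.getOrElse(
        physical,
        throw new NoSuchElementException(s"Physical field '$physical' for table field '$f' not found."))
    }
}
```

Physical fields that no table field refers to (such as an `ignored` column) are simply not selected.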
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162996455 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorUtils.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +/** + * Utilities for working with a [[org.apache.flink.table.descriptors.Descriptor]]. + */ +object DescriptorUtils { + + def hasConnector(properties: util.Map[String, String], connector: String): Boolean = { +val tpe = properties.get("connector.type") +tpe != null || tpe == connector --- End diff -- should be `tpe != null && tpe == connector`? ---
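A small check of why `||` is wrong in both `hasConnector` and `hasEncoding`: with `||`, the method returns `true` whenever the key is set, whatever its value. The sketch copies the variants into a hypothetical `DescriptorUtilsSketch` object. Because Scala's `==` is null-safe, `Option(...).contains(...)` (or plain `tpe == connector` for a non-null `connector`) is an equivalent alternative to the suggested `&&` fix.

```scala
import java.util

// Hypothetical copy of the helpers under review, for comparing the variants.
object DescriptorUtilsSketch {
  // Variant from the diff: true whenever "connector.type" is present, regardless of its value.
  def hasConnectorBuggy(properties: util.Map[String, String], connector: String): Boolean = {
    val tpe = properties.get("connector.type")
    tpe != null || tpe == connector
  }

  // Fix suggested in the review.
  def hasConnector(properties: util.Map[String, String], connector: String): Boolean = {
    val tpe = properties.get("connector.type")
    tpe != null && tpe == connector
  }

  // Equivalent, more idiomatic Scala: Option(...) turns a null Java value into None.
  def hasEncoding(properties: util.Map[String, String], encoding: String): Boolean =
    Option(properties.get("encoding.type")).contains(encoding)
}
```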
[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5240#discussion_r162990874

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
@@ -23,22 +23,74 @@ import java.net.URL
 import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService}
 import org.apache.flink.table.util.Logging
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
  * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
  */
 object ExternalTableSourceUtil extends Logging {
 
+  /**
+   * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+   *
+   * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
+   * @return converted [[TableSourceTable]] instance from the input catalog table
+   */
+  def fromExternalCatalogTable(
+      tableEnv: TableEnvironment,
+      externalCatalogTable: ExternalCatalogTable)
+    : TableSourceTable[_] = {
+
+    // check for the legacy external catalog path
+    if (externalCatalogTable.isLegacyTableType) {
+      LOG.warn("External catalog tables based on TableType annotations are deprecated. " +
+        "Please consider updating them to TableSourceFactories.")
+      fromExternalCatalogTableType(externalCatalogTable)
+    }
+    // use the factory approach
+    else {
+      val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+      tableEnv match {
+        // check for a batch table source in this batch environment
+        case _: BatchTableEnvironment =>
+          source match {
+            case bts: BatchTableSource[_] =>
+              new BatchTableSourceTable(
+                bts,
+                new FlinkStatistic(externalCatalogTable.getTableStats))
+            case _ => throw new TableException(
+              s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
+                s"in a batch environment.")
+          }
+        // check for a stream table source in this streaming environment
+        case _: StreamTableEnvironment =>
+          source match {
+            case sts: StreamTableSource[_] =>
+              new StreamTableSourceTable(
+                sts,
+                new FlinkStatistic(externalCatalogTable.getTableStats))
+            case _ => throw new TableException(
+              s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
+                s"in a streaming environment.")
+          }
+        case _ => throw new TableException("Unsupported table environment.")
+      }
+    }
+  }
+
+  // --
+  // NOTE: the following line can be removed once we drop support for TableType
--- End diff --

I think we can also remove the `org.reflections:reflections` dependency once this has been removed. ---
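The factory path in `fromExternalCatalogTable` comes down to a nested type match: the environment decides which source capability is required, and any other source fails with a descriptive error. The sketch below reproduces only that control flow. `Env`, `BatchEnv`, `StreamEnv`, `Source`, `BatchSource`, `StreamSource` and `SourceCompatibilitySketch` are hypothetical stand-ins, not the Flink types. It also returns an `Either` instead of throwing a `TableException`, so that it is easy to test.

```scala
// Hypothetical stand-ins for TableEnvironment / TableSource; not Flink classes.
sealed trait Env
final class BatchEnv extends Env
final class StreamEnv extends Env

trait Source
trait BatchSource extends Source
trait StreamSource extends Source

// Example sources: one usable in both modes, one streaming-only.
final class CsvSource extends BatchSource with StreamSource
final class LogSource extends StreamSource

object SourceCompatibilitySketch {
  /** Mirrors the nested match of the diff: the environment selects the required
    * source trait; any other source yields an error message. */
  def validate(env: Env, source: Source): Either[String, String] = env match {
    case _: BatchEnv =>
      source match {
        case _: BatchSource => Right("batch table")
        case _ => Left(s"Found table source '${source.getClass.getSimpleName}' " +
          "is not applicable in a batch environment.")
      }
    case _: StreamEnv =>
      source match {
        case _: StreamSource => Right("stream table")
        case _ => Left(s"Found table source '${source.getClass.getSimpleName}' " +
          "is not applicable in a streaming environment.")
      }
  }
}
```

Because `Env` is sealed here, the compiler can check that the match is exhaustive. The Flink code cannot rely on that for `TableEnvironment`, which is why it keeps the `case _ => throw new TableException("Unsupported table environment.")` fallback.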