[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375378#comment-16375378 ]

ASF GitHub Bot commented on FLINK-8756:
---------------------------------------

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5573

[FLINK-8756][Client] Support ClusterClient.getAccumulators() in RestClusterClient

## What is the purpose of the change

This PR supports ClusterClient.getAccumulators() in RestClusterClient.

## Brief change log

- *Send a REST request to get the `JobAccumulatorsInfo` object*
- *Use Jackson's ObjectMapper to convert the `JobAccumulatorsInfo` object to a Map*
- *Add a test method to the `RestClusterClientTest` class to test the `getAccumulators` function*
- *Add a test handler to mock the `JobAccumulatorsInfo` object*

## Verifying this change

This change added tests and can be verified as follows:

- *Added a test that validates that the actual number of accumulators equals the number mocked in the test handler*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-8756

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5573.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5573

commit ec8ef5d8ad6d650e250737d5005173994337168c
Author: vinoyang
Date: 2018-02-24T06:50:55Z

[FLINK-8756][Client] Support ClusterClient.getAccumulators() in RestClusterClient


> Support ClusterClient.getAccumulators() in RestClusterClient
> -------------------------------------------------------------
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
> Issue Type: Improvement
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Aljoscha Krettek
> Assignee: vinoyang
> Priority: Blocker
> Fix For: 1.5.0
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5573

[FLINK-8756][Client] Support ClusterClient.getAccumulators() in RestClusterClient

## What is the purpose of the change

This PR supports ClusterClient.getAccumulators() in RestClusterClient.

## Brief change log

- *Send a REST request to get the `JobAccumulatorsInfo` object*
- *Use Jackson's ObjectMapper to convert the `JobAccumulatorsInfo` object to a Map*
- *Add a test method to the `RestClusterClientTest` class to test the `getAccumulators` function*
- *Add a test handler to mock the `JobAccumulatorsInfo` object*

## Verifying this change

This change added tests and can be verified as follows:

- *Added a test that validates that the actual number of accumulators equals the number mocked in the test handler*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-8756

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5573.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5573

commit ec8ef5d8ad6d650e250737d5005173994337168c
Author: vinoyang
Date: 2018-02-24T06:50:55Z

[FLINK-8756][Client] Support ClusterClient.getAccumulators() in RestClusterClient

---
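The Jackson step in the change log above is easy to sketch. Below is a minimal, hedged illustration of the conversion only; `JobAccumulatorsInfo` here is a stand-in POJO with invented fields, not Flink's actual response class, and the real PR wires this up to the REST client rather than to `main`:

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

public class AccumulatorConversionSketch {

	// Stand-in for the REST response type named in the change log.
	public static class JobAccumulatorsInfo {
		public String name = "numRecords";
		public String type = "LongCounter";
		public String value = "42";
	}

	public static void main(String[] args) {
		ObjectMapper mapper = new ObjectMapper();
		// convertValue maps the POJO directly to a Map, without first
		// serializing it to a JSON string.
		Map<String, Object> accumulators = mapper.convertValue(
			new JobAccumulatorsInfo(),
			new TypeReference<Map<String, Object>>() {});
		System.out.println(accumulators); // e.g. {name=numRecords, type=LongCounter, value=42}
	}
}
```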
[jira] [Commented] (FLINK-7641) Loss of JobManager in HA mode should not cause jobs to fail
[ https://issues.apache.org/jira/browse/FLINK-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375201#comment-16375201 ]

vinoyang commented on FLINK-7641:
---------------------------------

Yes [~elevy], you are right. Our implementation goes further than your description: we do not need to pause running jobs, and the running jobs keep running while the master JM fails over.

> Loss of JobManager in HA mode should not cause jobs to fail
> ------------------------------------------------------------
>
> Key: FLINK-7641
> URL: https://issues.apache.org/jira/browse/FLINK-7641
> Project: Flink
> Issue Type: Improvement
> Components: JobManager
> Affects Versions: 1.3.2
> Reporter: Elias Levy
> Assignee: vinoyang
> Priority: Major
>
> Currently if a standalone cluster of JobManagers is configured in high-availability mode and the master JM is lost, the job executing in the cluster will be restarted. This is less than ideal. It would be best if the jobs could continue to execute without restarting while one of the spare JMs becomes the new master, or in the worst case, the jobs are paused while the JM election takes place.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-7641) Loss of JobManager in HA mode should not cause jobs to fail
[ https://issues.apache.org/jira/browse/FLINK-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375179#comment-16375179 ]

Elias Levy commented on FLINK-7641:
-----------------------------------

I mean that if you have a standalone cluster in HA mode with multiple JMs and the current master JM fails, any jobs executing in the cluster will be stopped and then restored by the new master JM. Ideally, master JM failover should be largely invisible to running jobs. At most, they should be temporarily paused and resumed, rather than stopped and restarted.

> Loss of JobManager in HA mode should not cause jobs to fail
> ------------------------------------------------------------
>
> Key: FLINK-7641
> URL: https://issues.apache.org/jira/browse/FLINK-7641
> Project: Flink
> Issue Type: Improvement
> Components: JobManager
> Affects Versions: 1.3.2
> Reporter: Elias Levy
> Assignee: vinoyang
> Priority: Major
>
> Currently if a standalone cluster of JobManagers is configured in high-availability mode and the master JM is lost, the job executing in the cluster will be restarted. This is less than ideal. It would be best if the jobs could continue to execute without restarting while one of the spare JMs becomes the new master, or in the worst case, the jobs are paused while the JM election takes place.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
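For context, the standalone HA setup Elias describes is driven by a handful of flink-conf.yaml entries like the following (host names and paths are placeholders; the masters file additionally lists all JobManager candidates):

```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host:2181
high-availability.storageDir: hdfs:///flink/ha/
```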
[jira] [Commented] (FLINK-8761) Various improvements to the Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-8761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374912#comment-16374912 ]

ASF GitHub Bot commented on FLINK-8761:
---------------------------------------

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5569

Thanks for checking this out!

Concerning the removed `flink-clients` dependency - that was done on purpose. Here is my motivation for that:
- As a 'provided' API dependency, it should actually not be there.
- It mainly matters for the in-Eclipse execution, which needs a 'provided' runtime.
- I would prefer to remove it now, because the dependencies will probably change anyway in the future.

The proposal I want to make for the next Flink version is to have something like `flink-all` (or `flink-base`) which refers exactly to what is in the `flink-dist` jar (and `flink-dist` draws its jar from there). If we set that as a provided dependency everywhere (connector, library, quickstart), we have exactly everything provided that will be available through Flink's runtime, and nothing that will not be there. That is the easiest way to keep these things in sync.

Now I am unsure whether `flink-dist` can currently take a `flink-all`-like role, because it declares a lot of additional provided dependencies for the sake of putting them into `opt` or `examples`. According to Maven dependency management, a transitive provided dependency of a provided dependency is not propagated (in which case it would be okay to use `flink-dist`), but I am not sure we want to rely on that across the Maven command line, IntelliJ, Eclipse, etc.

The last sentence incidentally just made me realize that I should probably change the `flink-dist` dependency in the IDEA profile to `flink-streaming-java` / `flink-clients` for exactly that reason...

> Various improvements to the Quickstarts
> ----------------------------------------
>
> Key: FLINK-8761
> URL: https://issues.apache.org/jira/browse/FLINK-8761
> Project: Flink
> Issue Type: Improvement
> Components: Quickstarts
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0
>
> Various improvements to the Quickstarts to give a smoother out-of-the-box experience.
> Broken down into the subtasks.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink issue #5569: [FLINK-8761] [quickstarts] Big improvements to the quicks...
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5569

Thanks for checking this out!

Concerning the removed `flink-clients` dependency - that was done on purpose. Here is my motivation for that:
- As a 'provided' API dependency, it should actually not be there.
- It mainly matters for the in-Eclipse execution, which needs a 'provided' runtime.
- I would prefer to remove it now, because the dependencies will probably change anyway in the future.

The proposal I want to make for the next Flink version is to have something like `flink-all` (or `flink-base`) which refers exactly to what is in the `flink-dist` jar (and `flink-dist` draws its jar from there). If we set that as a provided dependency everywhere (connector, library, quickstart), we have exactly everything provided that will be available through Flink's runtime, and nothing that will not be there. That is the easiest way to keep these things in sync.

Now I am unsure whether `flink-dist` can currently take a `flink-all`-like role, because it declares a lot of additional provided dependencies for the sake of putting them into `opt` or `examples`. According to Maven dependency management, a transitive provided dependency of a provided dependency is not propagated (in which case it would be okay to use `flink-dist`), but I am not sure we want to rely on that across the Maven command line, IntelliJ, Eclipse, etc.

The last sentence incidentally just made me realize that I should probably change the `flink-dist` dependency in the IDEA profile to `flink-streaming-java` / `flink-clients` for exactly that reason...

---
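To make the scope discussion concrete: a dependency that should come from Flink's runtime rather than the user jar is declared 'provided', as in this sketch (mirroring the quickstart pom under review, not quoting it exactly):

```xml
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>
```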
[jira] [Commented] (FLINK-8761) Various improvements to the Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-8761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374907#comment-16374907 ]

ASF GitHub Bot commented on FLINK-8761:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5569#discussion_r170351589

--- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---
@@ -50,181 +51,113 @@

[The XML element tags of this hunk were stripped by the list archiver. The recoverable changes: the flink-core dependency is removed; flink-java is marked provided; flink-clients_${scala.binary.version} is replaced by flink-streaming-java_${scala.binary.version}, also provided; slf4j-log4j12 is pinned to 1.7.7 and log4j to 1.2.17, both at runtime scope; and the build-jar profile with its provided-scope duplicates of these dependencies and the maven-shade-plugin 3.0.0 configuration (shade goal bound to the package phase, excluding org.apache.flink:force-shading, com.google.code.findbugs:jsr305 and org.slf4j:*) is deleted.]

> Various improvements to the Quickstarts
> ----------------------------------------
>
> Key: FLINK-8761
> URL: https://issues.apache.org/jira/browse/FLINK-8761
> Project: Flink
> Issue Type: Improvement
> Components: Quickstarts
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0
>
> Various improvements to the Quickstarts to give a smoother out-of-the-box experience.
> Broken down into the subtasks.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #5569: [FLINK-8761] [quickstarts] Big improvements to the...
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5569#discussion_r170351589

--- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---
@@ -50,181 +51,113 @@

[The XML element tags of this hunk were stripped by the list archiver. The recoverable changes: the flink-core dependency is removed; flink-java is marked provided; flink-clients_${scala.binary.version} is replaced by flink-streaming-java_${scala.binary.version}, also provided; slf4j-log4j12 is pinned to 1.7.7 and log4j to 1.2.17, both at runtime scope; and the build-jar profile with its provided-scope duplicates of these dependencies and the maven-shade-plugin 3.0.0 configuration (shade goal bound to the package phase, excluding org.apache.flink:force-shading, com.google.code.findbugs:jsr305 and org.slf4j:*) is deleted.]

---
[GitHub] flink pull request #5569: [FLINK-8761] [quickstarts] Big improvements to the...
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5569#discussion_r170351448

--- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---
@@ -50,181 +51,113 @@

[The XML element tags of this excerpt were stripped by the list archiver. The recoverable changes in the quoted region: the flink-core dependency is removed, flink-java is marked provided, and flink-clients_${scala.binary.version} is replaced by flink-streaming-java_${scala.binary.version} at provided scope.]

---
[jira] [Commented] (FLINK-8761) Various improvements to the Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-8761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374906#comment-16374906 ]

ASF GitHub Bot commented on FLINK-8761:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5569#discussion_r170351448

--- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---
@@ -50,181 +51,113 @@

[The XML element tags of this excerpt were stripped by the list archiver. The recoverable changes in the quoted region: the flink-core dependency is removed, flink-java is marked provided, and flink-clients_${scala.binary.version} is replaced by flink-streaming-java_${scala.binary.version} at provided scope. A reconstructed fragment follows this message.]

> Various improvements to the Quickstarts
> ----------------------------------------
>
> Key: FLINK-8761
> URL: https://issues.apache.org/jira/browse/FLINK-8761
> Project: Flink
> Issue Type: Improvement
> Components: Quickstarts
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0
>
> Various improvements to the Quickstarts to give a smoother out-of-the-box experience.
> Broken down into the subtasks.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
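Reconstructed from the tokens that survived in the hunks above (a best-effort reading; exclusion lists that the archiver reduced to bare '+' markers are omitted), the resulting dependencies section plausibly reads:

```xml
<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-log4j12</artifactId>
		<version>1.7.7</version>
		<scope>runtime</scope>
	</dependency>
	<dependency>
		<groupId>log4j</groupId>
		<artifactId>log4j</artifactId>
		<version>1.2.17</version>
		<scope>runtime</scope>
	</dependency>
</dependencies>
```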
[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running
[ https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374695#comment-16374695 ]

ASF GitHub Bot commented on FLINK-8746:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170314325

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +/** + * Exceptions which indicate that a checkpoint triggering has failed. + * + */ +public class CheckpointTriggerException extends FlinkException { + + private static final long serialVersionUID = -3330160816161901752L; + + private final CheckpointDeclineReason checkpointDeclineReason; + + public CheckpointTriggerException(String message, CheckpointDeclineReason checkpointDeclineReason) { + super(message + " Decline reason: " + checkpointDeclineReason);

--- End diff --

True, this is better. Will fix it.

> Support rescaling of jobs which are not fully running
> ------------------------------------------------------
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
> We should support the rescaling of jobs which are only partially running. Currently, this fails because rescaling requires taking a savepoint. We can solve the problem by falling back to the latest rescaling savepoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running
[ https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374696#comment-16374696 ]

ASF GitHub Bot commented on FLINK-8746:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170314390

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -534,42 +536,94 @@ public void postStop() throws Exception { // 4. take a savepoint final CompletableFuture<String> savepointFuture = triggerSavepoint( - jobMasterConfiguration.getTmpDirectory(), - timeout); + null, + timeout) + .handleAsync( + (String savepointPath, Throwable throwable) -> { + if (throwable != null) { + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + if (strippedThrowable instanceof CheckpointTriggerException) { + final CheckpointTriggerException checkpointTriggerException = (CheckpointTriggerException) strippedThrowable; + + if (checkpointTriggerException.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) { + return lastInternalSavepoint; + } else { + throw new CompletionException(checkpointTriggerException); + } + } else { + throw new CompletionException(strippedThrowable); + } + } else { + final String savepointToDispose = lastInternalSavepoint;

--- End diff --

You're totally right. Will add a guard.

> Support rescaling of jobs which are not fully running
> ------------------------------------------------------
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
> We should support the rescaling of jobs which are only partially running. Currently, this fails because rescaling requires taking a savepoint. We can solve the problem by falling back to the latest rescaling savepoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #5560: [FLINK-8746] [flip6] Rescale partially running job...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170314390

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -534,42 +536,94 @@ public void postStop() throws Exception { // 4. take a savepoint final CompletableFuture<String> savepointFuture = triggerSavepoint( - jobMasterConfiguration.getTmpDirectory(), - timeout); + null, + timeout) + .handleAsync( + (String savepointPath, Throwable throwable) -> { + if (throwable != null) { + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + if (strippedThrowable instanceof CheckpointTriggerException) { + final CheckpointTriggerException checkpointTriggerException = (CheckpointTriggerException) strippedThrowable; + + if (checkpointTriggerException.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) { + return lastInternalSavepoint; + } else { + throw new CompletionException(checkpointTriggerException); + } + } else { + throw new CompletionException(strippedThrowable); + } + } else { + final String savepointToDispose = lastInternalSavepoint;

--- End diff --

You're totally right. Will add a guard.

---
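The guard Till agrees to add could take roughly this shape (a sketch with names mirroring the snippet above, not the committed fix):

```java
// Remember the previous internal savepoint, record the new one, and only
// dispose the old one if it was ever set; on the first rescaling there is
// nothing to clean up and lastInternalSavepoint is still null.
final String savepointToDispose = lastInternalSavepoint;
lastInternalSavepoint = savepointPath;

if (savepointToDispose != null) {
	disposeSavepoint(savepointToDispose);
}
```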
[GitHub] flink pull request #5560: [FLINK-8746] [flip6] Rescale partially running job...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5560#discussion_r170314325 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +/** + * Exceptions which indicate that a checkpoint triggering has failed. + * + */ +public class CheckpointTriggerException extends FlinkException { + + private static final long serialVersionUID = -3330160816161901752L; + + private final CheckpointDeclineReason checkpointDeclineReason; + + public CheckpointTriggerException(String message, CheckpointDeclineReason checkpointDeclineReason) { + super(message + " Decline reason: " + checkpointDeclineReason); --- End diff -- True, this is better. Will fix it. ---
[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running
[ https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374693#comment-16374693 ]

ASF GitHub Bot commented on FLINK-8746:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170314219

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -289,7 +290,7 @@ public void invoke() throws Exception { // wait forever (until canceled) synchronized (this) { - while (error == null && lastCheckpointId < numCalls) {

--- End diff --

The only location where a notify is called is where the error is set to a non-null value. Furthermore, I think the testing task is not intended to complete in the case that `lastCheckpointId >= numCalls`. Alternatively, one could fix the problem by executing the update of `lastCheckpointId` under the lock, as well as the triggering of `awaitLatch`.

> Support rescaling of jobs which are not fully running
> ------------------------------------------------------
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
> We should support the rescaling of jobs which are only partially running. Currently, this fails because rescaling requires taking a savepoint. We can solve the problem by falling back to the latest rescaling savepoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #5560: [FLINK-8746] [flip6] Rescale partially running job...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170314219

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -289,7 +290,7 @@ public void invoke() throws Exception { // wait forever (until canceled) synchronized (this) { - while (error == null && lastCheckpointId < numCalls) {

--- End diff --

The only location where a notify is called is where the error is set to a non-null value. Furthermore, I think the testing task is not intended to complete in the case that `lastCheckpointId >= numCalls`. Alternatively, one could fix the problem by executing the update of `lastCheckpointId` under the lock, as well as the triggering of `awaitLatch`.

---
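The alternative fix Till mentions (performing the update and the notification under the same lock the waiter holds) has this generic shape; the names are illustrative and this is not the actual test code:

```java
public class CheckpointWaitExample {

	private final Object lock = new Object();
	private long lastCheckpointId;
	private long numCalls = 3;
	private Throwable error;

	// Called by the checkpointing thread: mutate the condition and notify
	// under the same monitor the waiter holds, so no update can be lost.
	void onCheckpoint(long checkpointId) {
		synchronized (lock) {
			lastCheckpointId = checkpointId;
			lock.notifyAll();
		}
	}

	// Called by the test thread: re-check the condition after every wakeup.
	void awaitCompletion() throws InterruptedException {
		synchronized (lock) {
			while (error == null && lastCheckpointId < numCalls) {
				lock.wait();
			}
		}
	}
}
```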
[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running
[ https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374692#comment-16374692 ]

ASF GitHub Bot commented on FLINK-8746:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170313695

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -1156,6 +1213,26 @@ private void disposeSavepoint(String savepointPath) { } } + /** +* Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}. +* +* @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored +* @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from +* @throws Exception if the {@link ExecutionGraph} could not be restored +*/ + private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception { + if (savepointRestoreSettings.restoreSavepoint()) { + final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator(); + if (checkpointCoordinator != null) {

--- End diff --

The checks are not done at all call sites; only in the constructor do you have this check.

> Support rescaling of jobs which are not fully running
> ------------------------------------------------------
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
> We should support the rescaling of jobs which are only partially running. Currently, this fails because rescaling requires taking a savepoint. We can solve the problem by falling back to the latest rescaling savepoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #5560: [FLINK-8746] [flip6] Rescale partially running job...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170313695

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -1156,6 +1213,26 @@ private void disposeSavepoint(String savepointPath) { } } + /** +* Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}. +* +* @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored +* @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from +* @throws Exception if the {@link ExecutionGraph} could not be restored +*/ + private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception { + if (savepointRestoreSettings.restoreSavepoint()) { + final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator(); + if (checkpointCoordinator != null) {

--- End diff --

The checks are not done at all call sites; only in the constructor do you have this check.

---
[jira] [Created] (FLINK-8771) Upgrade scalastyle to 1.0.0
Ted Yu created FLINK-8771:
------------------------------

Summary: Upgrade scalastyle to 1.0.0
Key: FLINK-8771
URL: https://issues.apache.org/jira/browse/FLINK-8771
Project: Flink
Issue Type: Improvement
Components: Build System
Reporter: Ted Yu

scalastyle 1.0.0 fixes issues with import order, explicit types for public methods, line-length limits, and comment validation.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
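The upgrade itself would mostly be a version bump of the Maven plugin in the root pom; a sketch, since Flink's actual scalastyle configuration carries additional settings:

```xml
<plugin>
	<groupId>org.scalastyle</groupId>
	<artifactId>scalastyle-maven-plugin</artifactId>
	<version>1.0.0</version>
</plugin>
```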
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374620#comment-16374620 ]

ASF GitHub Bot commented on FLINK-8538:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170304145

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala --- @@ -18,37 +18,85 @@ package org.apache.flink.table.descriptors +import org.apache.flink.util.Preconditions import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ abstract class DescriptorTestBase { /** -* Returns a valid descriptor. +* Returns a set of valid descriptors. +* This method is implemented in both Scala and Java. +*/ + def descriptors(): java.util.List[Descriptor] + + /** +* Returns a set of properties for each valid descriptor. +* This code is implemented in both Scala and Java. */ - def descriptor(): Descriptor + def properties(): java.util.List[java.util.Map[String, String]] /** -* Returns a validator that can validate this descriptor. +* Returns a validator that can validate all valid descriptors. */ def validator(): DescriptorValidator - def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = { + @Test + def testValidation(): Unit = { +val d = descriptors().asScala +val p = properties().asScala + +Preconditions.checkArgument(d.length == p.length) + +d.zip(p).foreach { case (desc, props) => + verifyProperties(desc, props.asScala.toMap) +} + } + + def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = { val normProps = new DescriptorProperties descriptor.addProperties(normProps) -assertEquals(expected.toMap, normProps.asMap) +assertEquals(expected, normProps.asScalaMap) } - def verifyInvalidProperty(property: String, invalidValue: String): Unit = { + def verifyInvalidProperty( + descriptor: Descriptor, + property: String, + invalidValue: String): Unit = { val properties = new DescriptorProperties -descriptor().addProperties(properties) +descriptor.addProperties(properties) properties.unsafePut(property, invalidValue) validator().validate(properties) } - def verifyMissingProperty(removeProperty: String): Unit = { + def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = { val properties = new DescriptorProperties -descriptor().addProperties(properties) +descriptor.addProperties(properties) properties.unsafeRemove(removeProperty) validator().validate(properties) } } + +class TestTableSourceDescriptor(connector: ConnectorDescriptor) + extends TableSourceDescriptor(connector) {

--- End diff --

Sorry about that. I forgot to rebuild after rebasing.

> Add a Kafka table source factory with JSON format support
> ----------------------------------------------------------
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Xingcan Cui
> Priority: Major
>
> Similar to CSVTableSourceFactory, a Kafka table source factory for JSON should be added. This issue includes improving the existing JSON descriptor with validation that can be used for other connectors as well. It is up for discussion whether we want to split the KafkaJsonTableSource into connector and format such that we can reuse the format for other table sources as well.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170304145 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala --- @@ -18,37 +18,85 @@ package org.apache.flink.table.descriptors +import org.apache.flink.util.Preconditions import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ abstract class DescriptorTestBase { /** -* Returns a valid descriptor. +* Returns a set of valid descriptors. +* This method is implemented in both Scala and Java. +*/ + def descriptors(): java.util.List[Descriptor] + + /** +* Returns a set of properties for each valid descriptor. +* This code is implemented in both Scala and Java. */ - def descriptor(): Descriptor + def properties(): java.util.List[java.util.Map[String, String]] /** -* Returns a validator that can validate this descriptor. +* Returns a validator that can validate all valid descriptors. */ def validator(): DescriptorValidator - def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = { + @Test + def testValidation(): Unit = { +val d = descriptors().asScala +val p = properties().asScala + +Preconditions.checkArgument(d.length == p.length) + +d.zip(p).foreach { case (desc, props) => + verifyProperties(desc, props.asScala.toMap) +} + } + + def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = { val normProps = new DescriptorProperties descriptor.addProperties(normProps) -assertEquals(expected.toMap, normProps.asMap) +assertEquals(expected, normProps.asScalaMap) } - def verifyInvalidProperty(property: String, invalidValue: String): Unit = { + def verifyInvalidProperty( + descriptor: Descriptor, + property: String, + invalidValue: String): Unit = { val properties = new DescriptorProperties -descriptor().addProperties(properties) +descriptor.addProperties(properties) properties.unsafePut(property, invalidValue) validator().validate(properties) } - def verifyMissingProperty(removeProperty: String): Unit = { + def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = { val properties = new DescriptorProperties -descriptor().addProperties(properties) +descriptor.addProperties(properties) properties.unsafeRemove(removeProperty) validator().validate(properties) } } + +class TestTableSourceDescriptor(connector: ConnectorDescriptor) + extends TableSourceDescriptor(connector) { --- End diff -- Sorry about that. I forgot to rebuild after rebasing. ---
[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running
[ https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374600#comment-16374600 ]

ASF GitHub Bot commented on FLINK-8746:
---------------------------------------

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170300254

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -534,42 +536,94 @@ public void postStop() throws Exception { // 4. take a savepoint final CompletableFuture<String> savepointFuture = triggerSavepoint( - jobMasterConfiguration.getTmpDirectory(), - timeout); + null, + timeout) + .handleAsync( + (String savepointPath, Throwable throwable) -> { + if (throwable != null) { + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + if (strippedThrowable instanceof CheckpointTriggerException) { + final CheckpointTriggerException checkpointTriggerException = (CheckpointTriggerException) strippedThrowable; + + if (checkpointTriggerException.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) { + return lastInternalSavepoint; + } else { + throw new CompletionException(checkpointTriggerException); + } + } else { + throw new CompletionException(strippedThrowable); + } + } else { + final String savepointToDispose = lastInternalSavepoint;

--- End diff --

I think `savepointToDispose` can be `null`.

> Support rescaling of jobs which are not fully running
> ------------------------------------------------------
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
> We should support the rescaling of jobs which are only partially running. Currently, this fails because rescaling requires taking a savepoint. We can solve the problem by falling back to the latest rescaling savepoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running
[ https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374603#comment-16374603 ]

ASF GitHub Bot commented on FLINK-8746:
---------------------------------------

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170279814

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -1156,6 +1213,26 @@ private void disposeSavepoint(String savepointPath) { } } + /** +* Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}. +* +* @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored +* @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from +* @throws Exception if the {@link ExecutionGraph} could not be restored +*/ + private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception { + if (savepointRestoreSettings.restoreSavepoint()) { + final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator(); + if (checkpointCoordinator != null) {

--- End diff --

I think at this point the `checkpointCoordinator` should not be `null` (there are already checks earlier). Maybe replace it with a `checkState`.

> Support rescaling of jobs which are not fully running
> ------------------------------------------------------
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
> We should support the rescaling of jobs which are only partially running. Currently, this fails because rescaling requires taking a savepoint. We can solve the problem by falling back to the latest rescaling savepoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running
[ https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374602#comment-16374602 ]

ASF GitHub Bot commented on FLINK-8746:
---------------------------------------

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170281192

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -289,7 +290,7 @@ public void invoke() throws Exception { // wait forever (until canceled) synchronized (this) { - while (error == null && lastCheckpointId < numCalls) {

--- End diff --

Are you sure this needed to be removed?

> Support rescaling of jobs which are not fully running
> ------------------------------------------------------
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
> We should support the rescaling of jobs which are only partially running. Currently, this fails because rescaling requires taking a savepoint. We can solve the problem by falling back to the latest rescaling savepoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running
[ https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374601#comment-16374601 ]

ASF GitHub Bot commented on FLINK-8746:
---------------------------------------

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170296322

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +/** + * Exceptions which indicate that a checkpoint triggering has failed. + * + */ +public class CheckpointTriggerException extends FlinkException { + + private static final long serialVersionUID = -3330160816161901752L; + + private final CheckpointDeclineReason checkpointDeclineReason; + + public CheckpointTriggerException(String message, CheckpointDeclineReason checkpointDeclineReason) { + super(message + " Decline reason: " + checkpointDeclineReason);

--- End diff --

Not super important because it is never logged, but you could include the enum's `.message()` in the exception message.

> Support rescaling of jobs which are not fully running
> ------------------------------------------------------
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
> We should support the rescaling of jobs which are only partially running. Currently, this fails because rescaling requires taking a savepoint. We can solve the problem by falling back to the latest rescaling savepoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running
[ https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374604#comment-16374604 ]

ASF GitHub Bot commented on FLINK-8746:
---------------------------------------

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170296168

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +/** + * Exceptions which indicate that a checkpoint triggering has failed. + * + */ +public class CheckpointTriggerException extends FlinkException { + + private static final long serialVersionUID = -3330160816161901752L; + + private final CheckpointDeclineReason checkpointDeclineReason; + + public CheckpointTriggerException(String message, CheckpointDeclineReason checkpointDeclineReason) { + super(message + " Decline reason: " + checkpointDeclineReason);

--- End diff --

Not super important because it is never logged, but you could include the enum's `.message()` in the exception message.

> Support rescaling of jobs which are not fully running
> ------------------------------------------------------
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
> We should support the rescaling of jobs which are only partially running. Currently, this fails because rescaling requires taking a savepoint. We can solve the problem by falling back to the latest rescaling savepoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #5560: [FLINK-8746] [flip6] Rescale partially running job...
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170279814

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -1156,6 +1213,26 @@ private void disposeSavepoint(String savepointPath) { } } + /** +* Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}. +* +* @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored +* @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from +* @throws Exception if the {@link ExecutionGraph} could not be restored +*/ + private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception { + if (savepointRestoreSettings.restoreSavepoint()) { + final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator(); + if (checkpointCoordinator != null) {

--- End diff --

I think at this point the `checkpointCoordinator` should not be `null` (there are already checks earlier). Maybe replace it with a `checkState`.

---
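GJL's `checkState` suggestion would read roughly as follows (a sketch; the error message text is invented):

```java
import static org.apache.flink.util.Preconditions.checkState;

// Fail loudly instead of silently skipping the restore when checkpointing
// is disabled and no coordinator exists.
final CheckpointCoordinator checkpointCoordinator =
	executionGraphToRestore.getCheckpointCoordinator();
checkState(
	checkpointCoordinator != null,
	"Checkpointing is disabled, cannot restore from a savepoint.");
```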
[GitHub] flink pull request #5560: [FLINK-8746] [flip6] Rescale partially running job...
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170296322

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +/** + * Exceptions which indicate that a checkpoint triggering has failed. + * + */ +public class CheckpointTriggerException extends FlinkException { + + private static final long serialVersionUID = -3330160816161901752L; + + private final CheckpointDeclineReason checkpointDeclineReason; + + public CheckpointTriggerException(String message, CheckpointDeclineReason checkpointDeclineReason) { + super(message + " Decline reason: " + checkpointDeclineReason);

--- End diff --

Not super important because it is never logged, but you could include the enum's `.message()` in the exception message.

---
[GitHub] flink pull request #5560: [FLINK-8746] [flip6] Rescale partially running job...
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170296168

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +/** + * Exceptions which indicate that a checkpoint triggering has failed. + * + */ +public class CheckpointTriggerException extends FlinkException { + + private static final long serialVersionUID = -3330160816161901752L; + + private final CheckpointDeclineReason checkpointDeclineReason; + + public CheckpointTriggerException(String message, CheckpointDeclineReason checkpointDeclineReason) { + super(message + " Decline reason: " + checkpointDeclineReason);

--- End diff --

Not super important because it is never logged, but you could include the enum's `.message()` in the exception message.

---
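Concretely, assuming `CheckpointDeclineReason.message()` returns the human-readable text the review alludes to, the constructor line would become:

```java
super(message + " Decline reason: " + checkpointDeclineReason.message());
```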
[GitHub] flink pull request #5560: [FLINK-8746] [flip6] Rescale partially running job...
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170300254

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -534,42 +536,94 @@ public void postStop() throws Exception { // 4. take a savepoint final CompletableFuture<String> savepointFuture = triggerSavepoint( - jobMasterConfiguration.getTmpDirectory(), - timeout); + null, + timeout) + .handleAsync( + (String savepointPath, Throwable throwable) -> { + if (throwable != null) { + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + if (strippedThrowable instanceof CheckpointTriggerException) { + final CheckpointTriggerException checkpointTriggerException = (CheckpointTriggerException) strippedThrowable; + + if (checkpointTriggerException.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) { + return lastInternalSavepoint; + } else { + throw new CompletionException(checkpointTriggerException); + } + } else { + throw new CompletionException(strippedThrowable); + } + } else { + final String savepointToDispose = lastInternalSavepoint;

--- End diff --

I think `savepointToDispose` can be `null`.

---
[GitHub] flink pull request #5560: [FLINK-8746] [flip6] Rescale partially running job...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5560#discussion_r170281192 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java --- @@ -289,7 +290,7 @@ public void invoke() throws Exception { // wait forever (until canceled) synchronized (this) { - while (error == null && lastCheckpointId < numCalls) { --- End diff -- Are you sure this needed to be removed? ---
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374593#comment-16374593 ]

ASF GitHub Bot commented on FLINK-8538:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170297941

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala --- @@ -18,37 +18,85 @@ package org.apache.flink.table.descriptors +import org.apache.flink.util.Preconditions import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ abstract class DescriptorTestBase { /** -* Returns a valid descriptor. +* Returns a set of valid descriptors. +* This method is implemented in both Scala and Java. +*/ + def descriptors(): java.util.List[Descriptor] + + /** +* Returns a set of properties for each valid descriptor. +* This code is implemented in both Scala and Java. */ - def descriptor(): Descriptor + def properties(): java.util.List[java.util.Map[String, String]] /** -* Returns a validator that can validate this descriptor. +* Returns a validator that can validate all valid descriptors. */ def validator(): DescriptorValidator - def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = { + @Test + def testValidation(): Unit = { +val d = descriptors().asScala +val p = properties().asScala + +Preconditions.checkArgument(d.length == p.length) + +d.zip(p).foreach { case (desc, props) => + verifyProperties(desc, props.asScala.toMap) +} + } + + def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = { val normProps = new DescriptorProperties descriptor.addProperties(normProps) -assertEquals(expected.toMap, normProps.asMap) +assertEquals(expected, normProps.asScalaMap) } - def verifyInvalidProperty(property: String, invalidValue: String): Unit = { + def verifyInvalidProperty( + descriptor: Descriptor, + property: String, + invalidValue: String): Unit = { val properties = new DescriptorProperties -descriptor().addProperties(properties) +descriptor.addProperties(properties) properties.unsafePut(property, invalidValue) validator().validate(properties) } - def verifyMissingProperty(removeProperty: String): Unit = { + def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {

--- End diff --

rename to `removePropertyAndVerify()`

> Add a Kafka table source factory with JSON format support
> ----------------------------------------------------------
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Xingcan Cui
> Priority: Major
>
> Similar to CSVTableSourceFactory, a Kafka table source factory for JSON should be added. This issue includes improving the existing JSON descriptor with validation that can be used for other connectors as well. It is up for discussion whether we want to split the KafkaJsonTableSource into connector and format such that we can reuse the format for other table sources as well.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374575#comment-16374575 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170229924 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala --- @@ -89,37 +105,58 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { put(key, clazz.getName) } + /** +* Adds a string under the given key. +*/ def putString(key: String, str: String): Unit = { checkNotNull(key) checkNotNull(str) put(key, str) } + /** +* Adds a boolean under the given key. +*/ def putBoolean(key: String, b: Boolean): Unit = { checkNotNull(key) put(key, b.toString) } + /** +* Adds a long under the given key. +*/ def putLong(key: String, l: Long): Unit = { checkNotNull(key) put(key, l.toString) } + /** +* Adds an integer under the given key. +*/ def putInt(key: String, i: Int): Unit = { checkNotNull(key) put(key, i.toString) } + /** +* Adds a character under the given key. +*/ def putCharacter(key: String, c: Character): Unit = { checkNotNull(key) checkNotNull(c) put(key, c.toString) } + /** +* Adds a table schema under the given key. +*/ def putTableSchema(key: String, schema: TableSchema): Unit = { putTableSchema(key, normalizeTableSchema(schema)) } + /** +* Adds a table schema under the given key. +*/ def putTableSchema(key: String, nameAndType: Seq[(String, String)]): Unit = { --- End diff -- Remove? > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
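A short usage sketch of the `put*` methods documented above (the keys are illustrative only, not an authoritative list):

```scala
val props = new DescriptorProperties()
props.putString("connector.type", "kafka")
props.putBoolean("format.fail-on-missing-field", false)
props.putInt("connector.property-version", 1)
props.putLong("rowtime.watermarks.delay", 1000L)
props.putCharacter("format.field-delimiter", ',')
```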
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374581#comment-16374581 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170230962 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala --- @@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** +* Adds an indexed mapping of properties under a common key. +* +* For example: +* +* schema.fields.0.type = INT, schema.fields.0.name = test +* schema.fields.1.name = test2 +* +* The arity of the propertySets can differ. +* +* This method is intended for Java code. +*/ + def putIndexedVariableProperties( + key: String, + propertySets: JList[JMap[String, String]]) +: Unit = { +checkNotNull(key) +checkNotNull(propertySets) +putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap)) + } + // -- + /** +* Returns a string value under the given key if it exists. +*/ def getString(key: String): Option[String] = { properties.get(key) } - def getCharacter(key: String): Option[Character] = getString(key) match { -case Some(c) => - if (c.length != 1) { -throw new ValidationException(s"The value of $key must only contain one character.") - } - Some(c.charAt(0)) + /** +* Returns a string value under the given key if it exists. +* This method is intended for Java code. +*/ + def getOptionalString(key: String): Optional[String] = toJava(getString(key)) -case None => None + /** +* Returns a character value under the given key if it exists. +*/ + def getCharacter(key: String): Option[Character] = getString(key).map { c => +if (c.length != 1) { + throw new ValidationException(s"The value of $key must only contain one character.") +} +c.charAt(0) } - def getBoolean(key: String): Option[Boolean] = getString(key) match { -case Some(b) => Some(JBoolean.parseBoolean(b)) - -case None => None + /** +* Returns a class value under the given key if it exists. +*/ + def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = { +properties.get(key).map { name => + val clazz = try { +Class.forName( + name, + true, + Thread.currentThread().getContextClassLoader).asInstanceOf[Class[T]] + } catch { +case e: Exception => + throw new ValidationException(s"Coult not get class for key '$key'.", e) --- End diff -- Add name of class? > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
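A sketch of what the suggested fix could look like, with the class name included in the message (and the `Coult`/`Could` typo flagged in a later comment corrected); `loadClass` is an illustrative helper, not the PR's code:

```scala
def loadClass[T](key: String, name: String): Class[T] =
  try {
    Class.forName(name, true, Thread.currentThread().getContextClassLoader)
      .asInstanceOf[Class[T]]
  } catch {
    case e: Exception =>
      // naming the class makes a misspelled class name diagnosable from the error alone
      throw new ValidationException(s"Could not get class '$name' for key '$key'.", e)
  }
```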
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374594#comment-16374594 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170296912 --- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.table.api.ValidationException; + +/** + * Validator for {@link Json}. + */ +public class JsonValidator extends FormatDescriptorValidator { + + public static final String FORMAT_TYPE_VALUE = "json"; + public static final String FORMAT_SCHEMA = "format.schema"; + public static final String FORMAT_JSON_SCHEMA = "format.json-schema"; + public static final String FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field"; + + @Override + public void validate(DescriptorProperties properties) { + super.validate(properties); + final boolean hasSchema = properties.containsKey(FORMAT_SCHEMA); + final boolean hasSchemaString = properties.containsKey(FORMAT_JSON_SCHEMA); + if (hasSchema && hasSchemaString) { + throw new ValidationException("A definition of both a schema and JSON schema is not allowed."); + } else if (!hasSchema && !hasSchemaString) { + throw new ValidationException("A definition of a schema and JSON schema is required."); --- End diff -- replace "and" by "or" -> "A definition of a schema or JSON schema is required." > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
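The mutual-exclusion check with the suggested wording, sketched as a Scala transliteration of the Java validator above:

```scala
val hasSchema = properties.containsKey(FORMAT_SCHEMA)
val hasJsonSchema = properties.containsKey(FORMAT_JSON_SCHEMA)
if (hasSchema && hasJsonSchema) {
  throw new ValidationException(
    "A definition of both a schema and JSON schema is not allowed.")
} else if (!hasSchema && !hasJsonSchema) {
  // reviewer's wording: "or", since exactly one of the two must be given
  throw new ValidationException(
    "A definition of a schema or JSON schema is required.")
}
```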
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374589#comment-16374589 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170290993 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala --- @@ -18,48 +18,67 @@ package org.apache.flink.table.descriptors +import java.util + import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.api.ValidationException import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner import org.apache.flink.types.Row import org.junit.Test -class RowtimeTest extends DescriptorTestBase { +import scala.collection.JavaConverters._ - @Test - def testRowtime(): Unit = { -val desc = Rowtime() - .timestampsFromField("otherField") - .watermarksPeriodicBounding(1000L) -val expected = Seq( - "rowtime.0.version" -> "1", - "rowtime.0.timestamps.type" -> "from-field", - "rowtime.0.timestamps.from" -> "otherField", - "rowtime.0.watermarks.type" -> "periodic-bounding", - "rowtime.0.watermarks.delay" -> "1000" -) -verifyProperties(desc, expected) - } +class RowtimeTest extends DescriptorTestBase { @Test(expected = classOf[ValidationException]) def testInvalidWatermarkType(): Unit = { -verifyInvalidProperty("rowtime.0.watermarks.type", "xxx") +verifyInvalidProperty(descriptors().get(0), "rowtime.watermarks.type", "xxx") --- End diff -- use constant instead of `"rowtime.watermarks.type"` > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170224845 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java --- @@ -83,10 +84,32 @@ protected JsonRowDeserializationSchema getDeserializationSchema() { @Override public String explainSource() { - return "KafkaJSONTableSource"; + return "KafkaJsonTableSource"; } - SETTERS FOR OPTIONAL PARAMETERS + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof KafkaJsonTableSource)) { + return false; + } + if (!super.equals(o)) { + return false; + } + KafkaJsonTableSource that = (KafkaJsonTableSource) o; + return failOnMissingField == that.failOnMissingField && + Objects.equals(jsonSchema, that.jsonSchema) && + Objects.equals(fieldMapping, that.fieldMapping); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), jsonSchema, fieldMapping, failOnMissingField); --- End diff -- `TableSchema` does not override `hashCode()` ---
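The concern: `Objects.hash(...)` over a value whose class does not override `hashCode()` degrades to identity hashing, so two table sources that compare `equals()` can still produce different hash codes. One hedged workaround, sketched here in Scala rather than the PR's Java, is to hash the schema's stable components explicitly:

```scala
import java.util.{Arrays => JArrays, Objects => JObjects}
import org.apache.flink.table.api.TableSchema

// Hash a TableSchema by its column names and types instead of its identity.
def schemaHash(schema: TableSchema): Int =
  JObjects.hash(
    Int.box(JArrays.hashCode(schema.getColumnNames.asInstanceOf[Array[AnyRef]])),
    Int.box(JArrays.hashCode(schema.getTypes.asInstanceOf[Array[AnyRef]])))
```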
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374586#comment-16374586 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170271213 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala --- @@ -32,11 +32,36 @@ import scala.collection.mutable */ class Schema extends Descriptor { + private var deriveFields: Option[String] = None + // maps a field name to a list of properties that describe type, origin, and the time attribute private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]() private var lastField: Option[String] = None + /** +* Derives field names and types from a preceding connector or format. Additional fields that +* are defined in this schema extend the derived fields. The derived fields are +* added in an alphabetical order according to their field name. +*/ + def deriveFieldsAlphabetically(): Schema = { --- End diff -- I think we should support inferring the format from the schema rather than the schema from the format. This would be more aligned with how it would work in a `CREATE TABLE` statement and with how Hive does it, for example. We should still support defining the format explicitly, though. > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170273135 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala --- @@ -41,10 +41,10 @@ trait TableSourceFactory[T] { * - connector.type * - format.type * -* Specified versions allow the framework to provide backwards compatible properties in case of --- End diff -- (not related to this change) Should we add something like a `priority` property to `TableSourceFactory` that determines in which order factories are matched? If two factories match, we would use the factory with the higher priority. ---
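A hypothetical sketch of that idea (not an existing Flink API): factories report a priority, and the framework resolves multiple matches by taking the highest one.

```scala
trait PrioritizedTableSourceFactory[T] extends TableSourceFactory[T] {
  // higher value wins when several factories match the same properties
  def priority: Int = 0
}

def selectFactory[T](
    matches: Seq[PrioritizedTableSourceFactory[T]]): PrioritizedTableSourceFactory[T] = {
  require(matches.nonEmpty, "no matching factory")
  matches.maxBy(_.priority)
}
```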
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374579#comment-16374579 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170256878 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala --- @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { object RowtimeValidator { val ROWTIME = "rowtime" - - // per rowtime properties - - val ROWTIME_VERSION = "version" - val TIMESTAMPS_TYPE = "timestamps.type" - val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" - val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" - val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" - val TIMESTAMPS_FROM = "timestamps.from" - val TIMESTAMPS_CLASS = "timestamps.class" - val TIMESTAMPS_SERIALIZED = "timestamps.serialized" - - val WATERMARKS_TYPE = "watermarks.type" - val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" - val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" - val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" - val WATERMARKS_TYPE_VALUE_CUSTOM = "custom" - val WATERMARKS_CLASS = "watermarks.class" - val WATERMARKS_SERIALIZED = "watermarks.serialized" - val WATERMARKS_DELAY = "watermarks.delay" + val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from" + val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class" + val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized" + + val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type" + val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" + val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" + val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class" + val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized" --- End diff -- `rowtime.watermarks.custom.serialized`? > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374578#comment-16374578 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170256073 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala --- @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { object RowtimeValidator { val ROWTIME = "rowtime" - - // per rowtime properties - - val ROWTIME_VERSION = "version" - val TIMESTAMPS_TYPE = "timestamps.type" - val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" - val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" - val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" - val TIMESTAMPS_FROM = "timestamps.from" - val TIMESTAMPS_CLASS = "timestamps.class" - val TIMESTAMPS_SERIALIZED = "timestamps.serialized" - - val WATERMARKS_TYPE = "watermarks.type" - val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" - val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" - val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" - val WATERMARKS_TYPE_VALUE_CUSTOM = "custom" - val WATERMARKS_CLASS = "watermarks.class" - val WATERMARKS_SERIALIZED = "watermarks.serialized" - val WATERMARKS_DELAY = "watermarks.delay" + val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from" + val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class" --- End diff -- `rowtime.timestamps.custom.class` > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170229569 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala --- @@ -128,6 +165,13 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { ) } + /** +* Adds a table schema under the given key. This method is intended for Java code. +*/ + def putTableSchema(key: String, nameAndType: JList[JTuple2[String, String]]): Unit = { --- End diff -- I think we should drop the Scala equivalent method. This is not a public API class that needs a shiny Scala interface but should be usable from Java and Scala. ---
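A usage sketch of the Java-friendly signature that would remain after dropping the Scala overload (the `schema.fields` key is illustrative):

```scala
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import scala.collection.JavaConverters._

val props = new DescriptorProperties()
val fields: java.util.List[JTuple2[String, String]] = Seq(
  new JTuple2("id", "INT"),
  new JTuple2("name", "VARCHAR")
).asJava
props.putTableSchema("schema.fields", fields)
```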
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170230912 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala --- @@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** +* Adds an indexed mapping of properties under a common key. +* +* For example: +* +* schema.fields.0.type = INT, schema.fields.0.name = test +* schema.fields.1.name = test2 +* +* The arity of the propertySets can differ. +* +* This method is intended for Java code. +*/ + def putIndexedVariableProperties( + key: String, + propertySets: JList[JMap[String, String]]) +: Unit = { +checkNotNull(key) +checkNotNull(propertySets) +putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap)) + } + // -- + /** +* Returns a string value under the given key if it exists. +*/ def getString(key: String): Option[String] = { properties.get(key) } - def getCharacter(key: String): Option[Character] = getString(key) match { -case Some(c) => - if (c.length != 1) { -throw new ValidationException(s"The value of $key must only contain one character.") - } - Some(c.charAt(0)) + /** +* Returns a string value under the given key if it exists. +* This method is intended for Java code. +*/ + def getOptionalString(key: String): Optional[String] = toJava(getString(key)) -case None => None + /** +* Returns a character value under the given key if it exists. +*/ + def getCharacter(key: String): Option[Character] = getString(key).map { c => +if (c.length != 1) { + throw new ValidationException(s"The value of $key must only contain one character.") +} +c.charAt(0) } - def getBoolean(key: String): Option[Boolean] = getString(key) match { -case Some(b) => Some(JBoolean.parseBoolean(b)) - -case None => None + /** +* Returns a class value under the given key if it exists. +*/ + def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = { +properties.get(key).map { name => + val clazz = try { +Class.forName( + name, + true, + Thread.currentThread().getContextClassLoader).asInstanceOf[Class[T]] + } catch { +case e: Exception => + throw new ValidationException(s"Coult not get class for key '$key'.", e) --- End diff -- typo: Could ---
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374574#comment-16374574 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170230041 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala --- @@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** +* Adds an indexed mapping of properties under a common key. +* +* For example: +* +* schema.fields.0.type = INT, schema.fields.0.name = test +* schema.fields.1.name = test2 +* +* The arity of the propertySets can differ. +* +* This method is intended for Java code. +*/ + def putIndexedVariableProperties( --- End diff -- Remove Scala equivalent? > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
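A usage sketch of the indexed layout described in the scaladoc above; note the two maps deliberately have different arity, which the documentation explicitly allows:

```scala
import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConverters._

val props = new DescriptorProperties()
val fieldProps: JList[JMap[String, String]] = Seq(
  Map("type" -> "INT", "name" -> "test").asJava,
  Map("name" -> "test2").asJava
).asJava
props.putIndexedVariableProperties("schema.fields", fieldProps)
// yields: schema.fields.0.type = INT, schema.fields.0.name = test,
//         schema.fields.1.name = test2
```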
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374577#comment-16374577 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170257084 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala --- @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { object RowtimeValidator { val ROWTIME = "rowtime" - - // per rowtime properties - - val ROWTIME_VERSION = "version" - val TIMESTAMPS_TYPE = "timestamps.type" - val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" - val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" - val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" - val TIMESTAMPS_FROM = "timestamps.from" - val TIMESTAMPS_CLASS = "timestamps.class" - val TIMESTAMPS_SERIALIZED = "timestamps.serialized" - - val WATERMARKS_TYPE = "watermarks.type" - val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" - val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" - val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" - val WATERMARKS_TYPE_VALUE_CUSTOM = "custom" - val WATERMARKS_CLASS = "watermarks.class" - val WATERMARKS_SERIALIZED = "watermarks.serialized" - val WATERMARKS_DELAY = "watermarks.delay" + val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from" + val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class" + val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized" + + val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type" + val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" + val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" --- End diff -- BOUNDING -> BOUNDED > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374587#comment-16374587 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170272030 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala --- @@ -67,14 +92,188 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal object SchemaValidator { val SCHEMA = "schema" - val SCHEMA_VERSION = "schema.version" + val SCHEMA_PROPERTY_VERSION = "schema.property-version" + val SCHEMA_FIELDS = "schema.fields" + val SCHEMA_FIELDS_NAME = "name" + val SCHEMA_FIELDS_TYPE = "type" + val SCHEMA_FIELDS_PROCTIME = "proctime" + val SCHEMA_FIELDS_FROM = "from" + val SCHEMA_DERIVE_FIELDS = "schema.derive-fields" + val SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY = "alphabetically" + val SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY = "sequentially" + + // utilities + + /** +* Derives a schema from properties and source. +*/ + def deriveSchema( + properties: DescriptorProperties, + sourceSchema: Option[TableSchema]) +: TableSchema = { + +val builder = TableSchema.builder() + +val schema = properties.getTableSchema(SCHEMA_FIELDS) + +val derivationMode = properties.getString(SCHEMA_DERIVE_FIELDS) + +val sourceNamesAndTypes = derivationMode match { + case Some(SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY) if sourceSchema.isDefined => +// sort by name +sourceSchema.get.getColumnNames + .zip(sourceSchema.get.getTypes) + .sortBy(_._1) + + case Some(SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY) if sourceSchema.isDefined => +sourceSchema.get.getColumnNames.zip(sourceSchema.get.getTypes) + + case Some(_) => +throw new ValidationException("Derivation of fields is not supported from this source.") + + case None => +Array[(String, TypeInformation[_])]() +} + +// add source fields +sourceNamesAndTypes.foreach { case (n, t) => + builder.field(n, t) +} + +// add schema fields +schema.foreach { ts => + val schemaNamesAndTypes = ts.getColumnNames.zip(ts.getTypes) + schemaNamesAndTypes.foreach { case (n, t) => + // do not allow overwriting + if (sourceNamesAndTypes.exists(_._1 == n)) { +throw new ValidationException( + "Specified schema fields must not overwrite fields derived from the source.") + } + builder.field(n, t) + } +} + +builder.build() + } + + /** +* Derives a schema from properties and source. +* This method is intended for Java code. +*/ + def deriveSchema( + properties: DescriptorProperties, + sourceSchema: Optional[TableSchema]) +: TableSchema = { +deriveSchema( + properties, + Option(sourceSchema.orElse(null))) + } + + /** +* Finds the proctime attribute if defined. +*/ + def deriveProctimeAttribute(properties: DescriptorProperties): Option[String] = { +val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME) + +for (i <- 0 until names.size) { + val isProctime = properties.getBoolean(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_PROCTIME") + isProctime.foreach { isSet => +if (isSet) { + return names.get(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME") +} + } +} +None + } + + /** +* Finds the proctime attribute if defined. +* This method is intended for Java code. +*/ + def deriveProctimeOptional(properties: DescriptorProperties): Optional[String] = { +Optional.ofNullable(deriveProctimeAttribute(properties).orNull) + } + + /** +* Finds the rowtime attributes if defined. 
+*/ + def deriveRowtimeAttributes(properties: DescriptorProperties) +: util.List[RowtimeAttributeDescriptor] = { + +val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME) + +var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]() + +// check for rowtime in every field +for (i <- 0 until names.size) { + RowtimeValidator +.getRowtimeComponents(properties, s"$SCHEMA_FIELDS.$i.") +.foreach { case (extractor, strategy) => + // create descriptor +
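A hedged usage sketch of the derivation logic quoted above (the source schema and `Types` constants are assumptions for illustration): with `alphabetically`, source fields are sorted by name before any explicit `schema.fields` entries, which must not overwrite them, are appended.

```scala
import org.apache.flink.table.api.{TableSchema, Types}

val props = new DescriptorProperties()
props.putString("schema.derive-fields", "alphabetically")

// hypothetical schema arriving from a format, fields deliberately out of order
val source = TableSchema.builder()
  .field("b", Types.INT)
  .field("a", Types.STRING)
  .build()

val derived = SchemaValidator.deriveSchema(props, Some(source))
// derived field order: a, b (sorted by name); an explicit schema field named
// "a" or "b" would trigger the "must not overwrite" ValidationException
```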
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374595#comment-16374595 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170298004 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala --- @@ -18,37 +18,85 @@ package org.apache.flink.table.descriptors +import org.apache.flink.util.Preconditions import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ abstract class DescriptorTestBase { /** -* Returns a valid descriptor. +* Returns a set of valid descriptors. +* This method is implemented in both Scala and Java. +*/ + def descriptors(): java.util.List[Descriptor] + + /** +* Returns a set of properties for each valid descriptor. +* This code is implemented in both Scala and Java. */ - def descriptor(): Descriptor + def properties(): java.util.List[java.util.Map[String, String]] /** -* Returns a validator that can validate this descriptor. +* Returns a validator that can validate all valid descriptors. */ def validator(): DescriptorValidator - def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = { + @Test + def testValidation(): Unit = { +val d = descriptors().asScala +val p = properties().asScala + +Preconditions.checkArgument(d.length == p.length) + +d.zip(p).foreach { case (desc, props) => + verifyProperties(desc, props.asScala.toMap) +} + } + + def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = { val normProps = new DescriptorProperties descriptor.addProperties(normProps) -assertEquals(expected.toMap, normProps.asMap) +assertEquals(expected, normProps.asScalaMap) } - def verifyInvalidProperty(property: String, invalidValue: String): Unit = { + def verifyInvalidProperty( --- End diff -- rename to `addPropertyAndVerify()` > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
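A sketch of the two suggested renames: both helpers first mutate the collected properties and then validate, so action-first names describe them better. (`unsafeRemove` below is assumed by analogy with `unsafePut`; the actual removal mechanism is not shown in the diff.)

```scala
def addPropertyAndVerify(
    descriptor: Descriptor,
    property: String,
    invalidValue: String): Unit = {
  val properties = new DescriptorProperties
  descriptor.addProperties(properties)
  properties.unsafePut(property, invalidValue)
  validator().validate(properties)
}

def removePropertyAndVerify(descriptor: Descriptor, removeProperty: String): Unit = {
  val properties = new DescriptorProperties
  descriptor.addProperties(properties)
  properties.unsafeRemove(removeProperty) // assumed helper, see note above
  validator().validate(properties)
}
```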
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374584#comment-16374584 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170257053 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala --- @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { object RowtimeValidator { val ROWTIME = "rowtime" - - // per rowtime properties - - val ROWTIME_VERSION = "version" - val TIMESTAMPS_TYPE = "timestamps.type" - val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" - val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" - val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" - val TIMESTAMPS_FROM = "timestamps.from" - val TIMESTAMPS_CLASS = "timestamps.class" - val TIMESTAMPS_SERIALIZED = "timestamps.serialized" - - val WATERMARKS_TYPE = "watermarks.type" - val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" - val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" - val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" - val WATERMARKS_TYPE_VALUE_CUSTOM = "custom" - val WATERMARKS_CLASS = "watermarks.class" - val WATERMARKS_SERIALIZED = "watermarks.serialized" - val WATERMARKS_DELAY = "watermarks.delay" + val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from" + val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class" + val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized" + + val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type" + val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" + val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" + val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class" + val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized" + val ROWTIME_WATERMARKS_DELAY = "rowtime.watermarks.delay" --- End diff -- `rowtime.watermarks.bounded.delay`? > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374580#comment-16374580 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170256851 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala --- @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { object RowtimeValidator { val ROWTIME = "rowtime" - - // per rowtime properties - - val ROWTIME_VERSION = "version" - val TIMESTAMPS_TYPE = "timestamps.type" - val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" - val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" - val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" - val TIMESTAMPS_FROM = "timestamps.from" - val TIMESTAMPS_CLASS = "timestamps.class" - val TIMESTAMPS_SERIALIZED = "timestamps.serialized" - - val WATERMARKS_TYPE = "watermarks.type" - val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" - val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" - val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" - val WATERMARKS_TYPE_VALUE_CUSTOM = "custom" - val WATERMARKS_CLASS = "watermarks.class" - val WATERMARKS_SERIALIZED = "watermarks.serialized" - val WATERMARKS_DELAY = "watermarks.delay" + val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from" + val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class" + val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized" + + val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type" + val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" + val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" + val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class" --- End diff -- `rowtime.watermarks.custom.class`? > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
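Gathering the key renames proposed across the comments above into one sketch of the resulting scheme (suggestions under review, not the merged state; the `timestamps.custom.serialized` line is extended by analogy with the others):

```scala
val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED = "periodic-bounded" // BOUNDING -> BOUNDED

// type-specific keys gain a "custom." or "bounded." segment
val ROWTIME_TIMESTAMPS_CLASS      = "rowtime.timestamps.custom.class"
val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.custom.serialized"
val ROWTIME_WATERMARKS_CLASS      = "rowtime.watermarks.custom.class"
val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.custom.serialized"
val ROWTIME_WATERMARKS_DELAY      = "rowtime.watermarks.bounded.delay"
```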
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374592#comment-16374592 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170291034 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala --- @@ -18,48 +18,67 @@ package org.apache.flink.table.descriptors +import java.util + import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.api.ValidationException import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner import org.apache.flink.types.Row import org.junit.Test -class RowtimeTest extends DescriptorTestBase { +import scala.collection.JavaConverters._ - @Test - def testRowtime(): Unit = { -val desc = Rowtime() - .timestampsFromField("otherField") - .watermarksPeriodicBounding(1000L) -val expected = Seq( - "rowtime.0.version" -> "1", - "rowtime.0.timestamps.type" -> "from-field", - "rowtime.0.timestamps.from" -> "otherField", - "rowtime.0.watermarks.type" -> "periodic-bounding", - "rowtime.0.watermarks.delay" -> "1000" -) -verifyProperties(desc, expected) - } +class RowtimeTest extends DescriptorTestBase { @Test(expected = classOf[ValidationException]) def testInvalidWatermarkType(): Unit = { -verifyInvalidProperty("rowtime.0.watermarks.type", "xxx") +verifyInvalidProperty(descriptors().get(0), "rowtime.watermarks.type", "xxx") } @Test(expected = classOf[ValidationException]) def testMissingWatermarkClass(): Unit = { -verifyMissingProperty("rowtime.0.watermarks.class") +verifyMissingProperty(descriptors().get(1), "rowtime.watermarks.class") --- End diff -- use constant > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374591#comment-16374591 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170273904 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala --- @@ -47,53 +79,18 @@ class CsvTest extends DescriptorTestBase { "format.fields.3.name" -> "field4", "format.fields.3.type" -> "ROW(test INT, row VARCHAR)", --- End diff -- Shouldn't this fail because CSV does not support nested data? > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374576#comment-16374576 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170256006 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala --- @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { object RowtimeValidator { val ROWTIME = "rowtime" - - // per rowtime properties - - val ROWTIME_VERSION = "version" - val TIMESTAMPS_TYPE = "timestamps.type" - val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" - val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" - val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" - val TIMESTAMPS_FROM = "timestamps.from" - val TIMESTAMPS_CLASS = "timestamps.class" - val TIMESTAMPS_SERIALIZED = "timestamps.serialized" - - val WATERMARKS_TYPE = "watermarks.type" - val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" - val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" - val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" - val WATERMARKS_TYPE_VALUE_CUSTOM = "custom" - val WATERMARKS_CLASS = "watermarks.class" - val WATERMARKS_SERIALIZED = "watermarks.serialized" - val WATERMARKS_DELAY = "watermarks.delay" + val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from" --- End diff -- `rowtime.timestamps.from.field` > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374582#comment-16374582 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170229569 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala --- @@ -128,6 +165,13 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { ) } + /** +* Adds a table schema under the given key. This method is intended for Java code. +*/ + def putTableSchema(key: String, nameAndType: JList[JTuple2[String, String]]): Unit = { --- End diff -- I think we should drop the Scala equivalent method. This is not a public API class that needs a shiny Scala interface but should be usable from Java and Scala. > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
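To make the suggestion concrete, here is a minimal sketch of a single Java-friendly variant; the class below is a hypothetical stand-in, not Flink's actual DescriptorProperties:
{code:java}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in for DescriptorProperties (NOT the actual Flink class), sketching a
// single Java-friendly putTableSchema variant usable from Java and Scala alike.
class DescriptorPropertiesSketch {
    final Map<String, String> properties = new LinkedHashMap<>();

    /** Stores each (name, type) pair under indexed keys, e.g. "schema.fields.0.name". */
    void putTableSchema(String key, List<Map.Entry<String, String>> nameAndType) {
        for (int i = 0; i < nameAndType.size(); i++) {
            properties.put(key + "." + i + ".name", nameAndType.get(i).getKey());
            properties.put(key + "." + i + ".type", nameAndType.get(i).getValue());
        }
    }

    public static void main(String[] args) {
        DescriptorPropertiesSketch props = new DescriptorPropertiesSketch();
        props.putTableSchema("schema.fields",
            List.of(Map.entry("test", "INT"), Map.entry("test2", "LONG")));
        // {schema.fields.0.name=test, schema.fields.0.type=INT, schema.fields.1.name=test2, ...}
        System.out.println(props.properties);
    }
}
{code}
Scala code can call such a method directly via JavaConverters, which is why a separate Scala-only overload adds little.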
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374585#comment-16374585 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170232160 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala --- @@ -246,13 +394,93 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { Some(schemaBuilder.build()) } + /** +* Returns a table schema under the given key if it exists. +*/ + def getOptionalTableSchema(key: String): Optional[TableSchema] = toJava(getTableSchema(key)) + + /** +* Returns the type information under the given key if it exists. +*/ + def getType(key: String): Option[TypeInformation[_]] = { +properties.get(key).map(TypeStringUtils.readTypeInfo) + } + + /** +* Returns the type information under the given key if it exists. +* This method is intended for Java code. +*/ + def getOptionalType(key: String): Optional[TypeInformation[_]] = { +toJava(getType(key)) + } + + /** +* Returns a prefix subset of properties. +*/ + def getPrefix(prefixKey: String): Map[String, String] = { +val prefix = prefixKey + '.' +properties.filterKeys(_.startsWith(prefix)).toSeq.map{ case (k, v) => + k.substring(prefix.length) -> v // remove prefix +}.toMap + } + + /** +* Returns a prefix subset of properties. +* This method is intended for Java code. +*/ + def getPrefixMap(prefixKey: String): JMap[String, String] = getPrefix(prefixKey).asJava --- End diff -- I find the different names for methods that do the same confusing. I'd just remove the Scala methods. > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
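The "prefix subset" behavior under discussion can be illustrated with a small self-contained sketch; the class and method names here are assumptions, not the actual Flink API:
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of a prefix-subset lookup: keep only entries under prefixKey and
// strip the prefix so callers see keys relative to it.
class PrefixSubsetSketch {
    static Map<String, String> getPrefix(Map<String, String> properties, String prefixKey) {
        String prefix = prefixKey + ".";
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : properties.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                // remove the prefix from the returned keys
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
            "rowtime.watermarks.type", "periodic-bounding",
            "rowtime.watermarks.delay", "1000",
            "schema.fields.0.name", "test");
        // keeps only the rowtime.* entries, with the "rowtime." prefix stripped
        System.out.println(getPrefix(props, "rowtime"));
    }
}
{code}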
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374590#comment-16374590 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170273135 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala --- @@ -41,10 +41,10 @@ trait TableSourceFactory[T] { * - connector.type * - format.type * -* Specified versions allow the framework to provide backwards compatible properties in case of --- End diff -- (not related to this change) Should we add something like a `priority` property to `TableSourceFactory` that determines in which order factories are matched. If two factories match, we would use the factory with the higher priority. > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
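A rough sketch of the priority idea floated in that comment, assuming a simplified factory interface for illustration (not Flink's actual TableSourceFactory):
{code:java}
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// When several factories match the given properties, pick the one with the
// highest priority. The interface below is an assumption for this sketch.
interface PrioritizedFactory {
    boolean matches(Map<String, String> properties);
    int priority();
}

class FactoryResolver {
    static Optional<PrioritizedFactory> resolve(
            List<PrioritizedFactory> factories, Map<String, String> properties) {
        return factories.stream()
            .filter(f -> f.matches(properties))                            // keep matching factories
            .max(Comparator.comparingInt(PrioritizedFactory::priority));  // highest priority wins
    }
}
{code}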
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374583#comment-16374583 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170256141 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala --- @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { object RowtimeValidator { val ROWTIME = "rowtime" - - // per rowtime properties - - val ROWTIME_VERSION = "version" - val TIMESTAMPS_TYPE = "timestamps.type" - val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" - val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" - val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" - val TIMESTAMPS_FROM = "timestamps.from" - val TIMESTAMPS_CLASS = "timestamps.class" - val TIMESTAMPS_SERIALIZED = "timestamps.serialized" - - val WATERMARKS_TYPE = "watermarks.type" - val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" - val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" - val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" - val WATERMARKS_TYPE_VALUE_CUSTOM = "custom" - val WATERMARKS_CLASS = "watermarks.class" - val WATERMARKS_SERIALIZED = "watermarks.serialized" - val WATERMARKS_DELAY = "watermarks.delay" + val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from" + val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class" + val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized" --- End diff -- `rowtime.timestamps.custom.serialized` > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374588#comment-16374588 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170274382 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala --- @@ -18,37 +18,85 @@ package org.apache.flink.table.descriptors +import org.apache.flink.util.Preconditions import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ abstract class DescriptorTestBase { /** -* Returns a valid descriptor. +* Returns a set of valid descriptors. +* This method is implemented in both Scala and Java. --- End diff -- Why is this important? > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374572#comment-16374572 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170230028 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala --- @@ -155,6 +199,28 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** +* Adds an indexed sequence of properties (with sub-properties) under a common key. +* +* For example: +* +* schema.fields.0.type = INT, schema.fields.0.name = test +* schema.fields.1.type = LONG, schema.fields.1.name = test2 +* +* The arity of each propertyValue must match the arity of propertyKeys. +* +* This method is intended for Java code. +*/ + def putIndexedFixedProperties( --- End diff -- Remove Scala equivalent? > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
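The indexed-properties layout documented in that Javadoc can be sketched as follows; the class is a stand-in for DescriptorProperties, and the arity check mirrors the documented contract:
{code:java}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of putIndexedFixedProperties: each row of values is written under
// "key.<index>.<subKey>", and every row must match the sub-keys in arity.
class IndexedPropertiesSketch {
    final Map<String, String> properties = new LinkedHashMap<>();

    void putIndexedFixedProperties(String key, List<String> subKeys, List<List<String>> values) {
        for (int i = 0; i < values.size(); i++) {
            List<String> row = values.get(i);
            if (row.size() != subKeys.size()) {
                throw new IllegalArgumentException("Arity of each row must match the sub-keys.");
            }
            for (int j = 0; j < subKeys.size(); j++) {
                // e.g. schema.fields.0.type = INT
                properties.put(key + "." + i + "." + subKeys.get(j), row.get(j));
            }
        }
    }

    public static void main(String[] args) {
        IndexedPropertiesSketch props = new IndexedPropertiesSketch();
        props.putIndexedFixedProperties(
            "schema.fields",
            List.of("type", "name"),
            List.of(List.of("INT", "test"), List.of("LONG", "test2")));
        System.out.println(props.properties);
    }
}
{code}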
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374573#comment-16374573 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170230912 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala --- @@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** +* Adds an indexed mapping of properties under a common key. +* +* For example: +* +* schema.fields.0.type = INT, schema.fields.0.name = test +* schema.fields.1.name = test2 +* +* The arity of the propertySets can differ. +* +* This method is intended for Java code. +*/ + def putIndexedVariableProperties( + key: String, + propertySets: JList[JMap[String, String]]) +: Unit = { +checkNotNull(key) +checkNotNull(propertySets) +putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap)) + } + // -- + /** +* Returns a string value under the given key if it exists. +*/ def getString(key: String): Option[String] = { properties.get(key) } - def getCharacter(key: String): Option[Character] = getString(key) match { -case Some(c) => - if (c.length != 1) { -throw new ValidationException(s"The value of $key must only contain one character.") - } - Some(c.charAt(0)) + /** +* Returns a string value under the given key if it exists. +* This method is intended for Java code. +*/ + def getOptionalString(key: String): Optional[String] = toJava(getString(key)) -case None => None + /** +* Returns a character value under the given key if it exists. +*/ + def getCharacter(key: String): Option[Character] = getString(key).map { c => +if (c.length != 1) { + throw new ValidationException(s"The value of $key must only contain one character.") +} +c.charAt(0) } - def getBoolean(key: String): Option[Boolean] = getString(key) match { -case Some(b) => Some(JBoolean.parseBoolean(b)) - -case None => None + /** +* Returns a class value under the given key if it exists. +*/ + def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = { +properties.get(key).map { name => + val clazz = try { +Class.forName( + name, + true, + Thread.currentThread().getContextClassLoader).asInstanceOf[Class[T]] + } catch { +case e: Exception => + throw new ValidationException(s"Coult not get class for key '$key'.", e) --- End diff -- typo: Could > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374571#comment-16374571 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170224845 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java --- @@ -83,10 +84,32 @@ protected JsonRowDeserializationSchema getDeserializationSchema() { @Override public String explainSource() { - return "KafkaJSONTableSource"; + return "KafkaJsonTableSource"; } - SETTERS FOR OPTIONAL PARAMETERS + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof KafkaJsonTableSource)) { + return false; + } + if (!super.equals(o)) { + return false; + } + KafkaJsonTableSource that = (KafkaJsonTableSource) o; + return failOnMissingField == that.failOnMissingField && + Objects.equals(jsonSchema, that.jsonSchema) && + Objects.equals(fieldMapping, that.fieldMapping); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), jsonSchema, fieldMapping, failOnMissingField); --- End diff -- `TableSchema` does not override `hashCode()` > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
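The hashCode() concern can be demonstrated in isolation; the classes below are toy stand-ins, not Flink's TableSchema:
{code:java}
import java.util.Objects;

// A class that overrides equals() but not hashCode() (like the flagged
// TableSchema) breaks Objects.hash()-based hash codes: equal objects can
// report different hashes because the identity hash is used.
class SchemaNoHash {
    final String field;
    SchemaNoHash(String field) { this.field = field; }
    @Override public boolean equals(Object o) {
        return o instanceof SchemaNoHash && ((SchemaNoHash) o).field.equals(field);
    }
    // hashCode() deliberately NOT overridden -> falls back to identity hash
}

class HashPitfall {
    public static void main(String[] args) {
        SchemaNoHash a = new SchemaNoHash("f");
        SchemaNoHash b = new SchemaNoHash("f");
        System.out.println(a.equals(b));                        // true
        System.out.println(Objects.hash(a) == Objects.hash(b)); // almost certainly false
    }
}
{code}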
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170256878 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala --- @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { object RowtimeValidator { val ROWTIME = "rowtime" - - // per rowtime properties - - val ROWTIME_VERSION = "version" - val TIMESTAMPS_TYPE = "timestamps.type" - val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" - val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" - val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" - val TIMESTAMPS_FROM = "timestamps.from" - val TIMESTAMPS_CLASS = "timestamps.class" - val TIMESTAMPS_SERIALIZED = "timestamps.serialized" - - val WATERMARKS_TYPE = "watermarks.type" - val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" - val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" - val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" - val WATERMARKS_TYPE_VALUE_CUSTOM = "custom" - val WATERMARKS_CLASS = "watermarks.class" - val WATERMARKS_SERIALIZED = "watermarks.serialized" - val WATERMARKS_DELAY = "watermarks.delay" + val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from" + val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class" + val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized" + + val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type" + val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" + val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" + val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class" + val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized" --- End diff -- `rowtime.watermarks.custom.serialized`? ---
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170230028 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala --- @@ -155,6 +199,28 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** +* Adds an indexed sequence of properties (with sub-properties) under a common key. +* +* For example: +* +* schema.fields.0.type = INT, schema.fields.0.name = test +* schema.fields.1.type = LONG, schema.fields.1.name = test2 +* +* The arity of each propertyValue must match the arity of propertyKeys. +* +* This method is intended for Java code. +*/ + def putIndexedFixedProperties( --- End diff -- Remove Scala equivalent? ---
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170273904 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala --- @@ -47,53 +79,18 @@ class CsvTest extends DescriptorTestBase { "format.fields.3.name" -> "field4", "format.fields.3.type" -> "ROW(test INT, row VARCHAR)", --- End diff -- Shouldn't this fail because CSV does not support nested data? ---
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170274382 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala --- @@ -18,37 +18,85 @@ package org.apache.flink.table.descriptors +import org.apache.flink.util.Preconditions import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ abstract class DescriptorTestBase { /** -* Returns a valid descriptor. +* Returns a set of valid descriptors. +* This method is implemented in both Scala and Java. --- End diff -- Why is this important? ---
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170298004 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala --- @@ -18,37 +18,85 @@ package org.apache.flink.table.descriptors +import org.apache.flink.util.Preconditions import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ abstract class DescriptorTestBase { /** -* Returns a valid descriptor. +* Returns a set of valid descriptors. +* This method is implemented in both Scala and Java. +*/ + def descriptors(): java.util.List[Descriptor] + + /** +* Returns a set of properties for each valid descriptor. +* This code is implemented in both Scala and Java. */ - def descriptor(): Descriptor + def properties(): java.util.List[java.util.Map[String, String]] /** -* Returns a validator that can validate this descriptor. +* Returns a validator that can validate all valid descriptors. */ def validator(): DescriptorValidator - def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = { + @Test + def testValidation(): Unit = { +val d = descriptors().asScala +val p = properties().asScala + +Preconditions.checkArgument(d.length == p.length) + +d.zip(p).foreach { case (desc, props) => + verifyProperties(desc, props.asScala.toMap) +} + } + + def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = { val normProps = new DescriptorProperties descriptor.addProperties(normProps) -assertEquals(expected.toMap, normProps.asMap) +assertEquals(expected, normProps.asScalaMap) } - def verifyInvalidProperty(property: String, invalidValue: String): Unit = { + def verifyInvalidProperty( --- End diff -- rename to `addPropertyAndVerify()` ---
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170232160 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala --- @@ -246,13 +394,93 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { Some(schemaBuilder.build()) } + /** +* Returns a table schema under the given key if it exists. +*/ + def getOptionalTableSchema(key: String): Optional[TableSchema] = toJava(getTableSchema(key)) + + /** +* Returns the type information under the given key if it exists. +*/ + def getType(key: String): Option[TypeInformation[_]] = { +properties.get(key).map(TypeStringUtils.readTypeInfo) + } + + /** +* Returns the type information under the given key if it exists. +* This method is intended for Java code. +*/ + def getOptionalType(key: String): Optional[TypeInformation[_]] = { +toJava(getType(key)) + } + + /** +* Returns a prefix subset of properties. +*/ + def getPrefix(prefixKey: String): Map[String, String] = { +val prefix = prefixKey + '.' +properties.filterKeys(_.startsWith(prefix)).toSeq.map{ case (k, v) => + k.substring(prefix.length) -> v // remove prefix +}.toMap + } + + /** +* Returns a prefix subset of properties. +* This method is intended for Java code. +*/ + def getPrefixMap(prefixKey: String): JMap[String, String] = getPrefix(prefixKey).asJava --- End diff -- I find the different names for methods that do the same confusing. I'd just remove the Scala methods. ---
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170256073 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala --- @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { object RowtimeValidator { val ROWTIME = "rowtime" - - // per rowtime properties - - val ROWTIME_VERSION = "version" - val TIMESTAMPS_TYPE = "timestamps.type" - val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" - val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" - val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" - val TIMESTAMPS_FROM = "timestamps.from" - val TIMESTAMPS_CLASS = "timestamps.class" - val TIMESTAMPS_SERIALIZED = "timestamps.serialized" - - val WATERMARKS_TYPE = "watermarks.type" - val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" - val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" - val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" - val WATERMARKS_TYPE_VALUE_CUSTOM = "custom" - val WATERMARKS_CLASS = "watermarks.class" - val WATERMARKS_SERIALIZED = "watermarks.serialized" - val WATERMARKS_DELAY = "watermarks.delay" + val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from" + val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class" --- End diff -- `rowtime.timestamps.custom.class` ---
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170272030 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala --- @@ -67,14 +92,188 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal object SchemaValidator { val SCHEMA = "schema" - val SCHEMA_VERSION = "schema.version" + val SCHEMA_PROPERTY_VERSION = "schema.property-version" + val SCHEMA_FIELDS = "schema.fields" + val SCHEMA_FIELDS_NAME = "name" + val SCHEMA_FIELDS_TYPE = "type" + val SCHEMA_FIELDS_PROCTIME = "proctime" + val SCHEMA_FIELDS_FROM = "from" + val SCHEMA_DERIVE_FIELDS = "schema.derive-fields" + val SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY = "alphabetically" + val SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY = "sequentially" + + // utilities + + /** +* Derives a schema from properties and source. +*/ + def deriveSchema( + properties: DescriptorProperties, + sourceSchema: Option[TableSchema]) +: TableSchema = { + +val builder = TableSchema.builder() + +val schema = properties.getTableSchema(SCHEMA_FIELDS) + +val derivationMode = properties.getString(SCHEMA_DERIVE_FIELDS) + +val sourceNamesAndTypes = derivationMode match { + case Some(SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY) if sourceSchema.isDefined => +// sort by name +sourceSchema.get.getColumnNames + .zip(sourceSchema.get.getTypes) + .sortBy(_._1) + + case Some(SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY) if sourceSchema.isDefined => +sourceSchema.get.getColumnNames.zip(sourceSchema.get.getTypes) + + case Some(_) => +throw new ValidationException("Derivation of fields is not supported from this source.") + + case None => +Array[(String, TypeInformation[_])]() +} + +// add source fields +sourceNamesAndTypes.foreach { case (n, t) => + builder.field(n, t) +} + +// add schema fields +schema.foreach { ts => + val schemaNamesAndTypes = ts.getColumnNames.zip(ts.getTypes) + schemaNamesAndTypes.foreach { case (n, t) => + // do not allow overwriting + if (sourceNamesAndTypes.exists(_._1 == n)) { +throw new ValidationException( + "Specified schema fields must not overwrite fields derived from the source.") + } + builder.field(n, t) + } +} + +builder.build() + } + + /** +* Derives a schema from properties and source. +* This method is intended for Java code. +*/ + def deriveSchema( + properties: DescriptorProperties, + sourceSchema: Optional[TableSchema]) +: TableSchema = { +deriveSchema( + properties, + Option(sourceSchema.orElse(null))) + } + + /** +* Finds the proctime attribute if defined. +*/ + def deriveProctimeAttribute(properties: DescriptorProperties): Option[String] = { +val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME) + +for (i <- 0 until names.size) { + val isProctime = properties.getBoolean(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_PROCTIME") + isProctime.foreach { isSet => +if (isSet) { + return names.get(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME") +} + } +} +None + } + + /** +* Finds the proctime attribute if defined. +* This method is intended for Java code. +*/ + def deriveProctimeOptional(properties: DescriptorProperties): Optional[String] = { +Optional.ofNullable(deriveProctimeAttribute(properties).orNull) + } + + /** +* Finds the rowtime attributes if defined. 
+*/ + def deriveRowtimeAttributes(properties: DescriptorProperties) +: util.List[RowtimeAttributeDescriptor] = { + +val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME) + +var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]() + +// check for rowtime in every field +for (i <- 0 until names.size) { + RowtimeValidator +.getRowtimeComponents(properties, s"$SCHEMA_FIELDS.$i.") +.foreach { case (extractor, strategy) => + // create descriptor + attributes += new RowtimeAttributeDescriptor( + properties.getString(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME").get, +extractor, +strategy) +} +} + +
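As a toy illustration of the two derivation modes appearing in the diff above ("alphabetically" vs. "sequentially"), assuming a simplified field representation invented for this sketch:
{code:java}
import java.util.Arrays;
import java.util.Comparator;

// "alphabetically" sorts the source fields by name; "sequentially" keeps the
// source order. The Field type is an assumption, not Flink's TableSchema.
class DeriveFieldsSketch {
    static class Field {
        final String name;
        final String type;
        Field(String name, String type) { this.name = name; this.type = type; }
        @Override public String toString() { return name + ": " + type; }
    }

    static Field[] derive(Field[] sourceFields, String mode) {
        Field[] result = sourceFields.clone();
        if ("alphabetically".equals(mode)) {
            Arrays.sort(result, Comparator.comparing((Field f) -> f.name));
        }
        // "sequentially" keeps the original source order
        return result;
    }

    public static void main(String[] args) {
        Field[] source = {new Field("b", "LONG"), new Field("a", "INT")};
        System.out.println(Arrays.toString(derive(source, "alphabetically"))); // [a: INT, b: LONG]
        System.out.println(Arrays.toString(derive(source, "sequentially")));   // [b: LONG, a: INT]
    }
}
{code}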
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170297941 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala --- @@ -18,37 +18,85 @@ package org.apache.flink.table.descriptors +import org.apache.flink.util.Preconditions import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ abstract class DescriptorTestBase { /** -* Returns a valid descriptor. +* Returns a set of valid descriptors. +* This method is implemented in both Scala and Java. +*/ + def descriptors(): java.util.List[Descriptor] + + /** +* Returns a set of properties for each valid descriptor. +* This code is implemented in both Scala and Java. */ - def descriptor(): Descriptor + def properties(): java.util.List[java.util.Map[String, String]] /** -* Returns a validator that can validate this descriptor. +* Returns a validator that can validate all valid descriptors. */ def validator(): DescriptorValidator - def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = { + @Test + def testValidation(): Unit = { +val d = descriptors().asScala +val p = properties().asScala + +Preconditions.checkArgument(d.length == p.length) + +d.zip(p).foreach { case (desc, props) => + verifyProperties(desc, props.asScala.toMap) +} + } + + def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = { val normProps = new DescriptorProperties descriptor.addProperties(normProps) -assertEquals(expected.toMap, normProps.asMap) +assertEquals(expected, normProps.asScalaMap) } - def verifyInvalidProperty(property: String, invalidValue: String): Unit = { + def verifyInvalidProperty( + descriptor: Descriptor, + property: String, + invalidValue: String): Unit = { val properties = new DescriptorProperties -descriptor().addProperties(properties) +descriptor.addProperties(properties) properties.unsafePut(property, invalidValue) validator().validate(properties) } - def verifyMissingProperty(removeProperty: String): Unit = { + def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = { --- End diff -- rename to `removePropertyAndVerify()` ---
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170229924 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala --- @@ -89,37 +105,58 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { put(key, clazz.getName) } + /** +* Adds a string under the given key. +*/ def putString(key: String, str: String): Unit = { checkNotNull(key) checkNotNull(str) put(key, str) } + /** +* Adds a boolean under the given key. +*/ def putBoolean(key: String, b: Boolean): Unit = { checkNotNull(key) put(key, b.toString) } + /** +* Adds a long under the given key. +*/ def putLong(key: String, l: Long): Unit = { checkNotNull(key) put(key, l.toString) } + /** +* Adds an integer under the given key. +*/ def putInt(key: String, i: Int): Unit = { checkNotNull(key) put(key, i.toString) } + /** +* Adds a character under the given key. +*/ def putCharacter(key: String, c: Character): Unit = { checkNotNull(key) checkNotNull(c) put(key, c.toString) } + /** +* Adds a table schema under the given key. +*/ def putTableSchema(key: String, schema: TableSchema): Unit = { putTableSchema(key, normalizeTableSchema(schema)) } + /** +* Adds a table schema under the given key. +*/ def putTableSchema(key: String, nameAndType: Seq[(String, String)]): Unit = { --- End diff -- Remove? ---
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170256006 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala --- @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { object RowtimeValidator { val ROWTIME = "rowtime" - - // per rowtime properties - - val ROWTIME_VERSION = "version" - val TIMESTAMPS_TYPE = "timestamps.type" - val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" - val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" - val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" - val TIMESTAMPS_FROM = "timestamps.from" - val TIMESTAMPS_CLASS = "timestamps.class" - val TIMESTAMPS_SERIALIZED = "timestamps.serialized" - - val WATERMARKS_TYPE = "watermarks.type" - val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" - val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" - val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" - val WATERMARKS_TYPE_VALUE_CUSTOM = "custom" - val WATERMARKS_CLASS = "watermarks.class" - val WATERMARKS_SERIALIZED = "watermarks.serialized" - val WATERMARKS_DELAY = "watermarks.delay" + val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from" --- End diff -- `rowtime.timestamps.from.field` ---
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170256141 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala --- @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { object RowtimeValidator { val ROWTIME = "rowtime" - - // per rowtime properties - - val ROWTIME_VERSION = "version" - val TIMESTAMPS_TYPE = "timestamps.type" - val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" - val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" - val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" - val TIMESTAMPS_FROM = "timestamps.from" - val TIMESTAMPS_CLASS = "timestamps.class" - val TIMESTAMPS_SERIALIZED = "timestamps.serialized" - - val WATERMARKS_TYPE = "watermarks.type" - val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" - val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" - val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" - val WATERMARKS_TYPE_VALUE_CUSTOM = "custom" - val WATERMARKS_CLASS = "watermarks.class" - val WATERMARKS_SERIALIZED = "watermarks.serialized" - val WATERMARKS_DELAY = "watermarks.delay" + val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from" + val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class" + val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized" --- End diff -- `rowtime.timestamps.custom.serialized` ---
[jira] [Updated] (FLINK-8737) Creating a union of UnionGate instances will fail with UnsupportedOperationException when retrieving buffers
[ https://issues.apache.org/jira/browse/FLINK-8737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-8737: --- Priority: Blocker (was: Major) > Creating a union of UnionGate instances will fail with > UnsupportedOperationException when retrieving buffers > > > Key: FLINK-8737 > URL: https://issues.apache.org/jira/browse/FLINK-8737 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Priority: Blocker > Fix For: 1.5.0 > > > FLINK-8589 introduced a new polling method but did not implement > {{UnionInputGate#pollNextBufferOrEvent()}}. This prevents UnionGate instances > from containing a UnionGate instance, which is explicitly allowed by its > documentation and interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
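An illustrative model of the nesting the issue describes; these types are sketches, not Flink's InputGate/UnionInputGate:
{code:java}
import java.util.List;
import java.util.Optional;

// A union over input gates must itself implement polling so that a union of
// unions works, instead of throwing UnsupportedOperationException.
interface Gate<T> {
    Optional<T> pollNext(); // non-blocking poll for the next buffer or event
}

class UnionGate<T> implements Gate<T> {
    private final List<Gate<T>> inputs;

    UnionGate(List<Gate<T>> inputs) {
        this.inputs = inputs;
    }

    @Override
    public Optional<T> pollNext() {
        // delegate the poll to the nested gates, so this gate can itself be
        // safely nested inside another UnionGate
        for (Gate<T> gate : inputs) {
            Optional<T> next = gate.pollNext();
            if (next.isPresent()) {
                return next;
            }
        }
        return Optional.empty();
    }
}
{code}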
[jira] [Updated] (FLINK-8736) Memory segment offsets for slices of slices are wrong
[ https://issues.apache.org/jira/browse/FLINK-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-8736: --- Priority: Critical (was: Major) > Memory segment offsets for slices of slices are wrong > - > > Key: FLINK-8736 > URL: https://issues.apache.org/jira/browse/FLINK-8736 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical > Fix For: 1.5.0 > > > FLINK-8588 introduced memory segment offsets but the offsets of slices of > slices do not account for their parent's slice offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
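A toy model of the offset bug described here: a slice taken from a slice must add its parent's offset. The class is illustrative, not Flink's MemorySegment:
{code:java}
// Slices carry an absolute offset into the shared memory; a child slice's
// offset must include its parent's offset.
class SegmentSlice {
    final byte[] memory;
    final int offset;
    final int length;

    SegmentSlice(byte[] memory, int offset, int length) {
        this.memory = memory;
        this.offset = offset;
        this.length = length;
    }

    SegmentSlice slice(int index, int len) {
        // correct: the child's absolute offset includes this slice's own offset;
        // the reported bug corresponds to passing `index` alone here
        return new SegmentSlice(memory, this.offset + index, len);
    }

    public static void main(String[] args) {
        SegmentSlice parent = new SegmentSlice(new byte[100], 10, 50);
        SegmentSlice child = parent.slice(5, 10);
        System.out.println(child.offset); // 15 -- not 5, which the bug would yield
    }
}
{code}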
[jira] [Updated] (FLINK-8755) SpilledSubpartitionView wrongly relies on the backlog for determining whether more data is available
[ https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-8755: --- Priority: Blocker (was: Major) > SpilledSubpartitionView wrongly relies on the backlog for determining whether > more data is available > --- > > Key: FLINK-8755 > URL: https://issues.apache.org/jira/browse/FLINK-8755 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0 > > > {code} > public BufferAndBacklog getNextBuffer() throws IOException, > InterruptedException { > //... > int newBacklog = parent.decreaseBuffersInBacklog(current); > return new BufferAndBacklog(current, newBacklog > 0, newBacklog, > nextBufferIsEvent); > {code} > relies on the backlog to signal further data availability. However, if there > are only events left in the buffer queue, their buffers are not included in > the backlog counting and therefore, {{isMoreAvailable}} will be wrongly > {{false}} here. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
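A hedged sketch of the check the report implies: since the backlog only counts buffers, availability must also consider a queued event at the head. Names are illustrative, not Flink's SpilledSubpartitionView:
{code:java}
class AvailabilitySketch {
    static boolean isMoreAvailable(int buffersInBacklog, boolean nextBufferIsEvent) {
        // an event left in the queue still needs to be consumed, even when
        // the buffer backlog has already dropped to zero
        return buffersInBacklog > 0 || nextBufferIsEvent;
    }
}
{code}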
[jira] [Commented] (FLINK-8751) Canceling a job results in an InterruptedException in the TM
[ https://issues.apache.org/jira/browse/FLINK-8751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374551#comment-16374551 ] Stephan Ewen commented on FLINK-8751: - Yes, that makes sense. The reason here is that when Flink cancels the user code, it interrupts the task thread (to pull the code out of blocking operations). When the thread has the interrupted flag set, any subsequent wait that is part of the graceful shutdown immediately throws an InterruptedException. I think the right fix is to clear the interrupted flag at the beginning of the graceful shutdown code (the beginning of the finally block). > Canceling a job results in an InterruptedException in the TM > --- > > Key: FLINK-8751 > URL: https://issues.apache.org/jira/browse/FLINK-8751 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.4.1 >Reporter: Elias Levy >Priority: Major > > Canceling a job results in the following exception reported by the TM: > {code:java} > ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could > not shut down timer service java.lang.InterruptedException > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown > Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown > Source) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source){code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
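A minimal sketch of that fix, assuming a plain executor in place of Flink's SystemProcessingTimeService:
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Clear the thread's interrupt status at the start of the graceful shutdown
// so the await is not aborted immediately by a stale flag.
class GracefulShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService timerService = Executors.newSingleThreadExecutor();
        Thread.currentThread().interrupt(); // simulate a cancelled task thread
        try {
            // ... task body would run here ...
        } finally {
            Thread.interrupted(); // clears the flag and discards the old status
            timerService.shutdown();
            // without the line above, this wait would throw InterruptedException
            timerService.awaitTermination(1, TimeUnit.SECONDS);
        }
    }
}
{code}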
[jira] [Commented] (FLINK-8751) Canceling a job results in an InterruptedException in the TM
[ https://issues.apache.org/jira/browse/FLINK-8751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374525#comment-16374525 ] Elias Levy commented on FLINK-8751: --- Apologies. I meant TM. It was a long day. I've amended the issue. > Canceling a job results in an InterruptedException in the TM > --- > > Key: FLINK-8751 > URL: https://issues.apache.org/jira/browse/FLINK-8751 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.4.1 >Reporter: Elias Levy >Priority: Major > > Canceling a job results in the following exception reported by the TM: > {code:java} > ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could > not shut down timer service java.lang.InterruptedException > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown > Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown > Source) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source){code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8751) Canceling a job results in an InterruptedException in the TM
[ https://issues.apache.org/jira/browse/FLINK-8751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy updated FLINK-8751: -- Description: Canceling a job results in the following exception reported by the TM: {code:java} ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Unknown Source){code} was: Canceling a job results in the following exception reported by the JM: {code:java} ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Unknown Source){code} Component/s: (was: JobManager) TaskManager Summary: Canceling a job results in an InterruptedException in the TM (was: Canceling a job results in an InterruptedException in the JM) > Canceling a job results in an InterruptedException in the TM > --- > > Key: FLINK-8751 > URL: https://issues.apache.org/jira/browse/FLINK-8751 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.4.1 >Reporter: Elias Levy >Priority: Major > > Canceling a job results in the following exception reported by the TM: > {code:java} > ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could > not shut down timer service java.lang.InterruptedException > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown > Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown > Source) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source){code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8760) Correctly set `moreAvailable` flag in SingleInputGate and UnionInputGate
[ https://issues.apache.org/jira/browse/FLINK-8760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-8760: -- Priority: Blocker (was: Major) > Correctly set `moreAvailable` flag in SingleInputGate and UnionInputGate > > > Key: FLINK-8760 > URL: https://issues.apache.org/jira/browse/FLINK-8760 > Project: Flink > Issue Type: Sub-task >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8694) Make notifyDataAvailable call reliable
[ https://issues.apache.org/jira/browse/FLINK-8694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-8694: -- Priority: Blocker (was: Major) > Make notifyDataAvailable call reliable > -- > > Key: FLINK-8694 > URL: https://issues.apache.org/jira/browse/FLINK-8694 > Project: Flink > Issue Type: Sub-task >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > > After FLINK-8591, org.apache.flink.runtime.io.network.netty.SequenceNumberingViewReader#notifyDataAvailable (and the same for credit-based flow control) can sometimes be ignored due to a race condition. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8694) Make notifyDataAvailable call reliable
[ https://issues.apache.org/jira/browse/FLINK-8694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374496#comment-16374496 ] ASF GitHub Bot commented on FLINK-8694: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/5572 [FLINK-8694][runtime] Fix notifyDataAvailable race condition This fixes two bugs in the network stack: https://issues.apache.org/jira/browse/FLINK-8760 https://issues.apache.org/jira/browse/FLINK-8694 ## Brief change log Please check the individual commit messages. ## Verifying this change This PR adds new tests covering the previously buggy cases. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink f8694-proper-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5572.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5572 commit 388d16118763dddff7d4c3593572169ad3e65c0d Author: Piotr Nowojski Date: 2018-02-23T10:37:37Z [hotfix][tests] Deduplicate code in SingleInputGateTest commit e22a44b24ab1e9f02c236440f899a1f4dfdfc873 Author: Piotr Nowojski Date: 2018-02-23T11:11:14Z [hotfix][runtime] Remove duplicated check commit 5c16e565c4a7f0ffdaec888696d98e3c2c221d99 Author: Piotr Nowojski Date: 2018-02-23T10:20:21Z [FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate Previously, if the SingleInputGate was re-enqueuing an input channel, isMoreAvailable might incorrectly return false. This might have caused some deadlocks. commit a451006fd2e38e478ef745fd9de0ebc5fb2fd5c2 Author: Piotr Nowojski Date: 2018-02-23T10:27:54Z [hotfix][tests] Do not hide the original exception in SuccessAfterNetworkBuffersFailureITCase commit e70cd04424f0f92b9d5127e7c4a351d3823d20bd Author: Piotr Nowojski Date: 2018-02-23T10:28:20Z [FLINK-8694][runtime] Fix notifyDataAvailable race condition Before, there was a race condition that might have resulted in ignoring some notifyDataAvailable calls. This fixes the problem by moving buffersAvailable handling to Subpartitions and adds a stress test for flushAlways (without this fix, this test deadlocks). > Make notifyDataAvailable call reliable > -- > > Key: FLINK-8694 > URL: https://issues.apache.org/jira/browse/FLINK-8694 > Project: Flink > Issue Type: Sub-task >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > > After FLINK-8591, org.apache.flink.runtime.io.network.netty.SequenceNumberingViewReader#notifyDataAvailable (and the same for credit-based flow control) can sometimes be ignored due to a race condition. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8759) Bump Netty to 4.0.56
[ https://issues.apache.org/jira/browse/FLINK-8759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-8759:
---
Affects Version/s: 1.5.0

> Bump Netty to 4.0.56
> --
>
> Key: FLINK-8759
> URL: https://issues.apache.org/jira/browse/FLINK-8759
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.5.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
> Fix For: 1.5.0
>
> Due to a bug in Netty's shutdown sequence and overall improvements in Netty, I'd like to bump the version (and stay within the 4.0 series for now). The problem we faced in the past should not be relevant for credit-based flow control anymore, and it can be worked around (for the fallback code path) by restoring {{LengthFieldBasedFrameDecoder}}'s old behaviour of copying contents to new buffers instead of slicing the existing one (please refer to FLINK-7428 for the inverse direction).

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8759) Bump Netty to 4.0.56
[ https://issues.apache.org/jira/browse/FLINK-8759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-8759:
---
Priority: Blocker (was: Major)

> Bump Netty to 4.0.56
> --
>
> Key: FLINK-8759
> URL: https://issues.apache.org/jira/browse/FLINK-8759
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.5.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Blocker
> Fix For: 1.5.0
>
> Due to a bug in Netty's shutdown sequence and overall improvements in Netty, I'd like to bump the version (and stay within the 4.0 series for now). The problem we faced in the past should not be relevant for credit-based flow control anymore, and it can be worked around (for the fallback code path) by restoring {{LengthFieldBasedFrameDecoder}}'s old behaviour of copying contents to new buffers instead of slicing the existing one (please refer to FLINK-7428 for the inverse direction).

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
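For context on the workaround described above, the difference between the two extraction strategies in `LengthFieldBasedFrameDecoder` can be sketched as follows (illustrative only; the helper methods are invented, not Netty API):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

final class FrameExtractionSketch {

    // Netty >= 4.0.28 behaviour: a retained slice shares the underlying
    // memory, so even a small frame can keep a much larger cumulation
    // buffer alive -- the interaction that led to OutOfMemory errors.
    static ByteBuf sliceFrame(ByteBuf cumulation, int index, int length) {
        return cumulation.slice(index, length).retain();
    }

    // Netty <= 4.0.27 behaviour: copying gives the frame an independent
    // lifetime, at the cost of one allocation and one copy per frame.
    static ByteBuf copyFrame(ByteBufAllocator alloc, ByteBuf cumulation, int index, int length) {
        ByteBuf frame = alloc.buffer(length);
        frame.writeBytes(cumulation, index, length);
        return frame;
    }
}
```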
[jira] [Commented] (FLINK-8759) Bump Netty to 4.0.56
[ https://issues.apache.org/jira/browse/FLINK-8759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374480#comment-16374480 ]

ASF GitHub Bot commented on FLINK-8759:
---

GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5571

[FLINK-8759][network] preparations for the update of netty to version 4.0.56

## What is the purpose of the change

Based on the changes from #5570, this PR prepares a Netty version bump by circumventing a bug we had with the change from Netty 4.0.27 to 4.0.28 on the old non-credit-based flow control paths: versions >= 4.0.28 contain an improvement by Netty which slices a Netty buffer instead of doing a memory copy (https://github.com/netty/netty/issues/3704) in the `LengthFieldBasedFrameDecoder`. In some situations, this interacts badly with our Netty pipeline, leading to `OutOfMemory` errors. With credit-based flow control this problem should not exist anymore, which is why we can use the original Netty code there.

## Brief change log

- override `LengthFieldBasedFrameDecoder#extractFrame()` and implement two different code paths depending on whether credit-based flow control is enabled or not (a sketch of this follows below)

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no** (only per buffer)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **JavaDocs**

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-8759

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5571.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5571

commit 272899e01c2836a2a5b47958db7d9d9f6cbf471d
Author: Nico Kruber
Date: 2018-02-23T12:56:29Z

    [FLINK-8768][network] Let NettyMessageDecoder inherit from LengthFieldBasedFrameDecoder

    This removes one step from the pipeline, which not only reduces overhead there but also allows us to override the #extractFrame() method to restore the old Netty 4.0.27 behaviour for the non-credit-based code paths, which had a bug with Netty >= 4.0.28 (see FLINK-8759).

commit 11f673b415bce310cc2195fb6778051c095083fa
Author: Nico Kruber
Date: 2018-02-23T13:06:00Z

    [FLINK-8759][network] preparations for the update of netty to version 4.0.56

> Bump Netty to 4.0.56
> --
>
> Key: FLINK-8759
> URL: https://issues.apache.org/jira/browse/FLINK-8759
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
> Fix For: 1.5.0
>
> Due to a bug in Netty's shutdown sequence and overall improvements in Netty, I'd like to bump the version (and stay within the 4.0 series for now). The problem we faced in the past should not be relevant for credit-based flow control anymore, and it can be worked around (for the fallback code path) by restoring {{LengthFieldBasedFrameDecoder}}'s old behaviour of copying contents to new buffers instead of slicing the existing one (please refer to FLINK-7428 for the inverse direction).

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
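The dual code path from the change log could look roughly like this (a hedged sketch: the class name, the `creditBasedEnabled` flag, and the framing parameters are assumptions, not the actual Flink implementation):

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class FlowControlAwareFrameDecoder extends LengthFieldBasedFrameDecoder {

    private final boolean creditBasedEnabled; // assumed configuration flag

    public FlowControlAwareFrameDecoder(boolean creditBasedEnabled) {
        super(Integer.MAX_VALUE, 0, 4); // placeholder framing parameters
        this.creditBasedEnabled = creditBasedEnabled;
    }

    @Override
    protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
        if (creditBasedEnabled) {
            // Credit-based flow control is unaffected by the slicing issue,
            // so keep Netty's cheaper retained-slice behaviour.
            return super.extractFrame(ctx, buffer, index, length);
        }
        // Fallback path: copy into a fresh buffer, as Netty <= 4.0.27 did,
        // to avoid pinning the (possibly much larger) cumulation buffer.
        ByteBuf frame = ctx.alloc().buffer(length);
        frame.writeBytes(buffer, index, length);
        return frame;
    }
}
```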
[jira] [Commented] (FLINK-8725) Separate NFA-state from NFA in CEP library
[ https://issues.apache.org/jira/browse/FLINK-8725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374474#comment-16374474 ]

ASF GitHub Bot commented on FLINK-8725:
---

Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5559

@aljoscha I will have a look today.

> Separate NFA-state from NFA in CEP library
> --
>
> Key: FLINK-8725
> URL: https://issues.apache.org/jira/browse/FLINK-8725
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.4.0, 1.5.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Major
>
> The CEP library currently serialises the static parts of the NFA in the state for each key. This is wasteful, because that part is static, and problematic, because the static part can contain user code in the form of filter functions.
>
> We should only serialise the dynamic state of the NFA (current states, seen elements).

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
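The separation the ticket asks for could be shaped roughly as below (a hedged sketch with invented names; the actual Flink refactoring may differ). The key point is that only the per-key part is serialised into keyed state, so user code in filter conditions stays out of it:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class CepStateSeparationSketch {

    /** Static part: built once from the compiled pattern and shared across
     *  all keys. Holds the transition graph and the user-supplied filter
     *  conditions; never written into keyed state. */
    static final class Nfa<T> {

        NfaState initialState() {
            return new NfaState();
        }

        void process(NfaState state, T event, long timestamp) {
            // advance state.activeStates according to the static transitions
        }
    }

    /** Dynamic part: the only thing serialised per key. */
    static final class NfaState implements Serializable {
        final List<String> activeStates = new ArrayList<>();
        // plus references to the buffered ("seen") elements of partial matches
    }
}
```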
[jira] [Commented] (FLINK-8725) Separate NFA-state from NFA in CEP library
[ https://issues.apache.org/jira/browse/FLINK-8725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374471#comment-16374471 ]

ASF GitHub Bot commented on FLINK-8725:
---

Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5559

R: @kl0u

> Separate NFA-state from NFA in CEP library
> --
>
> Key: FLINK-8725
> URL: https://issues.apache.org/jira/browse/FLINK-8725
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.4.0, 1.5.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Major
>
> The CEP library currently serialises the static parts of the NFA in the state for each key. This is wasteful, because that part is static, and problematic, because the static part can contain user code in the form of filter functions.
>
> We should only serialise the dynamic state of the NFA (current states, seen elements).

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8768) Change NettyMessageDecoder to inherit from LengthFieldBasedFrameDecoder
[ https://issues.apache.org/jira/browse/FLINK-8768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374464#comment-16374464 ]

ASF GitHub Bot commented on FLINK-8768:
---

GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5570

[FLINK-8768][network] Let NettyMessageDecoder inherit from LengthFieldBasedFrameDecoder

## What is the purpose of the change

Instead of being two steps in the channel pipeline, `NettyMessageDecoder` could derive from `LengthFieldBasedFrameDecoder` to reduce overhead and give us more control over the protocol. As a first step, we will use this to override the `#extractFrame()` method to restore the old Netty 4.0.27 behaviour for the non-credit-based code paths, which had a bug with Netty >= 4.0.28 (see FLINK-8759).

## Brief change log

- make `NettyMessageDecoder` inherit from `LengthFieldBasedFrameDecoder` (beware that this changes the decoder from a `MessageToMessageDecoder` to a `ByteToMessageDecoder` with different cleanup invariants!)

## Verifying this change

This change is already covered by existing tests, such as `NettyMessageSerializationTest` and other network tests using the encoding/decoding pipeline.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no** (only per buffer)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? (JavaDocs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-8768

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5570.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5570

commit 272899e01c2836a2a5b47958db7d9d9f6cbf471d
Author: Nico Kruber
Date: 2018-02-23T12:56:29Z

    [FLINK-8768][network] Let NettyMessageDecoder inherit from LengthFieldBasedFrameDecoder

    This removes one step from the pipeline, which not only reduces overhead there but also allows us to override the #extractFrame() method to restore the old Netty 4.0.27 behaviour for the non-credit-based code paths, which had a bug with Netty >= 4.0.28 (see FLINK-8759).

> Change NettyMessageDecoder to inherit from LengthFieldBasedFrameDecoder
> ---
>
> Key: FLINK-8768
> URL: https://issues.apache.org/jira/browse/FLINK-8768
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
> Fix For: 1.5.0
>
> Let {{NettyMessageDecoder}} inherit from {{LengthFieldBasedFrameDecoder}} instead of being an additional step in the pipeline. This not only removes overhead in the pipeline itself but also allows us to override the {{#extractFrame()}} method to restore the old Netty 4.0.27 behaviour for the non-credit-based code paths, which had a bug with Netty >= 4.0.28.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
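As a rough illustration of what inheriting from `LengthFieldBasedFrameDecoder` buys (a hypothetical sketch: the constructor arguments and the message layout are placeholders, not Flink's actual wire format), the framing step and the message decoding can be collapsed into one handler:

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class MessageFrameDecoder extends LengthFieldBasedFrameDecoder {

    public MessageFrameDecoder() {
        // maxFrameLength, lengthFieldOffset, lengthFieldLength,
        // lengthAdjustment, initialBytesToStrip -- all placeholders here.
        super(Integer.MAX_VALUE, 0, 4, -4, 4);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // First let the superclass do the length-based framing ...
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null; // not enough bytes for a complete frame yet
        }
        try {
            // ... then decode the message in the same handler. As the PR
            // warns, ByteToMessageDecoder cleanup invariants apply: this
            // handler now owns the extracted frame and must release it.
            byte msgId = frame.readByte();
            return parse(msgId, frame);
        } finally {
            frame.release();
        }
    }

    private Object parse(byte msgId, ByteBuf body) {
        // placeholder: build the concrete message object from the body
        return "message-" + msgId;
    }
}
```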