[jira] [Closed] (FLINK-9818) Add cluster component command line parser
[ https://issues.apache.org/jira/browse/FLINK-9818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9818. Resolution: Fixed Fixed via ab9bd87e521d19db7c7d783268a3532d2e876a5d > Add cluster component command line parser > - > > Key: FLINK-9818 > URL: https://issues.apache.org/jira/browse/FLINK-9818 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > In order to parse command line options for the cluster components > ({{TaskManagerRunner}}, {{ClusterEntrypoints}}), we should add a > {{CommandLineParser}} which supports the common command line options > ({{--configDir}}, {{--webui-port}} and dynamic properties which can override > configuration values). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
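The FLINK-9818 description above can be illustrated with a minimal sketch. This is not Flink's actual CommandLineParser; all names and the parsing structure are assumptions, shown only to make the supported option shapes (--configDir, --webui-port, and -Dkey=value dynamic properties) concrete:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a cluster-component command line parser supporting
// --configDir, --webui-port and -Dkey=value dynamic properties. Names and
// structure are assumptions for illustration, not Flink's CommandLineParser.
public class CommandLineParserSketch {

    public static Map<String, String> parse(String[] args) {
        Map<String, String> options = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.equals("--configDir") || arg.equals("--webui-port")) {
                // option whose value is the following argument
                options.put(arg.substring(2), args[++i]);
            } else if (arg.startsWith("-D")) {
                // dynamic property, e.g. -Drest.port=8081, which may override config values
                String[] kv = arg.substring(2).split("=", 2);
                options.put(kv[0], kv.length > 1 ? kv[1] : "");
            }
        }
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> opts = parse(new String[] {
                "--configDir", "/opt/flink/conf", "-Drest.port=8081"});
        System.out.println(opts.get("configDir")); // /opt/flink/conf
        System.out.println(opts.get("rest.port")); // 8081
    }
}
```

A production parser would also validate missing option values; this sketch deliberately leaves that out.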
[jira] [Assigned] (FLINK-7087) Implement Flip-6 container entry point
[ https://issues.apache.org/jira/browse/FLINK-7087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-7087: Assignee: Till Rohrmann > Implement Flip-6 container entry point > -- > > Key: FLINK-7087 > URL: https://issues.apache.org/jira/browse/FLINK-7087 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.6.0 > > > In order to support Docker and K8s we have to provide a container entry point. > In a first version, the container entry point could be similar to the > standalone session mode, with the difference that we don't submit a job > to the cluster. The job has to be contained in the container image or at > least be retrievable by the entry point. In that sense the container entry > point acts like a per-job standalone mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-7087) Implement Flip-6 container entry point
[ https://issues.apache.org/jira/browse/FLINK-7087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7087. Resolution: Fixed Fixed via 8f467c1e9727d5a86d38d0b49753c534a1a161da > Implement Flip-6 container entry point > -- > > Key: FLINK-7087 > URL: https://issues.apache.org/jira/browse/FLINK-7087 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.6.0 > > > In order to support Docker and K8s we have to provide a container entry point. > In a first version, the container entry point could be similar to the > standalone session mode, with the difference that we don't submit a job > to the cluster. The job has to be contained in the container image or at > least be retrievable by the entry point. In that sense the container entry > point acts like a per-job standalone mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9488) Create common entry point for master and workers
[ https://issues.apache.org/jira/browse/FLINK-9488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9488. Resolution: Fixed Fixed via 8f467c1e9727d5a86d38d0b49753c534a1a161da > Create common entry point for master and workers > > > Key: FLINK-9488 > URL: https://issues.apache.org/jira/browse/FLINK-9488 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > To make the container setup easier, we should provide a single cluster entry > point which uses leader election to become either the master or a worker > which runs the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
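The leader-election idea in FLINK-9488 can be sketched as follows. This is illustrative only: the AtomicBoolean stands in for a real leader election service (e.g. ZooKeeper), and the class name is made up, not taken from Flink. The point is simply that identical processes can decide their role at startup: the first to win the election becomes the master, all others run as workers.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch of a common entry point that becomes either master or
// worker via leader election. The AtomicBoolean stands in for a real election
// service (e.g. ZooKeeper); all names here are assumptions, not Flink code.
public class CommonEntryPointSketch {

    private final AtomicBoolean leaderSlot;

    public CommonEntryPointSketch(AtomicBoolean leaderSlot) {
        this.leaderSlot = leaderSlot;
    }

    // The first process to win the election runs the master; every other
    // process falls back to starting a TaskManager-style worker.
    public String start() {
        return leaderSlot.compareAndSet(false, true) ? "master" : "worker";
    }

    public static void main(String[] args) {
        AtomicBoolean slot = new AtomicBoolean(false);
        System.out.println(new CommonEntryPointSketch(slot).start()); // master
        System.out.println(new CommonEntryPointSketch(slot).start()); // worker
    }
}
```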
[jira] [Closed] (FLINK-9821) Let dynamic properties overwrite configuration settings in TaskManagerRunner
[ https://issues.apache.org/jira/browse/FLINK-9821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9821. Resolution: Fixed Fixed via 740f2fbf2e65fa988c6a577989ccd8923729be45 > Let dynamic properties overwrite configuration settings in TaskManagerRunner > > > Key: FLINK-9821 > URL: https://issues.apache.org/jira/browse/FLINK-9821 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Similar to FLINK-9820 we should also allow dynamic properties to overwrite > configuration values in the {{TaskManagerRunner}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9819) Create start up scripts for the StandaloneJobClusterEntryPoint
[ https://issues.apache.org/jira/browse/FLINK-9819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9819. Resolution: Fixed Fixed via 5a4bdf2c9fd1693ad3b90dbbd3bcb589ed15c101 > Create start up scripts for the StandaloneJobClusterEntryPoint > -- > > Key: FLINK-9819 > URL: https://issues.apache.org/jira/browse/FLINK-9819 > Project: Flink > Issue Type: New Feature > Components: Startup Shell Scripts >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > In order to start the {{StandaloneJobClusterEntryPoint}} we need start up > scripts in {{flink-dist}}. We should extend the {{flink-daemon.sh}} and the > {{flink-console.sh}} scripts to support this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9820) Let dynamic properties overwrite configuration settings in ClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-9820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9820. Resolution: Fixed Fixed via 2fbbf8ee662647c71581f5cd989226be820fed0f > Let dynamic properties overwrite configuration settings in ClusterEntrypoint > > > Key: FLINK-9820 > URL: https://issues.apache.org/jira/browse/FLINK-9820 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The dynamic properties which are passed to the {{ClusterEntrypoint}} should > overwrite values in the loaded {{Configuration}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
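The overwrite semantics described in FLINK-9820 (and its sibling FLINK-9821) amount to a simple precedence rule, sketched here with hypothetical names rather than Flink's real Configuration API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hedged sketch of the intended precedence: dynamic properties passed at
// launch overwrite values loaded from the configuration file. The names are
// illustrative and do not reflect Flink's actual Configuration API.
public class DynamicPropertiesSketch {

    public static Map<String, String> applyOverrides(Map<String, String> fileConfig,
                                                     Properties dynamicProperties) {
        Map<String, String> merged = new HashMap<>(fileConfig);
        // a -Dkey=value supplied on the command line wins over flink-conf.yaml
        for (String key : dynamicProperties.stringPropertyNames()) {
            merged.put(key, dynamicProperties.getProperty(key));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> file = new HashMap<>();
        file.put("rest.port", "8081");
        Properties dyn = new Properties();
        dyn.setProperty("rest.port", "9091");
        System.out.println(applyOverrides(file, dyn).get("rest.port")); // 9091
    }
}
```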
[jira] [Closed] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image
[ https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9822. Resolution: Fixed Fixed via 56e5381cb7aba01f1d7ecfa11e4be7f505a35baf > Add Dockerfile for StandaloneJobClusterEntryPoint image > --- > > Key: FLINK-9822 > URL: https://issues.apache.org/jira/browse/FLINK-9822 > Project: Flink > Issue Type: New Feature > Components: Docker >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Add a {{Dockerfile}} to create an image which contains the > {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The > entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} > with the added user code jar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9488) Create common entry point for master and workers
[ https://issues.apache.org/jira/browse/FLINK-9488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544050#comment-16544050 ] ASF GitHub Bot commented on FLINK-9488: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6315 > Create common entry point for master and workers > > > Key: FLINK-9488 > URL: https://issues.apache.org/jira/browse/FLINK-9488 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > To make the container setup easier, we should provide a single cluster entry > point which uses leader election to become either the master or a worker > which runs the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9619. -- Resolution: Fixed Fix Version/s: (was: 1.5.2) Fixed via c9ad0a07ef0339ced74057fc17800ca9ab7784c1 > Always close the task manager connection when the container is completed in > YarnResourceManager > --- > > Key: FLINK-9619 > URL: https://issues.apache.org/jira/browse/FLINK-9619 > Project: Flink > Issue Type: Improvement > Components: YARN >Affects Versions: 1.5.1, 1.6.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > We should always eagerly close the connection with the task manager when the > container is completed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544053#comment-16544053 ] ASF GitHub Bot commented on FLINK-9619: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6185 > Always close the task manager connection when the container is completed in > YarnResourceManager > --- > > Key: FLINK-9619 > URL: https://issues.apache.org/jira/browse/FLINK-9619 > Project: Flink > Issue Type: Improvement > Components: YARN >Affects Versions: 1.5.1, 1.6.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > We should always eagerly close the connection with the task manager when the > container is completed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6317: [FLINK-9820] Forward dynamic properties to Flink c...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6317 ---
[jira] [Commented] (FLINK-9821) Let dynamic properties overwrite configuration settings in TaskManagerRunner
[ https://issues.apache.org/jira/browse/FLINK-9821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544052#comment-16544052 ] ASF GitHub Bot commented on FLINK-9821: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6318 > Let dynamic properties overwrite configuration settings in TaskManagerRunner > > > Key: FLINK-9821 > URL: https://issues.apache.org/jira/browse/FLINK-9821 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Similar to FLINK-9820 we should also allow dynamic properties to overwrite > configuration values in the {{TaskManagerRunner}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6331: [FLINK-9701] [state] (follow up) Use StateTtlConfi...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6331 ---
[GitHub] flink pull request #6318: [FLINK-9821] Forward dynamic properties to configu...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6318 ---
[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6319 ---
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544056#comment-16544056 ] ASF GitHub Bot commented on FLINK-9143: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6283 > Restart strategy defined in flink-conf.yaml is ignored > -- > > Key: FLINK-9143 > URL: https://issues.apache.org/jira/browse/FLINK-9143 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.4.2 >Reporter: Alex Smirnov >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: execution_config.png, jobmanager.log, jobmanager.png > > > Restart strategy defined in flink-conf.yaml is disregarded when the user enables > checkpointing. > Steps to reproduce: > 1. Download the Flink distribution (1.4.2), update flink-conf.yaml: > > restart-strategy: none > state.backend: rocksdb > state.backend.fs.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-metadata] > state.backend.rocksdb.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb] > > 2. Create a new Java project as described at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html] > here's the code: > public class FailedJob > { > static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class); > public static void main( String[] args ) throws Exception > { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); > DataStream<String> stream = > env.fromCollection(Arrays.asList("test")); > stream.map(new MapFunction<String, String>(){ > @Override > public String map(String obj) > { throw new NullPointerException("NPE"); } > }); > env.execute("Failed job"); > } > } > > 3. Compile with mvn clean package; submit it to the cluster > > 4. Go to the Job Manager configuration in the WebUI, ensure the settings from > flink-conf.yaml are there (screenshot attached) > > 5. Go to the Job's configuration, see the Execution Configuration section > > *Expected result*: restart strategy as defined in flink-conf.yaml > > *Actual result*: Restart with fixed delay (1 ms), #2147483647 restart > attempts. > > > see attached screenshots and jobmanager log (lines 1 and 31) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
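The FLINK-9143 bug boils down to a precedence question, which the following illustrative sketch (not Flink's actual code; names are made up) makes explicit: an explicitly configured restart strategy, whether set on the job or in flink-conf.yaml, should win over the implicit fixed-delay default that enabling checkpointing installs.

```java
// Illustrative sketch of the precedence the FLINK-9143 fix aims for; this is
// not Flink's actual code. An explicitly configured strategy (job code first,
// then flink-conf.yaml) should win over the implicit fixed-delay default that
// enabling checkpointing otherwise installs.
public class RestartStrategyPrecedence {

    public static String resolve(String jobStrategy,
                                 String clusterStrategy,
                                 boolean checkpointingEnabled) {
        if (jobStrategy != null) {
            return jobStrategy;           // set programmatically on the job
        }
        if (clusterStrategy != null) {
            return clusterStrategy;       // restart-strategy entry in flink-conf.yaml
        }
        // only when nothing is configured does checkpointing imply fixed-delay
        return checkpointingEnabled ? "fixed-delay" : "none";
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, "none", true));  // none
        System.out.println(resolve(null, null, true));    // fixed-delay
    }
}
```

Under this rule, the reporter's setup (restart-strategy: none in flink-conf.yaml plus checkpointing enabled) would resolve to "none" instead of the observed fixed-delay restarts.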
[jira] [Closed] (FLINK-9823) Add Kubernetes deployment files for standalone job cluster
[ https://issues.apache.org/jira/browse/FLINK-9823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9823. Resolution: Fixed Fixed via 387a3bc198cfea016abd92953c7fce28e641cf67 > Add Kubernetes deployment files for standalone job cluster > -- > > Key: FLINK-9823 > URL: https://issues.apache.org/jira/browse/FLINK-9823 > Project: Flink > Issue Type: New Feature > Components: Kubernetes >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Similar to FLINK-9822, it would be helpful for the user to have example > Kubernetes deployment files to start a standalone job cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6314: [FLINK-9818] Add cluster component command line pa...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6314 ---
[jira] [Commented] (FLINK-9818) Add cluster component command line parser
[ https://issues.apache.org/jira/browse/FLINK-9818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544048#comment-16544048 ] ASF GitHub Bot commented on FLINK-9818: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6314 > Add cluster component command line parser > - > > Key: FLINK-9818 > URL: https://issues.apache.org/jira/browse/FLINK-9818 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > In order to parse command line options for the cluster components > ({{TaskManagerRunner}}, {{ClusterEntrypoints}}), we should add a > {{CommandLineParser}} which supports the common command line options > ({{--configDir}}, {{--webui-port}} and dynamic properties which can override > configuration values). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6320 ---
[GitHub] flink pull request #6316: [FLINK-9819] Add startup scripts for standalone jo...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6316 ---
[jira] [Commented] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image
[ https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544055#comment-16544055 ] ASF GitHub Bot commented on FLINK-9822: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6319 > Add Dockerfile for StandaloneJobClusterEntryPoint image > --- > > Key: FLINK-9822 > URL: https://issues.apache.org/jira/browse/FLINK-9822 > Project: Flink > Issue Type: New Feature > Components: Docker >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Add a {{Dockerfile}} to create an image which contains the > {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The > entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} > with the added user code jar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9619: -- Labels: pull-request-available (was: ) > Always close the task manager connection when the container is completed in > YarnResourceManager > --- > > Key: FLINK-9619 > URL: https://issues.apache.org/jira/browse/FLINK-9619 > Project: Flink > Issue Type: Improvement > Components: YARN >Affects Versions: 1.5.1, 1.6.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > We should always eagerly close the connection with the task manager when the > container is completed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6283 ---
[jira] [Commented] (FLINK-9820) Let dynamic properties overwrite configuration settings in ClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-9820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544051#comment-16544051 ] ASF GitHub Bot commented on FLINK-9820: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6317 > Let dynamic properties overwrite configuration settings in ClusterEntrypoint > > > Key: FLINK-9820 > URL: https://issues.apache.org/jira/browse/FLINK-9820 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The dynamic properties which are passed to the {{ClusterEntrypoint}} should > overwrite values in the loaded {{Configuration}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9823) Add Kubernetes deployment files for standalone job cluster
[ https://issues.apache.org/jira/browse/FLINK-9823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544046#comment-16544046 ] ASF GitHub Bot commented on FLINK-9823: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6320 > Add Kubernetes deployment files for standalone job cluster > -- > > Key: FLINK-9823 > URL: https://issues.apache.org/jira/browse/FLINK-9823 > Project: Flink > Issue Type: New Feature > Components: Kubernetes >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Similar to FLINK-9822, it would be helpful for the user to have example > Kubernetes deployment files to start a standalone job cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6185: [FLINK-9619][YARN] Eagerly close the connection wi...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6185 ---
[GitHub] flink pull request #6315: [FLINK-9488] Add container entry point StandaloneJ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6315 ---
[jira] [Commented] (FLINK-9819) Create start up scripts for the StandaloneJobClusterEntryPoint
[ https://issues.apache.org/jira/browse/FLINK-9819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544049#comment-16544049 ] ASF GitHub Bot commented on FLINK-9819: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6316 > Create start up scripts for the StandaloneJobClusterEntryPoint > -- > > Key: FLINK-9819 > URL: https://issues.apache.org/jira/browse/FLINK-9819 > Project: Flink > Issue Type: New Feature > Components: Startup Shell Scripts >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > In order to start the {{StandaloneJobClusterEntryPoint}} we need start up > scripts in {{flink-dist}}. We should extend the {{flink-daemon.sh}} and the > {{flink-console.sh}} scripts to support this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544054#comment-16544054 ] ASF GitHub Bot commented on FLINK-9701: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6331 > Activate TTL in state descriptors > - > > Key: FLINK-9701 > URL: https://issues.apache.org/jira/browse/FLINK-9701 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9701. -- Resolution: Fixed Fix Version/s: 1.6.0 Fixed via 1632681e41cbc1092a6b4d47a58adfffba6af5d4 > Activate TTL in state descriptors > - > > Key: FLINK-9701 > URL: https://issues.apache.org/jira/browse/FLINK-9701 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9143. -- Resolution: Fixed Fix Version/s: 1.6.0 Fixed via 57872d53c4584faace6dc8e4038ad1f2d068a453 > Restart strategy defined in flink-conf.yaml is ignored > -- > > Key: FLINK-9143 > URL: https://issues.apache.org/jira/browse/FLINK-9143 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.4.2 >Reporter: Alex Smirnov >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: execution_config.png, jobmanager.log, jobmanager.png > > > Restart strategy defined in flink-conf.yaml is disregarded when the user enables > checkpointing. > Steps to reproduce: > 1. Download the Flink distribution (1.4.2), update flink-conf.yaml: > > restart-strategy: none > state.backend: rocksdb > state.backend.fs.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-metadata] > state.backend.rocksdb.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb] > > 2. Create a new Java project as described at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html] > here's the code: > public class FailedJob > { > static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class); > public static void main( String[] args ) throws Exception > { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); > DataStream<String> stream = > env.fromCollection(Arrays.asList("test")); > stream.map(new MapFunction<String, String>(){ > @Override > public String map(String obj) > { throw new NullPointerException("NPE"); } > }); > env.execute("Failed job"); > } > } > > 3. Compile with mvn clean package; submit it to the cluster > > 4. Go to the Job Manager configuration in the WebUI, ensure the settings from > flink-conf.yaml are there (screenshot attached) > > 5. Go to the Job's configuration, see the Execution Configuration section > > *Expected result*: restart strategy as defined in flink-conf.yaml > > *Actual result*: Restart with fixed delay (1 ms), #2147483647 restart > attempts. > > > see attached screenshots and jobmanager log (lines 1 and 31) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-9143: Assignee: Dawid Wysakowicz (was: yuqi) > Restart strategy defined in flink-conf.yaml is ignored > -- > > Key: FLINK-9143 > URL: https://issues.apache.org/jira/browse/FLINK-9143 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.4.2 >Reporter: Alex Smirnov >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: execution_config.png, jobmanager.log, jobmanager.png > > > Restart strategy defined in flink-conf.yaml is disregarded when the user enables > checkpointing. > Steps to reproduce: > 1. Download the Flink distribution (1.4.2), update flink-conf.yaml: > > restart-strategy: none > state.backend: rocksdb > state.backend.fs.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-metadata] > state.backend.rocksdb.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb] > > 2. Create a new Java project as described at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html] > here's the code: > public class FailedJob > { > static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class); > public static void main( String[] args ) throws Exception > { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); > DataStream<String> stream = > env.fromCollection(Arrays.asList("test")); > stream.map(new MapFunction<String, String>(){ > @Override > public String map(String obj) > { throw new NullPointerException("NPE"); } > }); > env.execute("Failed job"); > } > } > > 3. Compile with mvn clean package; submit it to the cluster > > 4. Go to the Job Manager configuration in the WebUI, ensure the settings from > flink-conf.yaml are there (screenshot attached) > > 5. Go to the Job's configuration, see the Execution Configuration section > > *Expected result*: restart strategy as defined in flink-conf.yaml > > *Actual result*: Restart with fixed delay (1 ms), #2147483647 restart > attempts. > > > see attached screenshots and jobmanager log (lines 1 and 31) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-9701: -- > Activate TTL in state descriptors > - > > Key: FLINK-9701 > URL: https://issues.apache.org/jira/browse/FLINK-9701 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544040#comment-16544040 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202507057 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala --- @@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan( override def deriveRowType(): RelDataType = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match { - case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match { -case Some(_: StreamTableSourceTable[_]) => true -case Some(_: BatchTableSourceTable[_]) => false -case _ => throw TableException(s"Unknown Table type ${t.getClass}.") - } - case t => throw TableException(s"Unknown Table type ${t.getClass}.") + case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true + // null --- End diff -- This information is useful. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9762) CoreOptions.TMP_DIRS wrongly managed on Yarn
[ https://issues.apache.org/jira/browse/FLINK-9762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9762: --- Assignee: (was: vinoyang) > CoreOptions.TMP_DIRS wrongly managed on Yarn > > > Key: FLINK-9762 > URL: https://issues.apache.org/jira/browse/FLINK-9762 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Oleksandr Nitavskyi >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2, 1.6.0 > > > The issue on Yarn is that it is impossible to have different LOCAL_DIRS for > the JobManager and the TaskManager, even though the LOCAL_DIRS value depends > on the container. CoreOptions.TMP_DIRS is set to the default value during > JobManager initialization and added to the configuration object. When the > TaskManager is launched, that configuration object is cloned, carrying a > LOCAL_DIRS value which makes sense only for the JobManager container. From > the TaskManager container's point of view, CoreOptions.TMP_DIRS is therefore > always equal either to the path in flink.yml or to the LOCAL_DIRS of the > JobManager (default behaviour). If the TaskManager's container does not have > access to those folders, the TaskManager cannot be started. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506906 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass: Class[T], + propertyMap: JMap[String, String], + classLoader: ClassLoader) +: T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) +
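The generic `find[T](factoryClass, ...)` signature introduced in this diff replaces the old untyped `find(clz: Class[_], ...)`, which returned a raw `TableFactory` that callers had to cast. A minimal Java sketch of the same pattern, with illustrative names rather than the real `TableFactoryService`:

```java
import java.util.Collections;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;

// Sketch of a typed service lookup: filter loaded services by the requested
// class so the caller gets a statically typed result. Names are illustrative.
public class TypedFactoryLookup {

    interface TableFactory {
        // Properties that must be present for this factory to match.
        Map<String, String> requiredContext();
    }

    static class EchoFactory implements TableFactory {
        public Map<String, String> requiredContext() {
            return Collections.singletonMap("connector.type", "echo");
        }
    }

    // Generic find: null-checks mirror the Preconditions.checkNotNull calls
    // in the diff; the Class token makes the unchecked cast disappear.
    static <T> T find(Class<T> factoryClass,
                      Iterable<? extends TableFactory> loaded,
                      Map<String, String> properties) {
        Objects.requireNonNull(factoryClass, "factoryClass");
        Objects.requireNonNull(properties, "properties");
        for (TableFactory factory : loaded) {
            if (factoryClass.isInstance(factory)
                    && properties.entrySet().containsAll(factory.requiredContext().entrySet())) {
                return factoryClass.cast(factory); // safe: isInstance checked above
            }
        }
        throw new NoSuchElementException(
                "No " + factoryClass.getName() + " found for " + properties);
    }

    public static void main(String[] args) {
        EchoFactory echo = new EchoFactory();
        // Returns EchoFactory, not a raw TableFactory needing a cast.
        EchoFactory found = find(EchoFactory.class,
                Collections.singletonList(echo),
                Collections.singletonMap("connector.type", "echo"));
        System.out.println(found == echo);
    }
}
```

Passing the `Class<T>` token is the standard way to recover type safety from a heterogeneous service loader result.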
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544037#comment-16544037 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506811 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { --- End diff -- the class name use singular looks better to me > Support Netty SslEngine based on openSSL > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Since a while now, Netty does not only support the JDK's {{SSLEngine}} but > also implements one based on openSSL which, according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly > faster. We should add support for using that engine instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544033#comment-16544033 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506694 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -249,14 +326,40 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws // Set up key manager factory to use the server key store KeyManagerFactory kmf = KeyManagerFactory.getInstance( - KeyManagerFactory.getDefaultAlgorithm()); + KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, certPassword.toCharArray()); + return new SSLServerTools(sslProvider, sslProtocolVersion, sslCipherSuites, kmf); + } + + return null; + } + + /** +* Creates the SSL Context for the server if SSL is configured. +* +* @param sslConfig +*The application configuration --- End diff -- the description of the param and throws do not need linefeed > Support Netty SslEngine based on openSSL > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Since a while now, Netty does not only support the JDK's {{SSLEngine}} but > also implements one based on openSSL which, according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly > faster. We should add support for using that engine instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544031#comment-16544031 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202505334 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -249,14 +326,40 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws // Set up key manager factory to use the server key store KeyManagerFactory kmf = KeyManagerFactory.getInstance( - KeyManagerFactory.getDefaultAlgorithm()); + KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, certPassword.toCharArray()); + return new SSLServerTools(sslProvider, sslProtocolVersion, sslCipherSuites, kmf); + } + + return null; + } + + /** +* Creates the SSL Context for the server if SSL is configured. +* +* @param sslConfig +*The application configuration +* @return The SSLContext object which can be used by the ssl transport server +* Returns null if SSL is disabled +* @throws Exception +* Thrown if there is any misconfiguration +*/ + @Nullable + public static SSLContext createSSLServerContext(Configuration sslConfig) throws Exception { + --- End diff -- this empty line is useless, can be removed > Support Netty SslEngine based on openSSL > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Since a while now, Netty does not only support the JDK's {{SSLEngine}} but > also implements one based on openSSL which, according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly > faster. We should add support for using that engine instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544030#comment-16544030 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202505321 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java --- @@ -160,6 +160,7 @@ private Configuration createSslConfig() throws Exception { flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); +// flinkConfig.setString(SecurityOptions.SSL_PROVIDER, "OPENSSL"); --- End diff -- if this is a useless dead code, can be removed > Support Netty SslEngine based on openSSL > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Since a while now, Netty does not only support the JDK's {{SSLEngine}} but > also implements one based on openSSL which, according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly > faster. We should add support for using that engine instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544036#comment-16544036 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506868 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { --- End diff -- provide a constructor like `SSLProvider(String provider)` to give the enum's string representation looks better than hard code. > Support Netty SslEngine based on openSSL > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Since a while now, Netty does not only support the JDK's {{SSLEngine}} but > also implements one based on openSSL which, according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly > faster. We should add support for using that engine instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
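The constructor-based variant the reviewer suggests could look like the sketch below. The names mirror the PR under review, but this is an assumed illustration, not the merged Flink code.

```java
// Sketch of the reviewer's suggestion: carry the string form in an enum
// constructor so fromString() does not hard-code a literal per branch.
public class SslProviderDemo {

    enum SSLProvider {
        JDK("JDK"),
        // OpenSSL with fallback to JDK if not available.
        OPENSSL("OPENSSL");

        private final String name;

        SSLProvider(String name) {
            this.name = name;
        }

        static SSLProvider fromString(String value) {
            for (SSLProvider provider : values()) {
                if (provider.name.equalsIgnoreCase(value)) {
                    return provider;
                }
            }
            throw new IllegalArgumentException("Unknown SSL provider: " + value);
        }
    }

    public static void main(String[] args) {
        // Case-insensitive parsing, driven by the per-constant string.
        System.out.println(SSLProvider.fromString("openssl")); // OPENSSL
    }
}
```

Adding a new provider then only requires a new enum constant; the parsing loop stays untouched.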
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544034#comment-16544034 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506769 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; + + public SSLClientTools( + SSLProvider preferredSslProvider, + String sslProtocolVersion, + TrustManagerFactory trustManagerFactory) { + this.preferredSslProvider = preferredSslProvider; + this.sslProtocolVersion = sslProtocolVersion; + this.trustManagerFactory = trustManagerFactory; + } + } + + /** +* Creates necessary helper objects to use for creating an SSL Context for the client if SSL is +* configured. * * @param sslConfig *The application configuration -* @return The SSLContext object which can be used by the ssl transport client -* Returns null if SSL is disabled +* @return The SSLClientTools object which can be used for creating some SSL context object; +* returns null if SSL is disabled. 
* @throws Exception * Thrown if there is any misconfiguration */ @Nullable - public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception { - + public static SSLClientTools createSSLClientTools(Configuration sslConfig) throws Exception { Preconditions.checkNotNull(sslConfig); - SSLContext clientSSLContext = null; if (getSSLEnabled(sslConfig)) { LOG.debug("Creating client SSL context from configuration"); String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); + SSLProvider sslProvider = SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER)); Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - FileInputStream trustStoreFile = null; - try { - trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); + try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) { trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); - } finally { - if (trustStoreFile != null) { - trustStoreFile.close(); - } } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544032#comment-16544032 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506709 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; + + public SSLClientTools( + SSLProvider preferredSslProvider, + String sslProtocolVersion, + TrustManagerFactory trustManagerFactory) { + this.preferredSslProvider = preferredSslProvider; + this.sslProtocolVersion = sslProtocolVersion; + this.trustManagerFactory = trustManagerFactory; + } + } + + /** +* Creates necessary helper objects to use for creating an SSL Context for the client if SSL is +* configured. * * @param sslConfig *The application configuration -* @return The SSLContext object which can be used by the ssl transport client -* Returns null if SSL is disabled +* @return The SSLClientTools object which can be used for creating some SSL context object; +* returns null if SSL is disabled. 
* @throws Exception * Thrown if there is any misconfiguration */ @Nullable - public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception { - + public static SSLClientTools createSSLClientTools(Configuration sslConfig) throws Exception { Preconditions.checkNotNull(sslConfig); - SSLContext clientSSLContext = null; if (getSSLEnabled(sslConfig)) { LOG.debug("Creating client SSL context from configuration"); String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); + SSLProvider sslProvider = SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER)); Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - FileInputStream trustStoreFile = null; - try { - trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); + try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) { trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); - } finally { - if (trustStoreFile != null) { - trustStoreFile.close(); - } } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
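The diff above also replaces a manual `try`/`finally` close of the truststore stream with try-with-resources. A self-contained sketch of that pattern follows; the `null`-path branch is added only so the example runs without a real keystore file, and the path handling is illustrative (Flink reads it from `SecurityOptions`).

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;

// Sketch: loading a truststore with try-with-resources instead of a
// manual try/finally null-check-and-close, as in the PR's cleanup.
public class TruststoreLoading {

    static KeyStore loadTrustStore(String path, char[] password) throws Exception {
        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
        if (path == null) {
            // Initializes an empty keystore; used here so the sketch runs
            // without an actual truststore file on disk.
            trustStore.load(null, null);
            return trustStore;
        }
        // The stream is closed on every exit path, including exceptions,
        // without the explicit finally block the old code needed.
        try (InputStream in = new FileInputStream(path)) {
            trustStore.load(in, password);
        }
        return trustStore;
    }

    public static void main(String[] args) throws Exception {
        KeyStore ks = loadTrustStore(null, null);
        System.out.println(ks.size()); // empty store: 0 entries
    }
}
```

The same shape applies to the keystore side (`SSL_KEYSTORE`); only the password and path differ.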
[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544035#comment-16544035 ] ASF GitHub Bot commented on FLINK-9816: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506820 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; --- End diff -- mark these fields as `private` as provide `getter/setter` looks better to me > Support Netty SslEngine based on openSSL > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Since a while now, Netty does not only support the JDK's {{SSLEngine}} but > also implements one based on openSSL which, according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly > faster. We should add support for using that engine instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506769 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; + + public SSLClientTools( + SSLProvider preferredSslProvider, + String sslProtocolVersion, + TrustManagerFactory trustManagerFactory) { + this.preferredSslProvider = preferredSslProvider; + this.sslProtocolVersion = sslProtocolVersion; + this.trustManagerFactory = trustManagerFactory; + } + } + + /** +* Creates necessary helper objects to use for creating an SSL Context for the client if SSL is +* configured. * * @param sslConfig *The application configuration -* @return The SSLContext object which can be used by the ssl transport client -* Returns null if SSL is disabled +* @return The SSLClientTools object which can be used for creating some SSL context object; +* returns null if SSL is disabled. 
* @throws Exception * Thrown if there is any misconfiguration */ @Nullable - public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception { - + public static SSLClientTools createSSLClientTools(Configuration sslConfig) throws Exception { Preconditions.checkNotNull(sslConfig); - SSLContext clientSSLContext = null; if (getSSLEnabled(sslConfig)) { LOG.debug("Creating client SSL context from configuration"); String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); + SSLProvider sslProvider = SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER)); Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - FileInputStream trustStoreFile = null; - try { - trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); + try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) { trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); - } finally { - if (trustStoreFile != null) { - trustStoreFile.close(); - } } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( TrustManagerFactory.getDefaultAlgorithm()); trustManagerFactory.init(trustStore); - clientSSLContext = SSLContext.getInstance(sslProtocolVersion); -
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506820 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; --- End diff -- mark these fields as `private` as provide `getter/setter` looks better to me ---
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202505334 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -249,14 +326,40 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws // Set up key manager factory to use the server key store KeyManagerFactory kmf = KeyManagerFactory.getInstance( - KeyManagerFactory.getDefaultAlgorithm()); + KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, certPassword.toCharArray()); + return new SSLServerTools(sslProvider, sslProtocolVersion, sslCipherSuites, kmf); + } + + return null; + } + + /** +* Creates the SSL Context for the server if SSL is configured. +* +* @param sslConfig +*The application configuration +* @return The SSLContext object which can be used by the ssl transport server +* Returns null if SSL is disabled +* @throws Exception +* Thrown if there is any misconfiguration +*/ + @Nullable + public static SSLContext createSSLServerContext(Configuration sslConfig) throws Exception { + --- End diff -- this empty line is unnecessary and can be removed ---
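The server-side pattern quoted in the diff above (initialize a `KeyManagerFactory` from the server key store, then build an `SSLContext` from it) can be sketched as follows. Class and method names here are illustrative stand-ins, not Flink's actual `SSLUtils` code, and the main method uses an empty in-memory key store purely to keep the sketch runnable:

```java
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;

public class ServerKeyManagerSketch {

    // Mirrors the diff: key manager factory initialized from the server key
    // store, then an SSLContext built for the configured protocol version.
    static SSLContext buildServerContext(
            KeyStore keyStore, char[] certPassword, String sslProtocolVersion) throws Exception {
        KeyManagerFactory kmf =
                KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, certPassword);
        SSLContext context = SSLContext.getInstance(sslProtocolVersion);
        // Null trust managers / secure random fall back to JDK defaults.
        context.init(kmf.getKeyManagers(), null, null);
        return context;
    }

    public static void main(String[] args) throws Exception {
        // Empty in-memory key store just so the sketch runs without test keys.
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        SSLContext ctx = buildServerContext(ks, new char[0], "TLSv1.2");
        if (!"TLSv1.2".equals(ctx.getProtocol())) {
            throw new AssertionError();
        }
    }
}
```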
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506868 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { --- End diff -- providing a constructor like `SSLProvider(String provider)` that carries the enum's string representation looks better than hard-coding it. ---
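The constructor-based variant the reviewer suggests could look like the sketch below: the enum constructor carries the string form, so `fromString` iterates over `values()` instead of hard-coding each literal. Names here (`SslProviderExample`, `providerName`) are illustrative, not Flink's actual API:

```java
// Sketch of the reviewer's suggestion: let the enum constructor carry the
// string representation instead of hard-coding literals in fromString().
public class SslProviderExample {

    enum SslProvider {
        JDK("JDK"),
        /** OpenSSL with fallback to JDK if not available. */
        OPENSSL("OPENSSL");

        private final String providerName;

        SslProvider(String providerName) {
            this.providerName = providerName;
        }

        static SslProvider fromString(String value) {
            for (SslProvider provider : values()) {
                if (provider.providerName.equalsIgnoreCase(value)) {
                    return provider;
                }
            }
            throw new IllegalArgumentException("Unknown SSL provider: " + value);
        }
    }

    public static void main(String[] args) {
        // Lookup stays case-insensitive, as in the original fromString().
        if (SslProvider.fromString("openssl") != SslProvider.OPENSSL) {
            throw new AssertionError();
        }
    }
}
```

Adding a new provider then only requires a new enum constant; the lookup logic does not change.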
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506811 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { --- End diff -- using a singular class name looks better to me ---
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506709 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -163,80 +163,157 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s } /** -* Creates the SSL Context for the client if SSL is configured. +* SSL engine provider. +*/ + public enum SSLProvider { + JDK, + /** +* OpenSSL with fallback to JDK if not available. +*/ + OPENSSL; + + public static SSLProvider fromString(String value) { + Preconditions.checkNotNull(value); + if (value.equalsIgnoreCase("OPENSSL")) { + return OPENSSL; + } else if (value.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + value); + } + } + } + + /** +* Instances needed to set up an SSL client connection. +*/ + public static class SSLClientTools { + public final SSLProvider preferredSslProvider; + public final String sslProtocolVersion; + public final TrustManagerFactory trustManagerFactory; + + public SSLClientTools( + SSLProvider preferredSslProvider, + String sslProtocolVersion, + TrustManagerFactory trustManagerFactory) { + this.preferredSslProvider = preferredSslProvider; + this.sslProtocolVersion = sslProtocolVersion; + this.trustManagerFactory = trustManagerFactory; + } + } + + /** +* Creates necessary helper objects to use for creating an SSL Context for the client if SSL is +* configured. * * @param sslConfig *The application configuration -* @return The SSLContext object which can be used by the ssl transport client -* Returns null if SSL is disabled +* @return The SSLClientTools object which can be used for creating some SSL context object; +* returns null if SSL is disabled. 
* @throws Exception * Thrown if there is any misconfiguration */ @Nullable - public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception { - + public static SSLClientTools createSSLClientTools(Configuration sslConfig) throws Exception { Preconditions.checkNotNull(sslConfig); - SSLContext clientSSLContext = null; if (getSSLEnabled(sslConfig)) { LOG.debug("Creating client SSL context from configuration"); String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); + SSLProvider sslProvider = SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER)); Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - FileInputStream trustStoreFile = null; - try { - trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); + try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) { trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); - } finally { - if (trustStoreFile != null) { - trustStoreFile.close(); - } } TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( TrustManagerFactory.getDefaultAlgorithm()); trustManagerFactory.init(trustStore); - clientSSLContext = SSLContext.getInstance(sslProtocolVersion); -
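The resource-handling change in the diff above replaces a manual try/finally with try-with-resources, so the trust-store stream is closed even when loading fails. Below is a runnable sketch of that pattern, with illustrative names rather than Flink's actual `SSLUtils` code; the main method writes an empty trust store to a temp file only so the example runs without checked-in test keys:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.security.KeyStore;
import javax.net.ssl.TrustManagerFactory;

public class TrustStoreSketch {

    // The pattern the diff switches to: try-with-resources closes the
    // trust-store stream automatically, replacing the manual try/finally.
    static TrustManagerFactory loadTrustManagerFactory(String trustStorePath, char[] password)
            throws Exception {
        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
        try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStorePath))) {
            trustStore.load(trustStoreFile, password);
        }
        TrustManagerFactory trustManagerFactory =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init(trustStore);
        return trustManagerFactory;
    }

    public static void main(String[] args) throws Exception {
        // Write an empty trust store to a temp file so the sketch is runnable.
        char[] password = "password".toCharArray();
        File storeFile = File.createTempFile("trust", ".store");
        storeFile.deleteOnExit();
        KeyStore empty = KeyStore.getInstance(KeyStore.getDefaultType());
        empty.load(null, password);
        try (FileOutputStream out = new FileOutputStream(storeFile)) {
            empty.store(out, password);
        }
        TrustManagerFactory factory =
                loadTrustManagerFactory(storeFile.getAbsolutePath(), password);
        if (factory.getTrustManagers().length == 0) {
            throw new AssertionError("expected at least one trust manager");
        }
    }
}
```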
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202506694 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -249,14 +326,40 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws // Set up key manager factory to use the server key store KeyManagerFactory kmf = KeyManagerFactory.getInstance( - KeyManagerFactory.getDefaultAlgorithm()); + KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, certPassword.toCharArray()); + return new SSLServerTools(sslProvider, sslProtocolVersion, sslCipherSuites, kmf); + } + + return null; + } + + /** +* Creates the SSL Context for the server if SSL is configured. +* +* @param sslConfig +*The application configuration --- End diff -- the descriptions of the `@param` and `@throws` tags do not need a line break ---
[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6328#discussion_r202505321 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java --- @@ -160,6 +160,7 @@ private Configuration createSslConfig() throws Exception { flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); +// flinkConfig.setString(SecurityOptions.SSL_PROVIDER, "OPENSSL"); --- End diff -- if this is dead code, it can be removed ---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506861 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass: Class[T], + propertyMap: JMap[String, String], + classLoader: ClassLoader) +: T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) +
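The discovery that `TableFactoryService` performs above can be sketched in simplified Java: collect candidate factories (in Flink these come from `ServiceLoader.load(TableFactory.class)` and `META-INF/services` entries) and return the first whose required context is contained in the given properties. The `Factory` interface and the matching rule below are illustrative stand-ins, not Flink's actual API:

```java
import java.util.List;
import java.util.Map;

public class FactoryDiscoverySketch {

    interface Factory {
        // Properties a candidate factory requires, e.g. {"format.type": "json"}.
        Map<String, String> requiredContext();
    }

    static <T extends Factory> T find(Map<String, String> properties, Iterable<T> candidates) {
        for (T factory : candidates) {
            // A factory matches if all of its required entries appear in the properties.
            if (properties.entrySet().containsAll(factory.requiredContext().entrySet())) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No factory found for properties: " + properties);
    }

    public static void main(String[] args) {
        Factory json = () -> Map.of("format.type", "json");
        Factory avro = () -> Map.of("format.type", "avro");
        // In real code the candidate list would come from the service loader.
        Factory found = find(Map.of("format.type", "avro", "format.version", "1"),
                List.of(json, avro));
        if (found != avro) {
            throw new AssertionError();
        }
    }
}
```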
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544024#comment-16544024 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506766 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass: Class[T],
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544023#comment-16544023 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506740 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) --- End diff -- The variable is only passed once, and the internal method does not check for null again. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544019#comment-16544019 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506661 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala --- @@ -143,118 +143,82 @@ case class CatalogAlreadyExistException( } /** - * Exception for not finding a [[TableFormatFactory]] for the - * given properties. + * Exception for not finding a [[TableFactory]] for the given properties. * * @param message message that indicates the current matching step * @param factoryClass required factory class - * @param formatFactories all found factories - * @param properties properties that describe the table format + * @param factories all found factories + * @param properties properties that describe the configuration * @param cause the cause */ -case class NoMatchingTableFormatException( +case class NoMatchingTableFactoryException( message: String, factoryClass: Class[_], - formatFactories: Seq[TableFormatFactory[_]], + factories: Seq[TableFactory], properties: Map[String, String], cause: Throwable) extends RuntimeException( --- End diff -- So far we don't have an inheritance of exceptions. Case classes don't support that in Scala so we would need to convert them. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. 
For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
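The service-loader discovery described in the issue can be sketched with plain Java SPI. Everything below is an illustrative stand-in: `FormatFactory`, its `deserialize` method, and the class names are hypothetical, not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical stand-in for the FormatFactory SPI discussed in the issue;
// the name and method are illustrative, not Flink's actual interface.
interface FormatFactory {
    // A format provides a method for converting byte[] to the target record
    // type; String stands in for the record type here.
    String deserialize(byte[] bytes);
}

public class FormatDiscovery {

    // Discovers all FormatFactory implementations registered under
    // META-INF/services/FormatFactory on the current classpath -- the same
    // mechanism the issue mentions for file system discovery.
    public static List<FormatFactory> discoverFormats() {
        List<FormatFactory> factories = new ArrayList<>();
        for (FormatFactory factory : ServiceLoader.load(FormatFactory.class)) {
            factories.add(factory);
        }
        return factories;
    }

    public static void main(String[] args) {
        // With no providers registered, the loader simply yields nothing.
        System.out.println("Discovered formats: " + discoverFormats().size());
    }
}
```

An implementation becomes discoverable by listing its fully qualified class name in that service file inside its JAR; no code change in the framework is needed to add a new format.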
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544018#comment-16544018 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506625 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala --- @@ -16,42 +16,17 @@ * limitations under the License. */ -package org.apache.flink.table.formats +package org.apache.flink.table.factories import java.util -import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema} - /** - * A factory to create different table format instances. This factory is used with Java's Service - * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized - * properties that describe the desired format. The factory allows for matching to the given set of - * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for - * creating configured instances of format classes accordingly. - * - * Classes that implement this interface need to be added to the - * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in - * the current classpath to be found. + * A factory to create configured table format instances based on string-based properties. See + * also [[TableFactory]] for more information. * * @tparam T record type that the format produces or consumes */ -trait TableFormatFactory[T] { - - /** -* Specifies the context that this factory has been implemented for. The framework guarantees -* to only use the factory if the specified set of properties and values are met. 
-* -* Typical properties might be: -* - format.type -* - format.version -* -* Specified property versions allow the framework to provide backwards compatible properties -* in case of string format changes: -* - format.property-version -* -* An empty context means that the factory matches for all requests. -*/ - def requiredContext(): util.Map[String, String] +trait TableFormatFactory[T] extends TableFactory { --- End diff -- Because the Java comment explains the specific situation how supported format properties are handled. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
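The `requiredContext()` contract described in the removed scaladoc above (match `format.type`, `format.version`, and a backwards-compatibility `format.property-version` against normalized properties) can be sketched as follows. The interface and class names are simplified, hypothetical stand-ins, not Flink's `TableFactory`.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a factory that declares the context it was
// implemented for; not Flink's actual TableFactory interface.
interface MatchableFactory {
    // The framework only uses the factory if every required key/value pair
    // is present in the normalized properties; an empty map matches all.
    Map<String, String> requiredContext();
}

public class ContextMatcher {

    // Returns the first factory whose required context is fully satisfied
    // by the given normalized descriptor properties.
    static Optional<MatchableFactory> find(
            List<MatchableFactory> factories, Map<String, String> properties) {
        return factories.stream()
                .filter(f -> f.requiredContext().entrySet().stream()
                        .allMatch(e -> e.getValue().equals(properties.get(e.getKey()))))
                .findFirst();
    }

    public static void main(String[] args) {
        MatchableFactory json = () -> Map.of("format.type", "json", "format.property-version", "1");
        MatchableFactory avro = () -> Map.of("format.type", "avro", "format.property-version", "1");

        Map<String, String> properties = Map.of(
                "format.type", "avro",
                "format.property-version", "1");

        // Only the avro factory's required context is contained in the properties.
        System.out.println(find(List.of(json, avro), properties).orElse(null) == avro); // true
    }
}
```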
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544017#comment-16544017 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506574 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala --- @@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging { val properties = new DescriptorProperties() externalCatalogTable.addProperties(properties) val javaMap = properties.asMap -val source = TableFactoryService.find(classOf[TableSourceFactory[_]], javaMap) - .asInstanceOf[TableSourceFactory[_]] - .createTableSource(javaMap) tableEnv match { // check for a batch table source in this batch environment case _: BatchTableEnvironment => -source match { - case bts: BatchTableSource[_] => -new TableSourceSinkTable(Some(new BatchTableSourceTable( - bts, - new FlinkStatistic(externalCatalogTable.getTableStats))), None) - case _ => throw new TableException( -s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + - s"in a batch environment.") -} +val source = TableFactoryService --- End diff -- Usually it is very uncommon to define both a batch and streaming source in the same factory. Your proposed change would require all future sources to implement a check for the environment beforehand, which is unnecessary in 80% of the cases. Separating by environment is a concept that can be found throughout the entire `flink-table` module because both sources/sinks behave quite differently. 
> Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
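The batch/streaming separation the reviewer defends — each environment asking only for the factory type it can execute, so individual factories never check the environment — could look roughly like this. All interface and class names here are hypothetical, not Flink's real factory classes.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical marker interfaces mirroring the batch/streaming split the
// reviewer describes; these are not Flink's real factory classes.
interface SourceFactory {}
interface BatchSourceFactory extends SourceFactory {}
interface StreamSourceFactory extends SourceFactory {}

public class EnvironmentLookup {

    // Each environment looks up only the factory type it can execute, so an
    // individual factory never has to inspect the environment itself.
    static <T extends SourceFactory> Optional<T> find(
            Class<T> factoryClass, List<SourceFactory> registered) {
        return registered.stream()
                .filter(factoryClass::isInstance)
                .map(factoryClass::cast)
                .findFirst();
    }

    public static void main(String[] args) {
        List<SourceFactory> registered =
                List.of(new BatchSourceFactory() {}, new StreamSourceFactory() {});

        // A batch environment would ask for batch factories only.
        System.out.println(find(BatchSourceFactory.class, registered).isPresent()); // true
    }
}
```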
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544016#comment-16544016 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506512 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java --- @@ -21,8 +21,12 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08; /** - * Tests for {@link Kafka08JsonTableSourceFactory}. + * Tests for legacy Kafka08JsonTableSourceFactory. + * + * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we + * drop support for format-specific table sources. */ +@Deprecated --- End diff -- No but we forgot it in the previous commit. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544015#comment-16544015 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506494 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java --- @@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) { } public Source toSource() { - final Map newProperties = new HashMap<>(properties); - newProperties.replace(TableDescriptorValidator.TABLE_TYPE(), - TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()); --- End diff -- The table type is a concept of the SQL Client and should not be part of the table descriptor. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544007#comment-16544007 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506193 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala --- @@ -16,14 +16,14 @@ * limitations under the License. */ -package org.apache.flink.table.connectors +package org.apache.flink.table.factories import java.util /** * Common trait for all properties-based discoverable table factories. */ -trait DiscoverableTableFactory { +trait TableFactory { --- End diff -- Actually, I would like the very generic name `Factory` but since we have to add some prefix to make it unique in the project, I named it `TableFactory` because we prefix everything in this module with `Table`. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544004#comment-16544004 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506126 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -54,51 +56,105 @@ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}. */ @Internal -public abstract class KafkaTableSource - implements StreamTableSource, DefinedProctimeAttribute, DefinedRowtimeAttributes { +public abstract class KafkaTableSource implements + StreamTableSource, + DefinedProctimeAttribute, + DefinedRowtimeAttributes, + DefinedFieldMapping { + + // common table source attributes + // TODO make all attributes final once we drop support for format-specific table sources /** The schema of the table. */ private final TableSchema schema; + /** Field name of the processing time attribute, null if no processing time field is defined. */ + private String proctimeAttribute; + + /** Descriptor for a rowtime attribute. */ + private List rowtimeAttributeDescriptors; + + /** Mapping for the fields of the table schema to fields of the physical returned type or null. */ + private Map fieldMapping; + + // Kafka-specific attributes + /** The Kafka topic to consume. */ private final String topic; /** Properties for the Kafka consumer. */ private final Properties properties; - /** Type information describing the result type. */ - private TypeInformation returnType; - - /** Field name of the processing time attribute, null if no processing time field is defined. */ - private String proctimeAttribute; - - /** Descriptor for a rowtime attribute. */ - private List rowtimeAttributeDescriptors; + /** Deserialization schema for decoding records from Kafka. 
*/ + private final DeserializationSchema deserializationSchema; /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */ - private StartupMode startupMode = StartupMode.GROUP_OFFSETS; + private StartupMode startupMode; /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */ private Map specificStartupOffsets; /** * Creates a generic Kafka {@link StreamTableSource}. * -* @param topic Kafka topic to consume. -* @param propertiesProperties for the Kafka consumer. -* @param schemaSchema of the produced table. -* @param returnTypeType information of the produced physical DataStream. +* @param schema Schema of the produced table. +* @param proctimeAttribute Field name of the processing time attribute, null if no +*processing time field is defined. +* @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute +* @param fieldMappingMapping for the fields of the table schema to --- End diff -- Backward compatibility. It could have been null in the past. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543986#comment-16543986 ] ASF GitHub Bot commented on FLINK-9325: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982 @StephanEwen Thanks! Looking forward~ > generate the _meta file for checkpoint only when the writing is truly > successful > > > Key: FLINK-9325 > URL: https://issues.apache.org/jira/browse/FLINK-9325 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > > We should generate the _meta file for checkpoint only when the writing is > totally successful. We should write the metadata file first to a temp file > and then atomically rename it (with an equivalent workaround for S3). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
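The write-temp-then-atomic-rename approach the ticket describes can be sketched with `java.nio.file`. This is a minimal illustration of the idea, not Flink's actual checkpoint code; the class name and the `.inprogress` suffix are made up, and as the ticket notes, stores without an atomic rename (e.g. S3) need a different workaround.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicMetadataWriter {

    // Writes the metadata bytes to a temporary sibling file first, and only
    // renames it to the final name after the write fully succeeded, so a
    // reader never observes a partially written _metadata file.
    static void write(Path target, byte[] metadata) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".inprogress");
        Files.write(tmp, metadata);
        try {
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Fallback for file systems without atomic rename support.
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("ckpt");
        Path meta = dir.resolve("_metadata");
        write(meta, "checkpoint-1".getBytes(StandardCharsets.UTF_8));
        System.out.println(Files.exists(meta)); // true
    }
}
```

The key property is that the final path only ever comes into existence via the rename, so a failed or interrupted write leaves at most an `.inprogress` file behind.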
[jira] [Assigned] (FLINK-9849) Upgrade hbase version to 2.0.1 for hbase connector
[ https://issues.apache.org/jira/browse/FLINK-9849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9849: --- Assignee: zhangminglei > Upgrade hbase version to 2.0.1 for hbase connector > -- > > Key: FLINK-9849 > URL: https://issues.apache.org/jira/browse/FLINK-9849 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Major > > Currently hbase 1.4.3 is used for hbase connector. > We should upgrade to 2.0.1 which was recently released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9847) OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle unstable
[ https://issues.apache.org/jira/browse/FLINK-9847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543965#comment-16543965 ] Congxian Qiu commented on FLINK-9847: - Could this be {noformat} int BufferOrEvent#getChannelIndex(){noformat} return the value of {code:java} getChannelIndex.moreAvailable// moreAvailable is boolean{code} somehow? > OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle unstable > - > > Key: FLINK-9847 > URL: https://issues.apache.org/jira/browse/FLINK-9847 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.6.0 > > > The test > {{OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle}} is > unstable. When executing repeatedly the test fails from time to time with > {code} > java.lang.Exception: error in task > at > org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:250) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:233) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle(OneInputStreamTaskTest.java:348) > at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) 
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:67) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at > com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > Caused by: org.mockito.exceptions.misusing.WrongTypeOfReturnValue: > Boolean cannot be returned by getChannelIndex() > getChannelIndex() should return int > *** > If you're unsure why you're getting above error read on. > Due to the nature of the syntax above problem might occur because: > 1. This exception *might* occur in wrongly written multi-threaded tests. >Please refer to Mockito FAQ on limitations of concurrency testing. > 2. A spy is stubbed using when(spy.foo()).then() syntax. It is safer to stub > spies - >- with doReturn|Throw() family of methods. More in javadocs for > Mockito.spy() method. 
> at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:165) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness$TaskThread.run(StreamTaskTestHarness.java:437) > {code} > Given the exception I suspect that there is a problem with mocking in the > {{OneInputStreamTaskTestHarness}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...
Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/6301 @jrthe42 Hi, thanks for your PR. From my side, I think using a connection pool to solve the connection problem is a better approach. We don't need to keep the connections open the whole time; that wastes connection resources if most threads have been idle for a long time. Also, the connection pool will not add extra cost if threads are busy writing data into the database, since the connections in the pool will be reused. I googled just now and found the `MiniConnectionPoolManager` description [here](http://www.source-code.biz/miniconnectionpoolmanager/). Maybe we can use it. Best, Hequn ---
[jira] [Commented] (FLINK-9794) JDBCOutputFormat does not consider idle connection and multithreads synchronization
[ https://issues.apache.org/jira/browse/FLINK-9794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543963#comment-16543963 ] ASF GitHub Bot commented on FLINK-9794: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/6301 @jrthe42 Hi, thanks for your PR. From my side, I think using a connection pool to solve the connection problem is a better approach. We don't need to keep the connections open the whole time; that wastes connection resources if most threads have been idle for a long time. Also, the connection pool will not add extra cost if threads are busy writing data into the database, since the connections in the pool will be reused. I googled just now and found the `MiniConnectionPoolManager` description [here](http://www.source-code.biz/miniconnectionpoolmanager/). Maybe we can use it. Best, Hequn > JDBCOutputFormat does not consider idle connection and multithreads > synchronization > --- > > Key: FLINK-9794 > URL: https://issues.apache.org/jira/browse/FLINK-9794 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.4.0, 1.5.0 >Reporter: wangsan >Assignee: wangsan >Priority: Major > Labels: pull-request-available > > The current implementation of JDBCOutputFormat has two potential problems: > 1. The Connection is established when JDBCOutputFormat is opened, and is used > the whole time. But if this connection lies idle for a long time, the > database will force-close the connection, so errors may occur. > 2. The flush() method is called when batchCount exceeds the threshold, but it > is also called while snapshotting state. So two threads may modify upload and > batchCount without synchronization. > We need to fix these two problems to make JDBCOutputFormat more reliable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
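The pooling approach suggested above can be sketched in plain Java. The `SimplePool` class and its method names are illustrative stand-ins, not the MiniConnectionPoolManager or Flink API: idle connections are parked in a bounded queue and handed back out on the next borrow, so long idle periods don't pin a single long-lived connection, while busy writers reuse connections at no extra cost.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Minimal pooling sketch (hypothetical names, not the Flink or
// MiniConnectionPoolManager API). Idle connections wait in a bounded queue
// and are reused by the next borrow(); a new one is only created when the
// pool is empty.
public class SimplePool<C> {
    private final BlockingQueue<C> idle;
    private final Supplier<C> factory;

    public SimplePool(int capacity, Supplier<C> factory) {
        this.idle = new ArrayBlockingQueue<>(capacity);
        this.factory = factory;
    }

    public C borrow() {
        C pooled = idle.poll();                    // reuse an idle connection if one exists
        return pooled != null ? pooled : factory.get();
    }

    public boolean release(C connection) {
        // returns false when the pool is full; the caller should then close
        // the surplus connection instead of leaking it
        return idle.offer(connection);
    }
}
```

A real pool would additionally validate a borrowed connection (e.g. with JDBC's `Connection.isValid`) before reuse, which is exactly what protects against the server force-closing idle connections.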
[jira] [Commented] (FLINK-9847) OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle unstable
[ https://issues.apache.org/jira/browse/FLINK-9847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543957#comment-16543957 ] Congxian Qiu commented on FLINK-9847: - I just got the following error message when I ran the test on my laptop {code:java} java.lang.Exception: error in task at org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:250) at org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:233) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle(OneInputStreamTaskTest.java:348) at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at 
org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:54) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) Caused by: org.mockito.exceptions.misusing.WrongTypeOfReturnValue: Boolean cannot be returned by getChannelIndex() getChannelIndex() should return int *** If you're unsure why you're getting above error read on. Due to the nature of the syntax above problem might occur because: 1. This exception *might* occur in wrongly written multi-threaded tests. Please refer to Mockito FAQ on limitations of concurrency testing. 2. A spy is stubbed using when(spy.foo()).then() syntax. It is safer to stub spies - - with doReturn|Throw() family of methods. More in javadocs for Mockito.spy() method. at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:165) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness$TaskThread.run(StreamTaskTestHarness.java:437) {code} > OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle unstable > - > > Key: FLINK-9847 > URL: https://issues.apache.org/jira/browse/FLINK-9847 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.6.0 > > > The test > {{OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle}} is > unstable. 
When executing repeatedly the test fails from time to time with > {code} > java.lang.Exception: error in task > at > org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:250) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:233) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle(OneInputStreamTaskTest.java:348) > at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at >
[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
[ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543945#comment-16543945 ] ASF GitHub Bot commented on FLINK-9692: --- Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/6300 @StephanEwen I have run this on the following set up: ``` Number of shards on Kinesis stream: 384 Number of task slots: 384 / 192 / 96 Throughput achieved per shard (with adaptive reads) : 1.95 Mb/sec / 1.75 Mb/sec / 1.6 Mb/sec ``` > Adapt maxRecords parameter in the getRecords call to optimize bytes read from > Kinesis > -- > > Key: FLINK-9692 > URL: https://issues.apache.org/jira/browse/FLINK-9692 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: performance, pull-request-available > > The Kinesis connector currently has a [constant > value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] > set for maxRecords that it can fetch from a single Kinesis getRecords call. > However, in most realtime scenarios, the average size of the Kinesis record > (in bytes) changes depending on the situation i.e. you could be in a > transient scenario where you are reading large sized records and would hence > like to fetch fewer records in each getRecords call (so as to not exceed the > 2 Mb/sec [per shard > limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] > on the getRecords call). > The idea here is to adapt the Kinesis connector to identify an average batch > size prior to making the getRecords call, so that the maxRecords parameter > can be appropriately tuned before making the call. 
> This feature can be behind a > [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] > flag that defaults to false. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
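The adaptation described in the issue can be sketched as a small pure function. All names and the fetch-interval parameter below are illustrative, not the actual Kinesis connector API: estimate the average record size from the previous fetch, then size the next getRecords call so one interval's worth of data stays under the 2 Mb/sec per-shard limit.

```java
// Hypothetical sketch of adapting maxRecords from the observed average
// record size (names are illustrative, not the Flink Kinesis connector API).
public class AdaptiveFetch {

    // a single getRecords call is also capped at 10,000 records
    static final int RECORDS_PER_CALL_CAP = 10_000;
    // per-shard read limit on getRecords
    static final long PER_SHARD_BYTES_PER_SEC = 2L * 1024 * 1024;

    static int adaptMaxRecords(long bytesInLastFetch, int recordsInLastFetch, long fetchIntervalMillis) {
        if (recordsInLastFetch == 0) {
            return RECORDS_PER_CALL_CAP; // no data yet: no average to adapt from
        }
        long avgRecordBytes = Math.max(1, bytesInLastFetch / recordsInLastFetch);
        // byte budget for one fetch interval at the per-shard read limit
        long budgetBytes = PER_SHARD_BYTES_PER_SEC * fetchIntervalMillis / 1000;
        long target = budgetBytes / avgRecordBytes;
        // clamp to [1, cap]
        return (int) Math.max(1, Math.min(RECORDS_PER_CALL_CAP, target));
    }
}
```

For example, with 1 KB average records and a 1-second fetch interval the sketch asks for 2048 records per call, well under the 10,000-record cap but exactly at the 2 MB byte budget.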
[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...
Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/6300 @StephanEwen I have run this on the following setup:
```
Number of shards on Kinesis stream: 384
Number of task slots: 384 / 192 / 96
Throughput achieved per shard (with adaptive reads): 1.95 Mb/sec / 1.75 Mb/sec / 1.6 Mb/sec
```
---
[jira] [Commented] (FLINK-9575) Potential race condition when removing JobGraph in HA
[ https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543770#comment-16543770 ] ASF GitHub Bot commented on FLINK-9575: --- Github user Wosin commented on a diff in the pull request: https://github.com/apache/flink/pull/6322#discussion_r202480724 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1759,11 +1759,22 @@ class JobManager( case None => None } -// remove all job-related BLOBs from local and HA store -libraryCacheManager.unregisterJob(jobID) -blobServer.cleanupJob(jobID, removeJobFromStateBackend) +// remove all job-related BLOBs from local and HA store, only if the job was removed correctly +futureOption match { + case Some(future) => future.onComplete{ +case scala.util.Success(_) => { + libraryCacheManager.unregisterJob(jobID) + blobServer.cleanupJob(jobID, removeJobFromStateBackend) --- End diff -- Technically we can, but this changes the return type of the future, as `cleanupJob` does indeed return something. > Potential race condition when removing JobGraph in HA > - > > Key: FLINK-9575 > URL: https://issues.apache.org/jira/browse/FLINK-9575 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: Dominik Wosiński >Assignee: Dominik Wosiński >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.2, 1.6.0 > > > When we are removing the _JobGraph_ from the _JobManager_, for example after > invoking _cancel()_, the following code is executed:
> {noformat}
> val futureOption = currentJobs.get(jobID) match {
>   case Some((eg, _)) =>
>     val result = if (removeJobFromStateBackend) {
>       val futureOption = Some(future {
>         try {
>           // ...otherwise, we can have lingering resources when there is a concurrent shutdown
>           // and the ZooKeeper client is closed. Not removing the job immediately allow the
>           // shutdown to release all resources.
>           submittedJobGraphs.removeJobGraph(jobID)
>         } catch {
>           case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
>         }
>       }(context.dispatcher))
>       try {
>         archive ! decorateMessage(
>           ArchiveExecutionGraph(
>             jobID,
>             ArchivedExecutionGraph.createFrom(eg)))
>       } catch {
>         case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
>       }
>       futureOption
>     } else {
>       None
>     }
>     currentJobs.remove(jobID)
>     result
>   case None => None
> }
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> {noformat}
> This combines an asynchronous removal of the job graph with a synchronous removal of the BLOB files connected with this job. This means, as far as I understand, that there is a potential problem: we can fail to remove the job graph from _submittedJobGraphs_. If the JobManager fails and we elect a new leader, it can try to recover such a job, but it will fail with an exception since the assigned blob was already removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...
Github user Wosin commented on a diff in the pull request: https://github.com/apache/flink/pull/6322#discussion_r202480724 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1759,11 +1759,22 @@ class JobManager( case None => None } -// remove all job-related BLOBs from local and HA store -libraryCacheManager.unregisterJob(jobID) -blobServer.cleanupJob(jobID, removeJobFromStateBackend) +// remove all job-related BLOBs from local and HA store, only if the job was removed correctly +futureOption match { + case Some(future) => future.onComplete{ +case scala.util.Success(_) => { + libraryCacheManager.unregisterJob(jobID) + blobServer.cleanupJob(jobID, removeJobFromStateBackend) --- End diff -- Technically we can, but this changes the return type of the future as `cleanupJob` does indeed return something. ---
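The ordering the patch argues for — BLOB cleanup only after the asynchronous job-graph removal succeeds — can be sketched with Java's CompletableFuture. The logging callbacks below are stand-ins for the real JobManager calls, not Flink code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of the fix's intent (stand-in callbacks, not the real JobManager):
// chain the BLOB cleanup onto the asynchronous job-graph removal, so the
// cleanup can no longer race ahead of removeJobGraph and strand a recovering
// leader without its blobs.
public class OrderedCleanup {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> removeJobGraph =
            CompletableFuture.runAsync(() -> log.add("removeJobGraph"));
        // cleanup runs only after the removal future completed
        removeJobGraph
            .thenRun(() -> log.add("cleanupBlobs"))
            .join(); // block here only for the demo; production code would not
        return log;
    }
}
```

As Wosin notes, chaining like this changes what the outer future resolves to, which is the return-type concern raised in the review.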
[jira] [Commented] (FLINK-9848) When Parallelism more than available task slots Flink stays idle
[ https://issues.apache.org/jira/browse/FLINK-9848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543681#comment-16543681 ] Yazdan Shirvany commented on FLINK-9848: Actually I re-tried with multiple examples and it seems only {{batch/WordCount.jar}} can succeed when {{parallelism > available slots}} > When Parallelism more than available task slots Flink stays idle > > > Key: FLINK-9848 > URL: https://issues.apache.org/jira/browse/FLINK-9848 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Affects Versions: 1.5.0, 1.5.1 >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Critical > > In version 1.4.x, when selecting parallelism > available task slots, Flink throws the > below error right away: > {{NoResourceAvailableException: Not enough free slots available to run the > job}} > > But in version 1.5.x there are two different behaviors: sometimes the job runs > successfully, and sometimes it throws the below error after 5 minutes: > > {{org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Could not allocate all requires slots within timeout of 30 ms. Slots > required: 5, slots allocated: 2}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543538#comment-16543538 ] ASF GitHub Bot commented on FLINK-8858: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/6332 One more thing. Do we need this `-u` flag? Shouldn't it be enough to support something like: `flink-cli < query01.sql` or `echo "SELECT * FROM foo" | flink-cli` > Add support for INSERT INTO in SQL Client > - > > Key: FLINK-8858 > URL: https://issues.apache.org/jira/browse/FLINK-8858 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Renjie Liu >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The current design of SQL Client embedded mode doesn't support long running > queries. It would be useful for simple jobs that can be expressed in a single > sql statement if we can submit sql statements stored in files as long running > queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6332: [FLINK-8858] [sql-client] Add support for INSERT INTO in ...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/6332 One more thing. Do we need this `-u` flag? Shouldn't it be enough to support something like: `flink-cli < query01.sql` or `echo "SELECT * FROM foo" | flink-cli` ---
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543535#comment-16543535 ] ASF GitHub Bot commented on FLINK-8858: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202421784 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java --- @@ -381,6 +433,23 @@ private Table createTable(TableEnvironment tableEnv, String query) { } } + /** +* Applies the given update statement to the given table environment with query configuration. +* +* @param tableEnv table environment +* @param queryConfig query configuration +* @param statement SQL update statement (e.g. INSERT INTO) +*/ + private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String statement) { --- End diff -- rename `statement ` -> `updateStatement` and drop `@param` section from java doc? > Add support for INSERT INTO in SQL Client > - > > Key: FLINK-8858 > URL: https://issues.apache.org/jira/browse/FLINK-8858 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Renjie Liu >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The current design of SQL Client embedded mode doesn't support long running > queries. It would be useful for simple jobs that can be expressed in a single > sql statement if we can submit sql statements stored in files as long running > queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202398523 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java --- @@ -85,6 +86,9 @@ public CliClient(SessionContext context, Executor executor) { terminal = TerminalBuilder.builder() .name(CliStrings.CLI_NAME) .build(); + // make space from previous output and test the writer + terminal.writer().println(); --- End diff -- why is that needed? Shouldn't we print the new line at the end of the output instead? ---
[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202421703 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java --- @@ -371,6 +416,13 @@ public void stop(SessionContext session) { result.isMaterialized()); } + /** +* Creates a table using the given query in the given table environment. +* +* @param tableEnv table environment +* @param query SQL SELECT query +* @return result table object +*/ private Table createTable(TableEnvironment tableEnv, String query) { --- End diff -- rename `query` -> `selectQuery` and drop `@param` section from java doc? ---
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543528#comment-16543528 ] ASF GitHub Bot commented on FLINK-8858: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202419608 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java --- @@ -173,55 +180,92 @@ public void open() { if (line == null || line.equals("")) { continue; } + parseAndCall(line); + } + } - final SqlCommandCall cmdCall = SqlCommandParser.parse(line); + /** +* Submits a SQL update statement and prints status information and/or errors on the terminal. +* +* @param statement SQL update statement +* @return flag to indicate if the submission was successful or not +*/ + public boolean submitUpdate(String statement) { --- End diff -- do we have tests for that? Some `ITCase`? > Add support for INSERT INTO in SQL Client > - > > Key: FLINK-8858 > URL: https://issues.apache.org/jira/browse/FLINK-8858 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Renjie Liu >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The current design of SQL Client embedded mode doesn't support long running > queries. It would be useful for simple jobs that can be expressed in a single > sql statement if we can submit sql statements stored in files as long running > queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543531#comment-16543531 ] ASF GitHub Bot commented on FLINK-8858: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202421703 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java --- @@ -371,6 +416,13 @@ public void stop(SessionContext session) { result.isMaterialized()); } + /** +* Creates a table using the given query in the given table environment. +* +* @param tableEnv table environment +* @param query SQL SELECT query +* @return result table object +*/ private Table createTable(TableEnvironment tableEnv, String query) { --- End diff -- rename `query` -> `selectQuery` and drop `@param` section from java doc? > Add support for INSERT INTO in SQL Client > - > > Key: FLINK-8858 > URL: https://issues.apache.org/jira/browse/FLINK-8858 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Renjie Liu >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The current design of SQL Client embedded mode doesn't support long running > queries. It would be useful for simple jobs that can be expressed in a single > sql statement if we can submit sql statements stored in files as long running > queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202421784 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java --- @@ -381,6 +433,23 @@ private Table createTable(TableEnvironment tableEnv, String query) { } } + /** +* Applies the given update statement to the given table environment with query configuration. +* +* @param tableEnv table environment +* @param queryConfig query configuration +* @param statement SQL update statement (e.g. INSERT INTO) +*/ + private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String statement) { --- End diff -- rename `statement ` -> `updateStatement` and drop `@param` section from java doc? ---
[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202421005 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java --- @@ -329,14 +341,46 @@ public void stop(SessionContext session) { } } - private ResultDescriptor executeQueryInternal(ExecutionContext context, String query) { + private ProgramTargetDescriptor executeUpdateInternal(ExecutionContext context, String statement) { + final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance(); + + // apply update statement --- End diff -- some of these comments seem a little unnecessary? ---
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543533#comment-16543533 ] ASF GitHub Bot commented on FLINK-8858: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202420258 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java --- @@ -354,6 +398,23 @@ private void callSelect(SqlCommandCall cmdCall) { } } + private boolean callInsertInto(SqlCommandCall cmdCall) { + terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi()); + terminal.flush(); + + try { + final ProgramTargetDescriptor programTarget = executor.executeUpdate(context, cmdCall.operands[0]); + terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi()); --- End diff -- ``` programTarget.writeTo(terminal.writer()) ``` ? It would be easier to add more fields in the future and more difficult to forget about printing them. > Add support for INSERT INTO in SQL Client > - > > Key: FLINK-8858 > URL: https://issues.apache.org/jira/browse/FLINK-8858 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Renjie Liu >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The current design of SQL Client embedded mode doesn't support long running > queries. It would be useful for simple jobs that can be expressed in a single > sql statement if we can submit sql statements stored in files as long running > queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
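The `programTarget.writeTo(...)` suggestion is the usual "let the object print itself" refactoring. A minimal sketch, where the class below is a hypothetical stand-in and not Flink's actual ProgramTargetDescriptor:

```java
import java.io.PrintWriter;

// Hypothetical stand-in (not Flink's real ProgramTargetDescriptor):
// the descriptor owns its own printing, so adding a new field later means
// one change here instead of updating every terminal call site.
public class TargetDescriptor {
    private final String clusterId;
    private final String jobId;

    public TargetDescriptor(String clusterId, String jobId) {
        this.clusterId = clusterId;
        this.jobId = jobId;
    }

    public void writeTo(PrintWriter writer) {
        writer.println("Cluster ID: " + clusterId);
        writer.println("Job ID: " + jobId);
    }
}
```

The call site then shrinks to `programTarget.writeTo(terminal.writer())`, which is the reviewer's point about not forgetting to print newly added fields.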
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543527#comment-16543527 ] ASF GitHub Bot commented on FLINK-8858: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202407072 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java --- @@ -97,14 +97,34 @@ private void start() { // add shutdown hook Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(context, executor)); - // start CLI - final CliClient cli = new CliClient(context, executor); - cli.open(); + // do the actual work + openCli(context, executor); } else { throw new SqlClientException("Gateway mode is not supported yet."); } } + /** +* Opens the CLI client for executing SQL statements. +* +* @param context session context +* @param executor executor +*/ + private void openCli(SessionContext context, Executor executor) { + final CliClient cli = new CliClient(context, executor); + // interactive CLI mode + if (options.getUpdateStatement() == null) { + cli.open(); + } + // execute single update statement + else { + final boolean success = cli.submitUpdate(options.getUpdateStatement()); --- End diff -- we do not wait for the query to complete? > Add support for INSERT INTO in SQL Client > - > > Key: FLINK-8858 > URL: https://issues.apache.org/jira/browse/FLINK-8858 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Renjie Liu >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The current design of SQL Client embedded mode doesn't support long running > queries. It would be useful for simple jobs that can be expressed in a single > sql statement if we can submit sql statements stored in files as long running > queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202407651 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java --- @@ -173,55 +180,92 @@ public void open() { if (line == null || line.equals("")) { continue; } + parseAndCall(line); + } + } - final SqlCommandCall cmdCall = SqlCommandParser.parse(line); + /** +* Submits a SQL update statement and prints status information and/or errors on the terminal. +* +* @param statement SQL update statement +* @return flag to indicate if the submission was successful or not +*/ + public boolean submitUpdate(String statement) { + terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi()); + terminal.writer().println(new AttributedString(statement).toString()); + terminal.flush(); - if (cmdCall == null) { - terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL)); - continue; - } + final Optional parsedStatement = SqlCommandParser.parse(statement); --- End diff -- deduplicate parsing call and `if` check - there is already a bug here, either missing `flush` or unnecessary `flush` ---