[jira] [Closed] (FLINK-9818) Add cluster component command line parser

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9818.

Resolution: Fixed

Fixed via ab9bd87e521d19db7c7d783268a3532d2e876a5d

> Add cluster component command line parser
> -----------------------------------------
>
> Key: FLINK-9818
> URL: https://issues.apache.org/jira/browse/FLINK-9818
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> In order to parse command line options for the cluster components 
> ({{TaskManagerRunner}}, {{ClusterEntrypoints}}), we should add a 
> {{CommandLineParser}} which supports the common command line options 
> ({{--configDir}}, {{--webui-port}} and dynamic properties which can override 
> configuration values).
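
A minimal sketch of such a parser, built on Apache Commons CLI (the option names
come from the description above; the class name and the -D syntax for dynamic
properties are illustrative assumptions, not Flink's actual implementation):

    import java.util.Properties;

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.DefaultParser;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;

    public class ClusterComponentCliSketch {

        public static void main(String[] args) throws ParseException {
            Options options = new Options();
            // Common options shared by the TaskManagerRunner and the ClusterEntrypoints.
            options.addOption(Option.builder("c").longOpt("configDir").hasArg()
                .required().desc("Directory containing flink-conf.yaml").build());
            options.addOption(Option.builder("r").longOpt("webui-port").hasArg()
                .desc("Port of the web UI").build());
            // -Dkey=value pairs become dynamic properties overriding the configuration.
            options.addOption(Option.builder("D").hasArgs().valueSeparator('=')
                .desc("Dynamic property").build());

            CommandLine cmd = new DefaultParser().parse(options, args);
            String configDir = cmd.getOptionValue("configDir");
            Properties dynamicProperties = cmd.getOptionProperties("D");
            System.out.println("configDir=" + configDir + ", overrides=" + dynamicProperties);
        }
    }

Invoked as --configDir /opt/flink/conf -Drest.port=9091, this yields the config
directory plus a Properties object holding the override.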





[jira] [Assigned] (FLINK-7087) Implement Flip-6 container entry point

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7087:


Assignee: Till Rohrmann

> Implement Flip-6 container entry point
> --------------------------------------
>
> Key: FLINK-7087
> URL: https://issues.apache.org/jira/browse/FLINK-7087
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.6.0
>
>
> In order to support Docker and K8s, we have to provide a container entry point.
> In a first version, the container entry point could be similar to the
> standalone session mode, just with the difference that we don't submit a job
> to the cluster. The job has to be contained in the container image or at
> least be retrievable by the entry point. In that sense, the container entry
> point acts like a per-job standalone mode.





[jira] [Closed] (FLINK-7087) Implement Flip-6 container entry point

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7087.

Resolution: Fixed

Fixed via 8f467c1e9727d5a86d38d0b49753c534a1a161da

> Implement Flip-6 container entry point
> --------------------------------------
>
> Key: FLINK-7087
> URL: https://issues.apache.org/jira/browse/FLINK-7087
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.6.0
>
>
> In order to support Docker and K8s, we have to provide a container entry point.
> In a first version, the container entry point could be similar to the
> standalone session mode, just with the difference that we don't submit a job
> to the cluster. The job has to be contained in the container image or at
> least be retrievable by the entry point. In that sense, the container entry
> point acts like a per-job standalone mode.





[jira] [Closed] (FLINK-9488) Create common entry point for master and workers

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9488.

Resolution: Fixed

Fixed via 8f467c1e9727d5a86d38d0b49753c534a1a161da

> Create common entry point for master and workers
> ------------------------------------------------
>
> Key: FLINK-9488
> URL: https://issues.apache.org/jira/browse/FLINK-9488
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> To make the container setup easier, we should provide a single cluster entry 
> point which uses leader election to become either the master or a worker 
> which runs the {{TaskManager}}.
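
One way to picture the election step, sketched with Apache Curator's
LeaderLatch (the ZooKeeper address, latch path, timeout, and role methods are
illustrative assumptions; Flink's actual implementation differs):

    import java.util.concurrent.TimeUnit;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.leader.LeaderLatch;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class CommonEntryPointSketch {

        public static void main(String[] args) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zookeeper:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();

            try (LeaderLatch latch = new LeaderLatch(client, "/flink/cluster-leader")) {
                latch.start();
                // Whoever wins the election within the timeout becomes the master;
                // everyone else starts a TaskManager and acts as a worker.
                if (latch.await(10, TimeUnit.SECONDS)) {
                    startMaster();
                } else {
                    startTaskManagerWorker();
                }
            }
        }

        private static void startMaster() { /* run the master components */ }

        private static void startTaskManagerWorker() { /* run the TaskManager */ }
    }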





[jira] [Closed] (FLINK-9821) Let dynamic properties overwrite configuration settings in TaskManagerRunner

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9821.

Resolution: Fixed

Fixed via 740f2fbf2e65fa988c6a577989ccd8923729be45

> Let dynamic properties overwrite configuration settings in TaskManagerRunner
> ----------------------------------------------------------------------------
>
> Key: FLINK-9821
> URL: https://issues.apache.org/jira/browse/FLINK-9821
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Similar to FLINK-9820, we should also allow dynamic properties to overwrite
> configuration values in the {{TaskManagerRunner}}.





[jira] [Closed] (FLINK-9819) Create start up scripts for the StandaloneJobClusterEntryPoint

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9819.

Resolution: Fixed

Fixed via 5a4bdf2c9fd1693ad3b90dbbd3bcb589ed15c101

> Create start up scripts for the StandaloneJobClusterEntryPoint
> --------------------------------------------------------------
>
> Key: FLINK-9819
> URL: https://issues.apache.org/jira/browse/FLINK-9819
> Project: Flink
>  Issue Type: New Feature
>  Components: Startup Shell Scripts
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> In order to start the {{StandaloneJobClusterEntryPoint}}, we need start-up
> scripts in {{flink-dist}}. We should extend the {{flink-daemon.sh}} and the
> {{flink-console.sh}} scripts to support this.





[jira] [Closed] (FLINK-9820) Let dynamic properties overwrite configuration settings in ClusterEntrypoint

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9820.

Resolution: Fixed

Fixed via 2fbbf8ee662647c71581f5cd989226be820fed0f

> Let dynamic properties overwrite configuration settings in ClusterEntrypoint
> ----------------------------------------------------------------------------
>
> Key: FLINK-9820
> URL: https://issues.apache.org/jira/browse/FLINK-9820
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The dynamic properties which are passed to the {{ClusterEntrypoint}} should 
> overwrite values in the loaded {{Configuration}}.
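
The intended precedence, sketched with plain java.util types rather than
Flink's actual Configuration class (the method name is illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class DynamicPropertiesSketch {

        /** Returns a copy of the loaded configuration with every dynamic property applied on top. */
        static Map<String, String> applyDynamicProperties(
                Map<String, String> loadedConfiguration, Properties dynamicProperties) {
            Map<String, String> effective = new HashMap<>(loadedConfiguration);
            // Dynamic properties win: they are put last, replacing loaded values.
            dynamicProperties.stringPropertyNames()
                .forEach(key -> effective.put(key, dynamicProperties.getProperty(key)));
            return effective;
        }

        public static void main(String[] args) {
            Map<String, String> loaded = new HashMap<>();
            loaded.put("rest.port", "8081");

            Properties dynamic = new Properties();
            dynamic.setProperty("rest.port", "9091");

            // Prints 9091: the dynamic property overrode the configured value.
            System.out.println(applyDynamicProperties(loaded, dynamic).get("rest.port"));
        }
    }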





[jira] [Closed] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9822.

Resolution: Fixed

Fixed via 56e5381cb7aba01f1d7ecfa11e4be7f505a35baf

> Add Dockerfile for StandaloneJobClusterEntryPoint image
> -------------------------------------------------------
>
> Key: FLINK-9822
> URL: https://issues.apache.org/jira/browse/FLINK-9822
> Project: Flink
>  Issue Type: New Feature
>  Components: Docker
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Add a {{Dockerfile}} to create an image which contains the 
> {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The 
> entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} 
> with the added user code jar. 





[jira] [Commented] (FLINK-9488) Create common entry point for master and workers

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544050#comment-16544050
 ] 

ASF GitHub Bot commented on FLINK-9488:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6315


> Create common entry point for master and workers
> ------------------------------------------------
>
> Key: FLINK-9488
> URL: https://issues.apache.org/jira/browse/FLINK-9488
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> To make the container setup easier, we should provide a single cluster entry 
> point which uses leader election to become either the master or a worker 
> which runs the {{TaskManager}}.





[jira] [Resolved] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9619.
--
   Resolution: Fixed
Fix Version/s: (was: 1.5.2)

Fixed via c9ad0a07ef0339ced74057fc17800ca9ab7784c1

> Always close the task manager connection when the container is completed in 
> YarnResourceManager
> -----------------------------------------------------------------------------------------------
>
> Key: FLINK-9619
> URL: https://issues.apache.org/jira/browse/FLINK-9619
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We should always eagerly close the connection with the task manager when the
> container is completed.
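
The intended behaviour, sketched with stand-in types (the connection map and
the callback signature are illustrative assumptions, not the actual
YarnResourceManager internals):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class EagerCloseSketch {

        private final Map<String, AutoCloseable> taskManagerConnections = new ConcurrentHashMap<>();

        void onContainerCompleted(String containerId, int exitStatus) throws Exception {
            AutoCloseable connection = taskManagerConnections.remove(containerId);
            if (connection != null) {
                // Always close, regardless of the exit status: a completed
                // container can no longer host a live TaskManager.
                connection.close();
            }
        }
    }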





[jira] [Commented] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544053#comment-16544053
 ] 

ASF GitHub Bot commented on FLINK-9619:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6185


> Always close the task manager connection when the container is completed in 
> YarnResourceManager
> -----------------------------------------------------------------------------------------------
>
> Key: FLINK-9619
> URL: https://issues.apache.org/jira/browse/FLINK-9619
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We should always eagerly close the connection with the task manager when the
> container is completed.





[GitHub] flink pull request #6317: [FLINK-9820] Forward dynamic properties to Flink c...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6317




[jira] [Commented] (FLINK-9821) Let dynamic properties overwrite configuration settings in TaskManagerRunner

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544052#comment-16544052
 ] 

ASF GitHub Bot commented on FLINK-9821:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6318


> Let dynamic properties overwrite configuration settings in TaskManagerRunner
> ----------------------------------------------------------------------------
>
> Key: FLINK-9821
> URL: https://issues.apache.org/jira/browse/FLINK-9821
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Similar to FLINK-9820, we should also allow dynamic properties to overwrite
> configuration values in the {{TaskManagerRunner}}.





[GitHub] flink pull request #6331: [FLINK-9701] [state] (follow up) Use StateTtlConfi...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6331




[GitHub] flink pull request #6318: [FLINK-9821] Forward dynamic properties to configu...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6318




[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6319




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544056#comment-16544056
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6283


> Restart strategy defined in flink-conf.yaml is ignored
> ------------------------------------------------------
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded when the user
> enables checkpointing.
> Steps to reproduce:
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-metadata
>  state.backend.rocksdb.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-rocksdb
>
>  2. Create a new Java project as described at
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html
>  Here's the code:
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>
>      public static void main(String[] args) throws Exception
>      {
>          final StreamExecutionEnvironment env =
>              StreamExecutionEnvironment.getExecutionEnvironment();
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream<String> stream = env.fromCollection(Arrays.asList("test"));
>          stream.map(new MapFunction<String, String>() {
>              @Override
>              public String map(String obj) {
>                  throw new NullPointerException("NPE");
>              }
>          });
>          env.execute("Failed job");
>      }
>  }
>
>  3. Compile with mvn clean package and submit the job to the cluster.
>
>  4. Go to the Job Manager configuration in the WebUI and ensure the settings
> from flink-conf.yaml are there (screenshot attached).
>
>  5. Go to the job's configuration and see the Execution Configuration section.
>
>  *Expected result*: the restart strategy as defined in flink-conf.yaml.
>
>  *Actual result*: restart with fixed delay (1 ms), 2147483647 restart attempts.
>
>  See the attached screenshots and the jobmanager log (lines 1 and 31).
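
Until the fix, the reported behaviour can be worked around in the job itself:
a restart strategy set explicitly on the environment takes precedence over the
default that checkpointing activates (fixed delay, Integer.MAX_VALUE =
2147483647 attempts). A sketch against the Flink 1.4 DataStream API:

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExplicitRestartStrategy {

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
            // Explicitly disable restarts so the checkpointing default does not apply.
            env.setRestartStrategy(RestartStrategies.noRestart());
            env.fromElements("test").print();
            env.execute("Explicit restart strategy");
        }
    }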





[jira] [Closed] (FLINK-9823) Add Kubernetes deployment files for standalone job cluster

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9823.

Resolution: Fixed

Fixed via 387a3bc198cfea016abd92953c7fce28e641cf67

> Add Kubernetes deployment files for standalone job cluster
> ----------------------------------------------------------
>
> Key: FLINK-9823
> URL: https://issues.apache.org/jira/browse/FLINK-9823
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Similar to FLINK-9822, it would be helpful for the user to have example 
> Kubernetes deployment files to start a standalone job cluster.





[GitHub] flink pull request #6314: [FLINK-9818] Add cluster component command line pa...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6314




[jira] [Commented] (FLINK-9818) Add cluster component command line parser

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544048#comment-16544048
 ] 

ASF GitHub Bot commented on FLINK-9818:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6314


> Add cluster component command line parser
> -----------------------------------------
>
> Key: FLINK-9818
> URL: https://issues.apache.org/jira/browse/FLINK-9818
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> In order to parse command line options for the cluster components 
> ({{TaskManagerRunner}}, {{ClusterEntrypoints}}), we should add a 
> {{CommandLineParser}} which supports the common command line options 
> ({{--configDir}}, {{--webui-port}} and dynamic properties which can override 
> configuration values).





[GitHub] flink pull request #6320: [FLINK-9823] Add Kubernetes deployment ymls

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6320




[GitHub] flink pull request #6316: [FLINK-9819] Add startup scripts for standalone jo...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6316




[jira] [Commented] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544055#comment-16544055
 ] 

ASF GitHub Bot commented on FLINK-9822:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6319


> Add Dockerfile for StandaloneJobClusterEntryPoint image
> -------------------------------------------------------
>
> Key: FLINK-9822
> URL: https://issues.apache.org/jira/browse/FLINK-9822
> Project: Flink
>  Issue Type: New Feature
>  Components: Docker
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Add a {{Dockerfile}} to create an image which contains the 
> {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The 
> entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} 
> with the added user code jar. 





[jira] [Updated] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-07-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9619:
--
Labels: pull-request-available  (was: )

> Always close the task manager connection when the container is completed in 
> YarnResourceManager
> -----------------------------------------------------------------------------------------------
>
> Key: FLINK-9619
> URL: https://issues.apache.org/jira/browse/FLINK-9619
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We should always eagerly close the connection with the task manager when the
> container is completed.





[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6283




[jira] [Commented] (FLINK-9820) Let dynamic properties overwrite configuration settings in ClusterEntrypoint

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544051#comment-16544051
 ] 

ASF GitHub Bot commented on FLINK-9820:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6317


> Let dynamic properties overwrite configuration settings in ClusterEntrypoint
> ----------------------------------------------------------------------------
>
> Key: FLINK-9820
> URL: https://issues.apache.org/jira/browse/FLINK-9820
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The dynamic properties which are passed to the {{ClusterEntrypoint}} should 
> overwrite values in the loaded {{Configuration}}.





[jira] [Commented] (FLINK-9823) Add Kubernetes deployment files for standalone job cluster

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544046#comment-16544046
 ] 

ASF GitHub Bot commented on FLINK-9823:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6320


> Add Kubernetes deployment files for standalone job cluster
> ----------------------------------------------------------
>
> Key: FLINK-9823
> URL: https://issues.apache.org/jira/browse/FLINK-9823
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Similar to FLINK-9822, it would be helpful for the user to have example 
> Kubernetes deployment files to start a standalone job cluster.





[GitHub] flink pull request #6185: [FLINK-9619][YARN] Eagerly close the connection wi...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6185




[GitHub] flink pull request #6315: [FLINK-9488] Add container entry point StandaloneJ...

2018-07-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6315




[jira] [Commented] (FLINK-9819) Create start up scripts for the StandaloneJobClusterEntryPoint

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544049#comment-16544049
 ] 

ASF GitHub Bot commented on FLINK-9819:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6316


> Create start up scripts for the StandaloneJobClusterEntryPoint
> --------------------------------------------------------------
>
> Key: FLINK-9819
> URL: https://issues.apache.org/jira/browse/FLINK-9819
> Project: Flink
>  Issue Type: New Feature
>  Components: Startup Shell Scripts
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> In order to start the {{StandaloneJobClusterEntryPoint}}, we need start-up
> scripts in {{flink-dist}}. We should extend the {{flink-daemon.sh}} and the
> {{flink-console.sh}} scripts to support this.





[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544054#comment-16544054
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6331


> Activate TTL in state descriptors
> ---------------------------------
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>






[jira] [Resolved] (FLINK-9701) Activate TTL in state descriptors

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9701.
--
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed via 1632681e41cbc1092a6b4d47a58adfffba6af5d4

> Activate TTL in state descriptors
> ---------------------------------
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>






[jira] [Resolved] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9143.
--
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed via 57872d53c4584faace6dc8e4038ad1f2d068a453

> Restart strategy defined in flink-conf.yaml is ignored
> ------------------------------------------------------
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded when the user
> enables checkpointing.
> Steps to reproduce:
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-metadata
>  state.backend.rocksdb.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-rocksdb
>
>  2. Create a new Java project as described at
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html
>  Here's the code:
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>
>      public static void main(String[] args) throws Exception
>      {
>          final StreamExecutionEnvironment env =
>              StreamExecutionEnvironment.getExecutionEnvironment();
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream<String> stream = env.fromCollection(Arrays.asList("test"));
>          stream.map(new MapFunction<String, String>() {
>              @Override
>              public String map(String obj) {
>                  throw new NullPointerException("NPE");
>              }
>          });
>          env.execute("Failed job");
>      }
>  }
>
>  3. Compile with mvn clean package and submit the job to the cluster.
>
>  4. Go to the Job Manager configuration in the WebUI and ensure the settings
> from flink-conf.yaml are there (screenshot attached).
>
>  5. Go to the job's configuration and see the Execution Configuration section.
>
>  *Expected result*: the restart strategy as defined in flink-conf.yaml.
>
>  *Actual result*: restart with fixed delay (1 ms), 2147483647 restart attempts.
>
>  See the attached screenshots and the jobmanager log (lines 1 and 31).





[jira] [Assigned] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-9143:


Assignee: Dawid Wysakowicz  (was: yuqi)

> Restart strategy defined in flink-conf.yaml is ignored
> ------------------------------------------------------
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded when the user
> enables checkpointing.
> Steps to reproduce:
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-metadata
>  state.backend.rocksdb.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-rocksdb
>
>  2. Create a new Java project as described at
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html
>  Here's the code:
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>
>      public static void main(String[] args) throws Exception
>      {
>          final StreamExecutionEnvironment env =
>              StreamExecutionEnvironment.getExecutionEnvironment();
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream<String> stream = env.fromCollection(Arrays.asList("test"));
>          stream.map(new MapFunction<String, String>() {
>              @Override
>              public String map(String obj) {
>                  throw new NullPointerException("NPE");
>              }
>          });
>          env.execute("Failed job");
>      }
>  }
>
>  3. Compile with mvn clean package and submit the job to the cluster.
>
>  4. Go to the Job Manager configuration in the WebUI and ensure the settings
> from flink-conf.yaml are there (screenshot attached).
>
>  5. Go to the job's configuration and see the Execution Configuration section.
>
>  *Expected result*: the restart strategy as defined in flink-conf.yaml.
>
>  *Actual result*: restart with fixed delay (1 ms), 2147483647 restart attempts.
>
>  See the attached screenshots and the jobmanager log (lines 1 and 31).





[jira] [Reopened] (FLINK-9701) Activate TTL in state descriptors

2018-07-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reopened FLINK-9701:
--

> Activate TTL in state descriptors
> ---------------------------------
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>






[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544040#comment-16544040
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202507057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan(
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
 val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) 
match {
-  case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
-case Some(_: StreamTableSourceTable[_]) => true
-case Some(_: BatchTableSourceTable[_]) => false
-case _ => throw TableException(s"Unknown Table type 
${t.getClass}.")
-  }
-  case t => throw TableException(s"Unknown Table type ${t.getClass}.")
+  case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true
+  // null
--- End diff --

This information is useful.


> Add unified format interfaces and format discovery
> --------------------------------------------------
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently
> only {{flink-avro}} is located there, but we will add more formats such as
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of
> concerns we want to decouple connectors from formats: e.g., remove
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to
> discover available formats in the classpath (similar to how file systems are
> discovered now). A {{Format}} will provide a method for converting {{byte[]}}
> to the target record type.
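
The discovery part boils down to java.util.ServiceLoader; a minimal sketch
(the FormatFactory interface below is a stand-in for the proposed one, not its
final shape):

    import java.util.ServiceLoader;

    public class FormatDiscoverySketch {

        /** Stand-in for the proposed factory interface. */
        public interface FormatFactory<T> {
            String formatType();          // e.g. "json", "avro"
            T deserialize(byte[] bytes);  // converts byte[] to the target record type
        }

        /** Finds a factory for the requested format among all implementations on the classpath. */
        static FormatFactory<?> find(String formatType) {
            // Implementations register themselves in META-INF/services, like file systems do.
            for (FormatFactory<?> factory : ServiceLoader.load(FormatFactory.class)) {
                if (factory.formatType().equals(formatType)) {
                    return factory;
                }
            }
            throw new IllegalArgumentException("No format factory found for: " + formatType);
        }
    }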





[jira] [Assigned] (FLINK-9762) CoreOptions.TMP_DIRS wrongly managed on Yarn

2018-07-13 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9762:
---

Assignee: (was: vinoyang)

> CoreOptions.TMP_DIRS wrongly managed on Yarn
> --------------------------------------------
>
> Key: FLINK-9762
> URL: https://issues.apache.org/jira/browse/FLINK-9762
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> The issue on YARN is that it is impossible to have different LOCAL_DIRS for
> the JobManager and the TaskManager, even though the LOCAL_DIRS value depends
> on the container. CoreOptions.TMP_DIRS is set to its default value during
> JobManager initialization and added to the configuration object. When a
> TaskManager is launched, that configuration object is cloned, carrying a
> LOCAL_DIRS value that only makes sense for the JobManager's container. From
> the TaskManager container's point of view, CoreOptions.TMP_DIRS is therefore
> always equal either to the path configured in flink-conf.yaml or to the
> LOCAL_DIRS of the JobManager (the default behaviour). If the TaskManager's
> container does not have access to those folders allocated by YARN for the
> JobManager, the TaskManager cannot be started.





[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202507057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan(
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
 val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) 
match {
-  case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
-case Some(_: StreamTableSourceTable[_]) => true
-case Some(_: BatchTableSourceTable[_]) => false
-case _ => throw TableException(s"Unknown Table type 
${t.getClass}.")
-  }
-  case t => throw TableException(s"Unknown Table type ${t.getClass}.")
+  case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true
+  // null
--- End diff --

This information is useful.




[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506906
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factoryClass: Class[T],
+  propertyMap: JMap[String, String],
+  classLoader: ClassLoader)
+: T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+

[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544038#comment-16544038
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506906
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factoryClass: Class[T],

[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544037#comment-16544037
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506811
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
--- End diff --

using the singular for the class name looks better to me


> Support Netty SslEngine based on openSSL
> ----------------------------------------
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> For a while now, Netty has supported not only the JDK's {{SSLEngine}} but
> also an implementation based on openSSL which, according to
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4, is significantly
> faster. We should add support for using that engine instead.
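
On the Netty side, the switch is a single call on SslContextBuilder; a minimal
client-side sketch (the availability-based fallback mirrors the intent
described here, not Flink's final code):

    import javax.net.ssl.SSLException;

    import io.netty.handler.ssl.OpenSsl;
    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import io.netty.handler.ssl.SslProvider;

    public class OpenSslContextSketch {

        static SslContext clientContext() throws SSLException {
            // Prefer the native openSSL-based engine; fall back to the JDK engine
            // when the netty-tcnative library is not on the classpath.
            SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
            return SslContextBuilder.forClient()
                .sslProvider(provider)
                .build();
        }
    }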





[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544033#comment-16544033
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506694
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -249,14 +326,40 @@ public static SSLContext 
createSSLServerContext(Configuration sslConfig) throws
 
// Set up key manager factory to use the server key 
store
KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-   
KeyManagerFactory.getDefaultAlgorithm());
+   KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, certPassword.toCharArray());
 
+   return new SSLServerTools(sslProvider, 
sslProtocolVersion, sslCipherSuites, kmf);
+   }
+
+   return null;
+   }
+
+   /**
+* Creates the SSL Context for the server if SSL is configured.
+*
+* @param sslConfig
+*The application configuration
--- End diff --

the descriptions for the param and throws tags do not need a line break


> Support Netty SslEngine based on openSSL
> ----------------------------------------
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> For a while now, Netty has supported not only the JDK's {{SSLEngine}} but
> also an implementation based on openSSL which, according to
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4, is significantly
> faster. We should add support for using that engine instead.





[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544031#comment-16544031
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202505334
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -249,14 +326,40 @@ public static SSLContext 
createSSLServerContext(Configuration sslConfig) throws
 
// Set up key manager factory to use the server key 
store
KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-   
KeyManagerFactory.getDefaultAlgorithm());
+   KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, certPassword.toCharArray());
 
+   return new SSLServerTools(sslProvider, 
sslProtocolVersion, sslCipherSuites, kmf);
+   }
+
+   return null;
+   }
+
+   /**
+* Creates the SSL Context for the server if SSL is configured.
+*
+* @param sslConfig
+*The application configuration
+* @return The SSLContext object which can be used by the ssl transport 
server
+* Returns null if SSL is disabled
+* @throws Exception
+* Thrown if there is any misconfiguration
+*/
+   @Nullable
+   public static SSLContext createSSLServerContext(Configuration 
sslConfig) throws Exception {
+
--- End diff --

this empty line is unnecessary and can be removed


> Support Netty SslEngine based on openSSL
> ----------------------------------------
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> For a while now, Netty has supported not only the JDK's {{SSLEngine}} but
> also an implementation based on openSSL which, according to
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4, is significantly
> faster. We should add support for using that engine instead.





[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544030#comment-16544030
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202505321
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
 ---
@@ -160,6 +160,7 @@ private Configuration createSslConfig() throws 
Exception {
flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, 
"password");
flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, 
"password");
+// flinkConfig.setString(SecurityOptions.SSL_PROVIDER, "OPENSSL");
--- End diff --

if this is dead code, it can be removed


> Support Netty SslEngine based on openSSL
> ----------------------------------------
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> For a while now, Netty has supported not only the JDK's {{SSLEngine}} but
> also an implementation based on openSSL which, according to
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4, is significantly
> faster. We should add support for using that engine instead.





[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544036#comment-16544036
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506868
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
--- End diff --

providing a constructor like `SSLProvider(String provider)` to supply the
enum's string representation looks better than hard-coding the strings.
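
What the suggestion amounts to, as a sketch (a hypothetical refactoring of the
enum under review):

    public enum SSLProvider {
        JDK("JDK"),
        /** OpenSSL with fallback to JDK if not available. */
        OPENSSL("OPENSSL");

        private final String name;

        SSLProvider(String name) {
            this.name = name;
        }

        public static SSLProvider fromString(String value) {
            // Compare against each constant's own string instead of hard-coded literals.
            for (SSLProvider provider : values()) {
                if (provider.name.equalsIgnoreCase(value)) {
                    return provider;
                }
            }
            throw new IllegalArgumentException("Unknown SSL provider: " + value);
        }
    }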


> Support Netty SslEngine based on openSSL
> ----------------------------------------
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> For a while now, Netty has supported not only the JDK's {{SSLEngine}} but
> also an implementation based on openSSL which, according to
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4, is significantly
> faster. We should add support for using that engine instead.





[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544034#comment-16544034
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506769
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
+
+   public SSLClientTools(
+   SSLProvider preferredSslProvider,
+   String sslProtocolVersion,
+   TrustManagerFactory trustManagerFactory) {
+   this.preferredSslProvider = preferredSslProvider;
+   this.sslProtocolVersion = sslProtocolVersion;
+   this.trustManagerFactory = trustManagerFactory;
+   }
+   }
+
+   /**
+* Creates necessary helper objects to use for creating an SSL Context 
for the client if SSL is
+* configured.
 *
 * @param sslConfig
 *The application configuration
-* @return The SSLContext object which can be used by the ssl transport 
client
-* Returns null if SSL is disabled
+* @return The SSLClientTools object which can be used for creating 
some SSL context object;
+* returns null if SSL is disabled.
 * @throws Exception
 * Thrown if there is any misconfiguration
 */
@Nullable
-   public static SSLContext createSSLClientContext(Configuration 
sslConfig) throws Exception {
-
+   public static SSLClientTools createSSLClientTools(Configuration 
sslConfig) throws Exception {
Preconditions.checkNotNull(sslConfig);
-   SSLContext clientSSLContext = null;
 
if (getSSLEnabled(sslConfig)) {
LOG.debug("Creating client SSL context from 
configuration");
 
String trustStoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
String trustStorePassword = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+   SSLProvider sslProvider = 
SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER));
 
Preconditions.checkNotNull(trustStoreFilePath, 
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
Preconditions.checkNotNull(trustStorePassword, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
 
-   FileInputStream trustStoreFile = null;
-   try {
-   trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath));
+   try (FileInputStream trustStoreFile = new 
FileInputStream(new File(trustStoreFilePath))) {
trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
-   } finally {
-   if (trustStoreFile != null) {
-   trustStoreFile.close();
-   }
}
 
TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(

[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544032#comment-16544032
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506709
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
+
+   public SSLClientTools(
+   SSLProvider preferredSslProvider,
+   String sslProtocolVersion,
+   TrustManagerFactory trustManagerFactory) {
+   this.preferredSslProvider = preferredSslProvider;
+   this.sslProtocolVersion = sslProtocolVersion;
+   this.trustManagerFactory = trustManagerFactory;
+   }
+   }
+
+   /**
+* Creates necessary helper objects to use for creating an SSL Context 
for the client if SSL is
+* configured.
 *
 * @param sslConfig
 *The application configuration
-* @return The SSLContext object which can be used by the ssl transport 
client
-* Returns null if SSL is disabled
+* @return The SSLClientTools object which can be used for creating 
some SSL context object;
+* returns null if SSL is disabled.
 * @throws Exception
 * Thrown if there is any misconfiguration
 */
@Nullable
-   public static SSLContext createSSLClientContext(Configuration 
sslConfig) throws Exception {
-
+   public static SSLClientTools createSSLClientTools(Configuration 
sslConfig) throws Exception {
Preconditions.checkNotNull(sslConfig);
-   SSLContext clientSSLContext = null;
 
if (getSSLEnabled(sslConfig)) {
LOG.debug("Creating client SSL context from 
configuration");
 
String trustStoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
String trustStorePassword = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+   SSLProvider sslProvider = 
SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER));
 
Preconditions.checkNotNull(trustStoreFilePath, 
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
Preconditions.checkNotNull(trustStorePassword, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
 
-   FileInputStream trustStoreFile = null;
-   try {
-   trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath));
+   try (FileInputStream trustStoreFile = new 
FileInputStream(new File(trustStoreFilePath))) {
trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
-   } finally {
-   if (trustStoreFile != null) {
-   trustStoreFile.close();
-   }
}
 
TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(

[jira] [Commented] (FLINK-9816) Support Netty SslEngine based on openSSL

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544035#comment-16544035
 ] 

ASF GitHub Bot commented on FLINK-9816:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
--- End diff --

Marking these fields as `private` and providing getters/setters looks better to me.
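
A minimal sketch of the suggested encapsulation (editor's illustration in the 
context of the class above; since the fields are final, only getters are shown):

    public static class SSLClientTools {
        private final SSLProvider preferredSslProvider;
        private final String sslProtocolVersion;
        private final TrustManagerFactory trustManagerFactory;

        public SSLClientTools(
                SSLProvider preferredSslProvider,
                String sslProtocolVersion,
                TrustManagerFactory trustManagerFactory) {
            this.preferredSslProvider = preferredSslProvider;
            this.sslProtocolVersion = sslProtocolVersion;
            this.trustManagerFactory = trustManagerFactory;
        }

        public SSLProvider getPreferredSslProvider() {
            return preferredSslProvider;
        }

        public String getSslProtocolVersion() {
            return sslProtocolVersion;
        }

        public TrustManagerFactory getTrustManagerFactory() {
            return trustManagerFactory;
        }
    }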


> Support Netty SslEngine based on openSSL
> 
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> For a while now, Netty has supported not only the JDK's {{SSLEngine}} but 
> also an implementation based on openSSL which, according to 
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4, is significantly 
> faster. We should add support for using that engine instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506769
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
+
+   public SSLClientTools(
+   SSLProvider preferredSslProvider,
+   String sslProtocolVersion,
+   TrustManagerFactory trustManagerFactory) {
+   this.preferredSslProvider = preferredSslProvider;
+   this.sslProtocolVersion = sslProtocolVersion;
+   this.trustManagerFactory = trustManagerFactory;
+   }
+   }
+
+   /**
+* Creates necessary helper objects to use for creating an SSL Context 
for the client if SSL is
+* configured.
 *
 * @param sslConfig
 *The application configuration
-* @return The SSLContext object which can be used by the ssl transport 
client
-* Returns null if SSL is disabled
+* @return The SSLClientTools object which can be used for creating 
some SSL context object;
+* returns null if SSL is disabled.
 * @throws Exception
 * Thrown if there is any misconfiguration
 */
@Nullable
-   public static SSLContext createSSLClientContext(Configuration 
sslConfig) throws Exception {
-
+   public static SSLClientTools createSSLClientTools(Configuration 
sslConfig) throws Exception {
Preconditions.checkNotNull(sslConfig);
-   SSLContext clientSSLContext = null;
 
if (getSSLEnabled(sslConfig)) {
LOG.debug("Creating client SSL context from 
configuration");
 
String trustStoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
String trustStorePassword = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+   SSLProvider sslProvider = 
SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER));
 
Preconditions.checkNotNull(trustStoreFilePath, 
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
Preconditions.checkNotNull(trustStorePassword, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
 
-   FileInputStream trustStoreFile = null;
-   try {
-   trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath));
+   try (FileInputStream trustStoreFile = new 
FileInputStream(new File(trustStoreFilePath))) {
trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
-   } finally {
-   if (trustStoreFile != null) {
-   trustStoreFile.close();
-   }
}
 
TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
 
-   clientSSLContext = 
SSLContext.getInstance(sslProtocolVersion);
-   

[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
--- End diff --

Marking these fields as `private` and providing getters/setters looks better to me.


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202505334
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -249,14 +326,40 @@ public static SSLContext 
createSSLServerContext(Configuration sslConfig) throws
 
// Set up key manager factory to use the server key 
store
KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-   
KeyManagerFactory.getDefaultAlgorithm());
+   KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, certPassword.toCharArray());
 
+   return new SSLServerTools(sslProvider, 
sslProtocolVersion, sslCipherSuites, kmf);
+   }
+
+   return null;
+   }
+
+   /**
+* Creates the SSL Context for the server if SSL is configured.
+*
+* @param sslConfig
+*The application configuration
+* @return The SSLContext object which can be used by the ssl transport 
server
+* Returns null if SSL is disabled
+* @throws Exception
+* Thrown if there is any misconfiguration
+*/
+   @Nullable
+   public static SSLContext createSSLServerContext(Configuration 
sslConfig) throws Exception {
+
--- End diff --

This empty line is unnecessary and can be removed.


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506868
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
--- End diff --

Providing a constructor like `SSLProvider(String provider)` to supply the 
enum's string representation looks better than hard-coding it.


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506811
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
--- End diff --

Using the singular form for the class name looks better to me.


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506709
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -163,80 +163,157 @@ public static void 
setSSLVerifyHostname(Configuration sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* SSL engine provider.
+*/
+   public enum SSLProvider {
+   JDK,
+   /**
+* OpenSSL with fallback to JDK if not available.
+*/
+   OPENSSL;
+
+   public static SSLProvider fromString(String value) {
+   Preconditions.checkNotNull(value);
+   if (value.equalsIgnoreCase("OPENSSL")) {
+   return OPENSSL;
+   } else if (value.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + value);
+   }
+   }
+   }
+
+   /**
+* Instances needed to set up an SSL client connection.
+*/
+   public static class SSLClientTools {
+   public final SSLProvider preferredSslProvider;
+   public final String sslProtocolVersion;
+   public final TrustManagerFactory trustManagerFactory;
+
+   public SSLClientTools(
+   SSLProvider preferredSslProvider,
+   String sslProtocolVersion,
+   TrustManagerFactory trustManagerFactory) {
+   this.preferredSslProvider = preferredSslProvider;
+   this.sslProtocolVersion = sslProtocolVersion;
+   this.trustManagerFactory = trustManagerFactory;
+   }
+   }
+
+   /**
+* Creates necessary helper objects to use for creating an SSL Context 
for the client if SSL is
+* configured.
 *
 * @param sslConfig
 *The application configuration
-* @return The SSLContext object which can be used by the ssl transport 
client
-* Returns null if SSL is disabled
+* @return The SSLClientTools object which can be used for creating 
some SSL context object;
+* returns null if SSL is disabled.
 * @throws Exception
 * Thrown if there is any misconfiguration
 */
@Nullable
-   public static SSLContext createSSLClientContext(Configuration 
sslConfig) throws Exception {
-
+   public static SSLClientTools createSSLClientTools(Configuration 
sslConfig) throws Exception {
Preconditions.checkNotNull(sslConfig);
-   SSLContext clientSSLContext = null;
 
if (getSSLEnabled(sslConfig)) {
LOG.debug("Creating client SSL context from 
configuration");
 
String trustStoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
String trustStorePassword = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+   SSLProvider sslProvider = 
SSLProvider.fromString(sslConfig.getString(SecurityOptions.SSL_PROVIDER));
 
Preconditions.checkNotNull(trustStoreFilePath, 
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
Preconditions.checkNotNull(trustStorePassword, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
 
-   FileInputStream trustStoreFile = null;
-   try {
-   trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath));
+   try (FileInputStream trustStoreFile = new 
FileInputStream(new File(trustStoreFilePath))) {
trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
-   } finally {
-   if (trustStoreFile != null) {
-   trustStoreFile.close();
-   }
}
 
TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
 
-   clientSSLContext = 
SSLContext.getInstance(sslProtocolVersion);
-   

[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202506694
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -249,14 +326,40 @@ public static SSLContext 
createSSLServerContext(Configuration sslConfig) throws
 
// Set up key manager factory to use the server key 
store
KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-   
KeyManagerFactory.getDefaultAlgorithm());
+   KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, certPassword.toCharArray());
 
+   return new SSLServerTools(sslProvider, 
sslProtocolVersion, sslCipherSuites, kmf);
+   }
+
+   return null;
+   }
+
+   /**
+* Creates the SSL Context for the server if SSL is configured.
+*
+* @param sslConfig
+*The application configuration
--- End diff --

The descriptions of the @param and @throws tags do not need a line feed.
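
For example, the tag descriptions can stay on the same line as the tags 
(editor's sketch of the suggested layout):

    /**
     * Creates the SSL Context for the server if SSL is configured.
     *
     * @param sslConfig The application configuration
     * @return The SSLContext object, or null if SSL is disabled
     * @throws Exception Thrown if there is any misconfiguration
     */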


---


[GitHub] flink pull request #6328: [FLINK-9816][network] add option to configure SSL ...

2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6328#discussion_r202505321
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
 ---
@@ -160,6 +160,7 @@ private Configuration createSslConfig() throws 
Exception {
flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, 
"password");
flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, 
"password");
+// flinkConfig.setString(SecurityOptions.SSL_PROVIDER, "OPENSSL");
--- End diff --

If this is dead code, it can be removed.


---


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506861
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factoryClass: Class[T],
+  propertyMap: JMap[String, String],
+  classLoader: ClassLoader)
+: T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+

[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544028#comment-16544028
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506861
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factoryClass: Class[T],

[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544024#comment-16544024
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factoryClass: Class[T],

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: 
ClassLoader)
-  : TableFactory = {
+  /**
+* Finds a table factory of the given class, descriptor, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: 
ClassLoader): T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(descriptor)
+Preconditions.checkNotNull(classLoader)
 
-val properties = new DescriptorProperties()
-descriptor.addProperties(properties)
-find(clz, properties.asMap.asScala.toMap, classLoader)
+val descriptorProperties = new DescriptorProperties()
+descriptor.addProperties(descriptorProperties)
+findInternal(factoryClass, descriptorProperties.asMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String]): TableFactory = 
{
-find(clz: Class[_], properties, null)
+  /**
+* Finds a table factory of the given class and property map.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+
+findInternal(factoryClass, propertyMap, None)
   }
 
-  def find(clz: Class[_], properties: Map[String, String],
-   classLoader: ClassLoader): TableFactory = {
+  /**
+* Finds a table factory of the given class, property map, and 
classloader.
+*
+* @param factoryClass desired factory class
+* @param propertyMap properties that describe the factory configuration
+* @param classLoader classloader for service loading
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](
+  factoryClass: Class[T],
+  propertyMap: JMap[String, String],
+  classLoader: ClassLoader)
+: T = {
+Preconditions.checkNotNull(factoryClass)
+Preconditions.checkNotNull(propertyMap)
+

[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544023#comment-16544023
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506740
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
--- End diff --

The variable is only passed once. The internal method is not checking 
for null again.
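
A small sketch of the pattern being pointed out (editor's illustration; names 
are placeholders): validate once at the public boundary and let the internal 
method trust its caller.

    import org.apache.flink.util.Preconditions;

    class FactoryLookup {
        static <T> T find(Class<T> factoryClass) {
            // single null check at the public entry point
            Preconditions.checkNotNull(factoryClass);
            return findInternal(factoryClass);
        }

        private static <T> T findInternal(Class<T> factoryClass) {
            // internal method does not re-check; the actual lookup is elided here
            return null;
        }
    }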


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to the target record type.
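
To make the discovery mechanism concrete, a hedged sketch of 
service-loader-based format lookup (editor's illustration; the interface 
shape is an assumption, not Flink's actual API):

    import java.util.ServiceLoader;

    // Hypothetical factory interface; implementations are registered under
    // META-INF/services/<fully.qualified.FormatFactory> on the classpath.
    interface FormatFactory {
        String formatType();              // e.g. "json", "avro"
        Object fromBytes(byte[] bytes);   // converts byte[] to the target record type
    }

    class FormatDiscovery {
        static FormatFactory find(String type) {
            // the service loader scans the classpath for registered implementations
            for (FormatFactory factory : ServiceLoader.load(FormatFactory.class)) {
                if (factory.formatType().equals(type)) {
                    return factory;
                }
            }
            throw new IllegalArgumentException("No format factory found for type: " + type);
        }
    }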



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506740
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 ---
@@ -18,143 +18,358 @@
 
 package org.apache.flink.table.factories
 
-import java.util.{ServiceConfigurationError, ServiceLoader}
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable
 
 /**
-  * Unified interface to search for TableFactoryDiscoverable of provided 
type and properties.
+  * Unified interface to search for a [[TableFactory]] of provided type 
and properties.
   */
 object TableFactoryService extends Logging {
 
   private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFactory])
 
-  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
-find(clz, descriptor, null)
+  /**
+* Finds a table factory of the given class and descriptor.
+*
+* @param factoryClass desired factory class
+* @param descriptor descriptor describing the factory configuration
+* @tparam T factory class type
+* @return the matching factory
+*/
+  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+Preconditions.checkNotNull(factoryClass)
--- End diff --

The variable is only passed one time. The internal method is not checking 
for null gain.


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544019#comment-16544019
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506661
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -143,118 +143,82 @@ case class CatalogAlreadyExistException(
 }
 
 /**
-  * Exception for not finding a [[TableFormatFactory]] for the
-  * given properties.
+  * Exception for not finding a [[TableFactory]] for the given properties.
   *
   * @param message message that indicates the current matching step
   * @param factoryClass required factory class
-  * @param formatFactories all found factories
-  * @param properties properties that describe the table format
+  * @param factories all found factories
+  * @param properties properties that describe the configuration
   * @param cause the cause
   */
-case class NoMatchingTableFormatException(
+case class NoMatchingTableFactoryException(
   message: String,
   factoryClass: Class[_],
-  formatFactories: Seq[TableFormatFactory[_]],
+  factories: Seq[TableFactory],
   properties: Map[String, String],
   cause: Throwable)
 extends RuntimeException(
--- End diff --

So far we don't have inheritance of exceptions. Case classes don't 
support that in Scala, so we would need to convert them. 


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to the target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506661
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -143,118 +143,82 @@ case class CatalogAlreadyExistException(
 }
 
 /**
-  * Exception for not finding a [[TableFormatFactory]] for the
-  * given properties.
+  * Exception for not finding a [[TableFactory]] for the given properties.
   *
   * @param message message that indicates the current matching step
   * @param factoryClass required factory class
-  * @param formatFactories all found factories
-  * @param properties properties that describe the table format
+  * @param factories all found factories
+  * @param properties properties that describe the configuration
   * @param cause the cause
   */
-case class NoMatchingTableFormatException(
+case class NoMatchingTableFactoryException(
   message: String,
   factoryClass: Class[_],
-  formatFactories: Seq[TableFormatFactory[_]],
+  factories: Seq[TableFactory],
   properties: Map[String, String],
   cause: Throwable)
 extends RuntimeException(
--- End diff --

So far we don't have inheritance of exceptions. Case classes don't 
support that in Scala, so we would need to convert them. 


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544018#comment-16544018
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506625
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
 ---
@@ -16,42 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.formats
+package org.apache.flink.table.factories
 
 import java.util
 
-import org.apache.flink.api.common.serialization.{DeserializationSchema, 
SerializationSchema}
-
 /**
-  * A factory to create different table format instances. This factory is 
used with Java's Service
-  * Provider Interfaces (SPI) for discovering. A factory is called with a 
set of normalized
-  * properties that describe the desired format. The factory allows for 
matching to the given set of
-  * properties. See also [[SerializationSchemaFactory]] and 
[[DeserializationSchemaFactory]] for
-  * creating configured instances of format classes accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' 
file of a JAR file in
-  * the current classpath to be found.
+  * A factory to create configured table format instances based on 
string-based properties. See
+  * also [[TableFactory]] for more information.
   *
   * @tparam T record type that the format produces or consumes
   */
-trait TableFormatFactory[T] {
-
-  /**
-* Specifies the context that this factory has been implemented for. 
The framework guarantees
-* to only use the factory if the specified set of properties and 
values are met.
-*
-* Typical properties might be:
-*   - format.type
-*   - format.version
-*
-* Specified property versions allow the framework to provide backwards 
compatible properties
-* in case of string format changes:
-*   - format.property-version
-*
-* An empty context means that the factory matches for all requests.
-*/
-  def requiredContext(): util.Map[String, String]
+trait TableFormatFactory[T] extends TableFactory {
--- End diff --

Because the doc comment explains the specific situation of how supported 
format properties are handled.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to the target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506625
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
 ---
@@ -16,42 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.formats
+package org.apache.flink.table.factories
 
 import java.util
 
-import org.apache.flink.api.common.serialization.{DeserializationSchema, 
SerializationSchema}
-
 /**
-  * A factory to create different table format instances. This factory is 
used with Java's Service
-  * Provider Interfaces (SPI) for discovering. A factory is called with a 
set of normalized
-  * properties that describe the desired format. The factory allows for 
matching to the given set of
-  * properties. See also [[SerializationSchemaFactory]] and 
[[DeserializationSchemaFactory]] for
-  * creating configured instances of format classes accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' 
file of a JAR file in
-  * the current classpath to be found.
+  * A factory to create configured table format instances based on 
string-based properties. See
+  * also [[TableFactory]] for more information.
   *
   * @tparam T record type that the format produces or consumes
   */
-trait TableFormatFactory[T] {
-
-  /**
-* Specifies the context that this factory has been implemented for. 
The framework guarantees
-* to only use the factory if the specified set of properties and 
values are met.
-*
-* Typical properties might be:
-*   - format.type
-*   - format.version
-*
-* Specified property versions allow the framework to provide backwards 
compatible properties
-* in case of string format changes:
-*   - format.property-version
-*
-* An empty context means that the factory matches for all requests.
-*/
-  def requiredContext(): util.Map[String, String]
+trait TableFormatFactory[T] extends TableFactory {
--- End diff --

Because the doc comment explains the specific situation of how supported 
format properties are handled.


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544017#comment-16544017
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506574
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging {
 val properties = new DescriptorProperties()
 externalCatalogTable.addProperties(properties)
 val javaMap = properties.asMap
-val source = TableFactoryService.find(classOf[TableSourceFactory[_]], 
javaMap)
-  .asInstanceOf[TableSourceFactory[_]]
-  .createTableSource(javaMap)
 tableEnv match {
   // check for a batch table source in this batch environment
   case _: BatchTableEnvironment =>
-source match {
-  case bts: BatchTableSource[_] =>
-new TableSourceSinkTable(Some(new BatchTableSourceTable(
-  bts,
-  new FlinkStatistic(externalCatalogTable.getTableStats))), 
None)
-  case _ => throw new TableException(
-s"Found table source '${source.getClass.getCanonicalName}' is 
not applicable " +
-  s"in a batch environment.")
-}
+val source = TableFactoryService
--- End diff --

Usually it is very uncommon to define both a batch and a streaming source in 
the same factory. Your proposed change would require all future sources to 
implement a check for the environment beforehand, which is unnecessary in 80% of the 
cases. Separating by environment is a concept that can be found throughout the 
entire `flink-table` module because batch and streaming sources/sinks behave quite differently.
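
A hedged sketch of that separation (editor's illustration; all type names are 
placeholders, not Flink's actual API): each environment gets its own factory 
interface, so a source implementation never has to check which environment it 
runs in.

    import java.util.Map;

    interface BatchSource {}
    interface StreamSource {}

    // one factory interface per environment
    interface BatchSourceFactory {
        BatchSource createBatchSource(Map<String, String> properties);
    }

    interface StreamSourceFactory {
        StreamSource createStreamSource(Map<String, String> properties);
    }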


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discovery available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.
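
For illustration, a minimal sketch of such service-loader-based discovery (the {{TableFormatFactory}} name and its {{requiredContext()}} matching follow the discussion in this ticket; everything else is simplified and assumed, not the actual {{TableFactoryService}} code):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;

public final class FormatDiscoverySketch {

	/** Simplified factory contract, mirroring the discussed TableFormatFactory. */
	public interface TableFormatFactory<T> {
		Map<String, String> requiredContext();
	}

	/**
	 * Finds all factories on the classpath whose required context matches the given
	 * properties. ServiceLoader reads the implementation class names from
	 * META-INF/services/ entries in all JARs on the classpath.
	 */
	public static List<TableFormatFactory<?>> findMatching(Map<String, String> properties) {
		final List<TableFormatFactory<?>> matches = new ArrayList<>();
		for (TableFormatFactory<?> factory : ServiceLoader.load(TableFormatFactory.class)) {
			// a factory matches if all of its required properties are present
			// with the same values (simplified matching logic)
			final boolean contextMatches = factory.requiredContext().entrySet().stream()
					.allMatch(e -> e.getValue().equals(properties.get(e.getKey())));
			if (contextMatches) {
				matches.add(factory);
			}
		}
		return matches;
	}
}
{code}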



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506574
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging {
 val properties = new DescriptorProperties()
 externalCatalogTable.addProperties(properties)
 val javaMap = properties.asMap
-val source = TableFactoryService.find(classOf[TableSourceFactory[_]], javaMap)
-  .asInstanceOf[TableSourceFactory[_]]
-  .createTableSource(javaMap)
 tableEnv match {
   // check for a batch table source in this batch environment
   case _: BatchTableEnvironment =>
-source match {
-  case bts: BatchTableSource[_] =>
-new TableSourceSinkTable(Some(new BatchTableSourceTable(
-  bts,
-  new FlinkStatistic(externalCatalogTable.getTableStats))), None)
-  case _ => throw new TableException(
-s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
-  s"in a batch environment.")
-}
+val source = TableFactoryService
--- End diff --

Usually it is very uncommon to define both a batch and a streaming source in 
the same factory. Your proposed change would require all future sources to 
implement a check for the environment beforehand, which is unnecessary in 80% of the 
cases. Separating by environment is a concept that can be found throughout the 
entire `flink-table` module because batch and streaming sources/sinks behave quite differently.


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544016#comment-16544016
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506512
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java
 ---
@@ -21,8 +21,12 @@
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
 
 /**
- * Tests for {@link Kafka08JsonTableSourceFactory}.
+ * Tests for legacy Kafka08JsonTableSourceFactory.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sources.
  */
+@Deprecated
--- End diff --

No, but we forgot it in the previous commit.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discovery available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506512
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java
 ---
@@ -21,8 +21,12 @@
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
 
 /**
- * Tests for {@link Kafka08JsonTableSourceFactory}.
+ * Tests for legacy Kafka08JsonTableSourceFactory.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ * drop support for format-specific table sources.
  */
+@Deprecated
--- End diff --

No, but we forgot it in the previous commit.


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544015#comment-16544015
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506494
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
 ---
@@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) {
 }
 
 public Source toSource() {
-   final Map<String, String> newProperties = new HashMap<>(properties);
-   newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
-   TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
--- End diff --

The table type is a concept of the SQL Client and should not be part of the 
table descriptor.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discovery available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506494
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
 ---
@@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) {
 }
 
 public Source toSource() {
-   final Map<String, String> newProperties = new HashMap<>(properties);
-   newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
-   TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
--- End diff --

The table type is a concept of the SQL Client and should not be part of the 
table descriptor.


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544007#comment-16544007
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506193
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala
 ---
@@ -16,14 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.connectors
+package org.apache.flink.table.factories
 
 import java.util
 
 /**
   * Common trait for all properties-based discoverable table factories.
   */
-trait DiscoverableTableFactory {
+trait TableFactory {
--- End diff --

Actually, I would like the very generic name `Factory` but since we have to 
add some prefix to make it unique in the project, I named it `TableFactory` 
because we prefix everything in this module with `Table`.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discovery available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506193
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala
 ---
@@ -16,14 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.connectors
+package org.apache.flink.table.factories
 
 import java.util
 
 /**
   * Common trait for all properties-based discoverable table factories.
   */
-trait DiscoverableTableFactory {
+trait TableFactory {
--- End diff --

Actually, I would like the very generic name `Factory` but since we have to 
add some prefix to make it unique in the project, I named it `TableFactory` 
because we prefix everything in this module with `Table`.


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544004#comment-16544004
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506126
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -54,51 +56,105 @@
  * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
  */
 @Internal
-public abstract class KafkaTableSource
-   implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+public abstract class KafkaTableSource implements
+   StreamTableSource<Row>,
+   DefinedProctimeAttribute,
+   DefinedRowtimeAttributes,
+   DefinedFieldMapping {
+
+   // common table source attributes
+   // TODO make all attributes final once we drop support for format-specific table sources
 
    /** The schema of the table. */
    private final TableSchema schema;
 
+   /** Field name of the processing time attribute, null if no processing time field is defined. */
+   private String proctimeAttribute;
+
+   /** Descriptor for a rowtime attribute. */
+   private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+
+   /** Mapping for the fields of the table schema to fields of the physical returned type or null. */
+   private Map<String, String> fieldMapping;
+
+   // Kafka-specific attributes
+
    /** The Kafka topic to consume. */
    private final String topic;
 
    /** Properties for the Kafka consumer. */
    private final Properties properties;
 
-   /** Type information describing the result type. */
-   private TypeInformation<Row> returnType;
-
-   /** Field name of the processing time attribute, null if no processing time field is defined. */
-   private String proctimeAttribute;
-
-   /** Descriptor for a rowtime attribute. */
-   private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+   /** Deserialization schema for decoding records from Kafka. */
+   private final DeserializationSchema<Row> deserializationSchema;
 
    /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
-   private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+   private StartupMode startupMode;
 
    /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
    private Map<KafkaTopicPartition, Long> specificStartupOffsets;
 
    /**
    * Creates a generic Kafka {@link StreamTableSource}.
    *
-   * @param topic                       Kafka topic to consume.
-   * @param properties                  Properties for the Kafka consumer.
-   * @param schema                      Schema of the produced table.
-   * @param returnType                  Type information of the produced physical DataStream.
+   * @param schema                      Schema of the produced table.
+   * @param proctimeAttribute           Field name of the processing time attribute, null if no
+   *                                    processing time field is defined.
+   * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+   * @param fieldMapping                Mapping for the fields of the table schema to
--- End diff --

Backward compatibility. It could have been null in the past.


> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discovery available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-13 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6323#discussion_r202506126
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -54,51 +56,105 @@
  * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
  */
 @Internal
-public abstract class KafkaTableSource
-   implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+public abstract class KafkaTableSource implements
+   StreamTableSource<Row>,
+   DefinedProctimeAttribute,
+   DefinedRowtimeAttributes,
+   DefinedFieldMapping {
+
+   // common table source attributes
+   // TODO make all attributes final once we drop support for format-specific table sources
 
    /** The schema of the table. */
    private final TableSchema schema;
 
+   /** Field name of the processing time attribute, null if no processing time field is defined. */
+   private String proctimeAttribute;
+
+   /** Descriptor for a rowtime attribute. */
+   private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+
+   /** Mapping for the fields of the table schema to fields of the physical returned type or null. */
+   private Map<String, String> fieldMapping;
+
+   // Kafka-specific attributes
+
    /** The Kafka topic to consume. */
    private final String topic;
 
    /** Properties for the Kafka consumer. */
    private final Properties properties;
 
-   /** Type information describing the result type. */
-   private TypeInformation<Row> returnType;
-
-   /** Field name of the processing time attribute, null if no processing time field is defined. */
-   private String proctimeAttribute;
-
-   /** Descriptor for a rowtime attribute. */
-   private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+   /** Deserialization schema for decoding records from Kafka. */
+   private final DeserializationSchema<Row> deserializationSchema;
 
    /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
-   private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+   private StartupMode startupMode;
 
    /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
    private Map<KafkaTopicPartition, Long> specificStartupOffsets;
 
    /**
    * Creates a generic Kafka {@link StreamTableSource}.
    *
-   * @param topic                       Kafka topic to consume.
-   * @param properties                  Properties for the Kafka consumer.
-   * @param schema                      Schema of the produced table.
-   * @param returnType                  Type information of the produced physical DataStream.
+   * @param schema                      Schema of the produced table.
+   * @param proctimeAttribute           Field name of the processing time attribute, null if no
+   *                                    processing time field is defined.
+   * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+   * @param fieldMapping                Mapping for the fields of the table schema to
--- End diff --

Backward compatibility. It could have been null in the past.


---


[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543986#comment-16543986
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen Thanks! Looking forward~


> generate the _meta file for checkpoint only when the writing is truly 
> successful
> 
>
> Key: FLINK-9325
> URL: https://issues.apache.org/jira/browse/FLINK-9325
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
>
> We should generate the _meta file for checkpoint only when the writing is 
> totally successful. We should write the metadata file first to a temp file 
> and then atomically rename it (with an equivalent workaround for S3). 
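
For illustration, a minimal sketch of the write-then-rename pattern using plain JDK NIO on a local file system (the real implementation would go through Flink's FileSystem abstraction, and S3 needs the mentioned workaround since it lacks atomic rename):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

final class AtomicMetaWriteSketch {

	static void writeMeta(Path metaFile, byte[] metadata) throws IOException {
		// write the full contents to a temporary sibling file first
		final Path tmp = metaFile.resolveSibling(metaFile.getFileName() + ".inprogress");
		Files.write(tmp, metadata);
		// only after the write succeeded, publish the file under its final name;
		// readers either see no _meta file or a complete one, never a partial one
		Files.move(tmp, metaFile, StandardCopyOption.ATOMIC_MOVE);
	}

	public static void main(String[] args) throws IOException {
		writeMeta(Paths.get("/tmp/chk-1/_metadata"), new byte[] {1, 2, 3});
	}
}
{code}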



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

2018-07-13 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen Thanks! Looking forward~


---


[jira] [Assigned] (FLINK-9849) Upgrade hbase version to 2.0.1 for hbase connector

2018-07-13 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9849:
---

Assignee: zhangminglei

> Upgrade hbase version to 2.0.1 for hbase connector
> --
>
> Key: FLINK-9849
> URL: https://issues.apache.org/jira/browse/FLINK-9849
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Major
>
> Currently hbase 1.4.3 is used for the hbase connector.
> We should upgrade to 2.0.1, which was recently released.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9847) OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle unstable

2018-07-13 Thread Congxian Qiu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543965#comment-16543965
 ] 

Congxian Qiu commented on FLINK-9847:
-

Could it be that
{noformat}
int BufferOrEvent#getChannelIndex(){noformat}
returns the value of
{code:java}
getChannelIndex.moreAvailable // moreAvailable is boolean{code}
somehow?
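
If it helps, the two stubbing styles that the Mockito error message distinguishes look roughly like this (a minimal sketch; the interface is a hypothetical stand-in for the mocked class, only the stubbing pattern matters):

{code:java}
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class StubbingStyleSketch {

	// hypothetical stand-in for the mocked BufferOrEvent
	interface ChannelThing {
		int getChannelIndex();
		boolean moreAvailable();
	}

	void example() {
		ChannelThing boe = mock(ChannelThing.class);

		// when(...).thenReturn(...) invokes boe.getChannelIndex() while stubbing;
		// if another thread calls into the mock at the same time, Mockito can
		// associate the wrong return value with the wrong method, producing a
		// WrongTypeOfReturnValue error like the one above
		when(boe.getChannelIndex()).thenReturn(0);

		// doReturn(...).when(...) does not invoke the method during stubbing and
		// is the style the Mockito FAQ recommends for spies and racy situations
		doReturn(0).when(boe).getChannelIndex();
	}
}
{code}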

> OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle unstable
> -
>
> Key: FLINK-9847
> URL: https://issues.apache.org/jira/browse/FLINK-9847
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> The test 
> {{OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle}} is 
> unstable. When executing repeatedly the test fails from time to time with 
> {code}
> java.lang.Exception: error in task
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:250)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:233)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle(OneInputStreamTaskTest.java:348)
>   at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:67)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: org.mockito.exceptions.misusing.WrongTypeOfReturnValue: 
> Boolean cannot be returned by getChannelIndex()
> getChannelIndex() should return int
> ***
> If you're unsure why you're getting above error read on.
> Due to the nature of the syntax above problem might occur because:
> 1. This exception *might* occur in wrongly written multi-threaded tests.
>Please refer to Mockito FAQ on limitations of concurrency testing.
> 2. A spy is stubbed using when(spy.foo()).then() syntax. It is safer to stub 
> spies - 
>- with doReturn|Throw() family of methods. More in javadocs for 
> Mockito.spy() method.
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:165)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness$TaskThread.run(StreamTaskTestHarness.java:437)
> {code}
> Given the exception I suspect that there is a problem with mocking in the 
> {{OneInputStreamTaskTestHarness}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...

2018-07-13 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6301
  
@jrthe42 Hi, thanks for your PR. 
From my side, I think using a connection pool to solve the connection problem 
is a better way. We don't need to keep the connections open all the time; it 
wastes connection resources if most threads have been idle for a long time. 
Also, the connection pool will not add extra cost when threads are busy 
writing data into the database, since the connections in the pool are reused. 

I just googled and found the `MiniConnectionPoolManager` description 
[here](http://www.source-code.biz/miniconnectionpoolmanager/). Maybe we can 
use it. 
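
To sketch the idea (illustrative only; `pool` stands for any pooled `javax.sql.DataSource`, for example one backed by MiniConnectionPoolManager; this is not the actual JDBCOutputFormat code):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

class PooledJdbcWriterSketch {

	private final DataSource pool; // any pooled DataSource implementation
	private final String insertQuery;

	PooledJdbcWriterSketch(DataSource pool, String insertQuery) {
		this.pool = pool;
		this.insertQuery = insertQuery;
	}

	// borrow a connection only for the duration of one flush, so nothing
	// is held open (and can go stale) between batches
	void flush(Iterable<Object[]> batch) throws SQLException {
		try (Connection conn = pool.getConnection();
				PreparedStatement stmt = conn.prepareStatement(insertQuery)) {
			for (Object[] row : batch) {
				for (int i = 0; i < row.length; i++) {
					stmt.setObject(i + 1, row[i]);
				}
				stmt.addBatch();
			}
			stmt.executeBatch();
		} // try-with-resources returns the connection to the pool here
	}
}
```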

Best, Hequn



---


[jira] [Commented] (FLINK-9794) JDBCOutputFormat does not consider idle connection and multithreads synchronization

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543963#comment-16543963
 ] 

ASF GitHub Bot commented on FLINK-9794:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6301
  
@jrthe42 Hi, thanks for your PR. 
From my side, I think using a connection pool to solve the connection problem 
is a better way. We don't need to keep the connections open all the time; it 
wastes connection resources if most threads have been idle for a long time. 
Also, the connection pool will not add extra cost when threads are busy 
writing data into the database, since the connections in the pool are reused. 

I just googled and found the `MiniConnectionPoolManager` description 
[here](http://www.source-code.biz/miniconnectionpoolmanager/). Maybe we can 
use it. 

Best, Hequn



> JDBCOutputFormat does not consider idle connection and multithreads 
> synchronization
> ---
>
> Key: FLINK-9794
> URL: https://issues.apache.org/jira/browse/FLINK-9794
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0, 1.5.0
>Reporter: wangsan
>Assignee: wangsan
>Priority: Major
>  Labels: pull-request-available
>
> The current implementation of JDBCOutputFormat has two potential problems: 
> 1. The Connection is established when JDBCOutputFormat is opened and is then 
> used for the format's entire lifetime. If this connection lies idle for a long time, the 
> database will force-close the connection, so errors may occur.
> 2. The flush() method is called when batchCount exceeds the threshold, but it 
> is also called while snapshotting state. So two threads may modify upload and 
> batchCount, but without synchronization.
> We need to fix these two problems to make JDBCOutputFormat more reliable.
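
A minimal sketch of what a fix for problem 2 could look like (names such as batchCount follow the description above; the actual JDBC calls are elided):

{code:java}
class SynchronizedFlushSketch {

	private final Object lock = new Object();
	private int batchCount = 0;

	// called from the writing thread
	void writeRecord(Object record) {
		synchronized (lock) {
			// upload.addBatch(...) would happen here
			batchCount++;
			if (batchCount >= 100) {
				flushLocked();
			}
		}
	}

	// also called from the checkpointing thread via snapshotState()
	void flush() {
		synchronized (lock) {
			flushLocked();
		}
	}

	private void flushLocked() {
		// upload.executeBatch() would happen here
		batchCount = 0;
	}
}
{code}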



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9847) OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle unstable

2018-07-13 Thread Congxian Qiu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543957#comment-16543957
 ] 

Congxian Qiu commented on FLINK-9847:
-

I just got the following error message when I ran the test on my laptop:
{code:java}
java.lang.Exception: error in task

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:250)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:233)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle(OneInputStreamTaskTest.java:348)
at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:54)
at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.mockito.exceptions.misusing.WrongTypeOfReturnValue: 
Boolean cannot be returned by getChannelIndex()
getChannelIndex() should return int
***
If you're unsure why you're getting above error read on.
Due to the nature of the syntax above problem might occur because:
1. This exception *might* occur in wrongly written multi-threaded tests.
Please refer to Mockito FAQ on limitations of concurrency testing.
2. A spy is stubbed using when(spy.foo()).then() syntax. It is safer to stub 
spies - 
- with doReturn|Throw() family of methods. More in javadocs for Mockito.spy() 
method.

at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:165)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness$TaskThread.run(StreamTaskTestHarness.java:437)

{code}

> OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle unstable
> -
>
> Key: FLINK-9847
> URL: https://issues.apache.org/jira/browse/FLINK-9847
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> The test 
> {{OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle}} is 
> unstable. When executing repeatedly the test fails from time to time with 
> {code}
> java.lang.Exception: error in task
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:250)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:233)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.testWatermarksNotForwardedWithinChainWhenIdle(OneInputStreamTaskTest.java:348)
>   at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> 

[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543945#comment-16543945
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/6300
  
@StephanEwen I have run this on the following setup:
```
Number of shards on Kinesis stream: 384
Number of task slots: 384 / 192 / 96
Throughput achieved per shard (with adaptive reads): 1.95 Mb/sec / 1.75 Mb/sec / 1.6 Mb/sec
```



> Adapt maxRecords parameter in the getRecords call to optimize bytes read from 
> Kinesis 
> --
>
> Key: FLINK-9692
> URL: https://issues.apache.org/jira/browse/FLINK-9692
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: performance, pull-request-available
>
> The Kinesis connector currently has a [constant 
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
>  set for maxRecords that it can fetch from a single Kinesis getRecords call. 
> However, in most realtime scenarios, the average size of the Kinesis record 
> (in bytes) changes depending on the situation i.e. you could be in a 
> transient scenario where you are reading large sized records and would hence 
> like to fetch fewer records in each getRecords call (so as to not exceed the 
> 2 Mb/sec [per shard 
> limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html]
>  on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch 
> size prior to making the getRecords call, so that the maxRecords parameter 
> can be appropriately tuned before making the call. 
> This feature can be behind a 
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
>  flag that defaults to false. 
>  
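
For illustration, a rough sketch of the adaptive computation described above (constants and names are assumptions, not the connector's actual code):

{code:java}
final class AdaptiveMaxRecordsSketch {

	// Kinesis per-shard read limit of 2 MB/sec and the getRecords record cap
	private static final long READ_LIMIT_BYTES_PER_SEC = 2L * 1024 * 1024;
	private static final int KINESIS_MAX_RECORDS_PER_GET = 10000;

	static int nextMaxRecords(long bytesInLastBatch, int recordsInLastBatch, double fetchIntervalSeconds) {
		if (recordsInLastBatch == 0) {
			return KINESIS_MAX_RECORDS_PER_GET;
		}
		// average record size observed in the previous batch
		final double avgRecordSizeBytes = (double) bytesInLastBatch / recordsInLastBatch;
		// number of records one fetch may carry without exceeding the shard limit
		final int maxRecords = (int) (READ_LIMIT_BYTES_PER_SEC * fetchIntervalSeconds / avgRecordSizeBytes);
		return Math.max(1, Math.min(maxRecords, KINESIS_MAX_RECORDS_PER_GET));
	}
}
{code}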



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...

2018-07-13 Thread glaksh100
Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/6300
  
@StephanEwen I have run this on the following setup:
```
Number of shards on Kinesis stream: 384
Number of task slots: 384 / 192 / 96
Throughput achieved per shard (with adaptive reads): 1.95 Mb/sec / 1.75 Mb/sec / 1.6 Mb/sec
```



---


[jira] [Commented] (FLINK-9575) Potential race condition when removing JobGraph in HA

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543770#comment-16543770
 ] 

ASF GitHub Bot commented on FLINK-9575:
---

Github user Wosin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202480724
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
+  blobServer.cleanupJob(jobID, removeJobFromStateBackend)
--- End diff --

Technically we can, but this changes the return type of the future, as 
`cleanupJob` does indeed return something.


> Potential race condition when removing JobGraph in HA
> -
>
> Key: FLINK-9575
> URL: https://issues.apache.org/jira/browse/FLINK-9575
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: Dominik Wosiński
>Assignee: Dominik Wosiński
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> When we are removing the _JobGraph_ from _JobManager_ for example after 
> invoking _cancel()_, the following code is executed : 
> {noformat}
> val futureOption = currentJobs.get(jobID) match {
>   case Some((eg, _)) =>
>     val result = if (removeJobFromStateBackend) {
>       val futureOption = Some(future {
>         try {
>           // ...otherwise, we can have lingering resources when there is a concurrent shutdown
>           // and the ZooKeeper client is closed. Not removing the job immediately allow the
>           // shutdown to release all resources.
>           submittedJobGraphs.removeJobGraph(jobID)
>         } catch {
>           case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
>         }
>       }(context.dispatcher))
>       try {
>         archive ! decorateMessage(
>           ArchiveExecutionGraph(
>             jobID,
>             ArchivedExecutionGraph.createFrom(eg)))
>       } catch {
>         case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
>       }
>       futureOption
>     } else {
>       None
>     }
>     currentJobs.remove(jobID)
>     result
>   case None => None
> }
>
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> {noformat}
> This causes the asynchronous removal of the job and the synchronous removal of 
> the blob files connected with this job. As far as I understand, there is a 
> potential problem that we can fail to remove the job graph from 
> _submittedJobGraphs_. If the JobManager fails and a new leader is elected, it 
> can try to recover such a job, but it will fail with an exception since the 
> assigned blobs were already removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...

2018-07-13 Thread Wosin
Github user Wosin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202480724
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
+  blobServer.cleanupJob(jobID, removeJobFromStateBackend)
--- End diff --

Technically we can, but this changes the return type of the future, as 
`cleanupJob` does indeed return something.


---


[jira] [Commented] (FLINK-9848) When Parallelism more than available task slots Flink stays idle

2018-07-13 Thread Yazdan Shirvany (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543681#comment-16543681
 ] 

Yazdan Shirvany commented on FLINK-9848:


Actually, I re-tried with multiple examples, and it seems only 
{{batch/WordCount.jar}} succeeds when {{parallelism > available slots}}

> When Parallelism more than available task slots Flink stays idle
> 
>
> Key: FLINK-9848
> URL: https://issues.apache.org/jira/browse/FLINK-9848
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> In version 1.4.x, when selecting Parallelism > Available task Slots, Flink throws 
> the below error right away: 
> {{NoResourceAvailableException: Not enough free slots available to run the 
> job}}
>   
>  In version 1.5.x, however, there are two different behaviors: sometimes the job runs 
> successfully, and sometimes it throws the below error after 5 minutes: 
>  
> {{org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not allocate all requires slots within timeout of 30 ms. Slots 
> required: 5, slots allocated: 2}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543538#comment-16543538
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/6332
  
One more thing. Do we need this `-u` flag? Shouldn't it be enough to 
support something like:
`flink-cli < query01.sql` or `echo "SELECT * FROM foo" | flink-cli`


> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6332: [FLINK-8858] [sql-client] Add support for INSERT INTO in ...

2018-07-13 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/6332
  
One more thing. Do we need this `-u` flag? Shouldn't it be enough to 
support something like:
`flink-cli < query01.sql` or `echo "SELECT * FROM foo" | flink-cli`


---


[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543535#comment-16543535
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202421784
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -381,6 +433,23 @@ private Table createTable(TableEnvironment tableEnv, String query) {
}
}
 
+   /**
+* Applies the given update statement to the given table environment with query configuration.
+*
+* @param tableEnv table environment
+* @param queryConfig query configuration
+* @param statement SQL update statement (e.g. INSERT INTO)
+*/
+   private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String statement) {
--- End diff --

rename `statement` -> `updateStatement` and drop the `@param` section from the Javadoc?


> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202398523
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -85,6 +86,9 @@ public CliClient(SessionContext context, Executor executor) {
terminal = TerminalBuilder.builder()
.name(CliStrings.CLI_NAME)
.build();
+   // make space from previous output and test the writer
+   terminal.writer().println();
--- End diff --

why is that needed? Shouldn't we print a new line at the end of the output 
instead?


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202421703
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -371,6 +416,13 @@ public void stop(SessionContext session) {
result.isMaterialized());
}
 
+   /**
+* Creates a table using the given query in the given table environment.
+*
+* @param tableEnv table environment
+* @param query SQL SELECT query
+* @return result table object
+*/
private Table createTable(TableEnvironment tableEnv, String query) {
--- End diff --

rename `query` -> `selectQuery` and drop the `@param` section from the Javadoc?


---


[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543528#comment-16543528
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202419608
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -173,55 +180,92 @@ public void open() {
if (line == null || line.equals("")) {
continue;
}
+   parseAndCall(line);
+   }
+   }
 
-   final SqlCommandCall cmdCall = SqlCommandParser.parse(line);
+   /**
+* Submits a SQL update statement and prints status information and/or errors on the terminal.
+*
+* @param statement SQL update statement
+* @return flag to indicate if the submission was successful or not
+*/
+   public boolean submitUpdate(String statement) {
--- End diff --

do we have tests for that? Some `ITCase`? 


> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543531#comment-16543531
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202421703
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -371,6 +416,13 @@ public void stop(SessionContext session) {
result.isMaterialized());
}
 
+   /**
+* Creates a table using the given query in the given table environment.
+*
+* @param tableEnv table environment
+* @param query SQL SELECT query
+* @return result table object
+*/
private Table createTable(TableEnvironment tableEnv, String query) {
--- End diff --

rename `query` -> `selectQuery` and drop the `@param` section from the Javadoc?


> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202421784
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -381,6 +433,23 @@ private Table createTable(TableEnvironment tableEnv, String query) {
}
}
 
+   /**
+* Applies the given update statement to the given table environment with query configuration.
+*
+* @param tableEnv table environment
+* @param queryConfig query configuration
+* @param statement SQL update statement (e.g. INSERT INTO)
+*/
+   private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String statement) {
--- End diff --

rename `statement` -> `updateStatement` and drop the `@param` section from the Javadoc?


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202421005
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -329,14 +341,46 @@ public void stop(SessionContext session) {
}
}
 
-   private <T> ResultDescriptor executeQueryInternal(ExecutionContext<T> context, String query) {
+   private <T> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<T> context, String statement) {
+   final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+   // apply update statement
--- End diff --

some of those comments are a little bit unnecessary?


---


[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543533#comment-16543533
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202420258
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -354,6 +398,23 @@ private void callSelect(SqlCommandCall cmdCall) {
}
}
 
+   private boolean callInsertInto(SqlCommandCall cmdCall) {
+   terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
+   terminal.flush();
+
+   try {
+   final ProgramTargetDescriptor programTarget = executor.executeUpdate(context, cmdCall.operands[0]);
+   terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
--- End diff --

```
programTarget.writeTo(terminal.writer())
```
? It would be easier to add more fields in the future and more difficult to 
forget about printing them.



> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543527#comment-16543527
 ] 

ASF GitHub Bot commented on FLINK-8858:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202407072
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
 ---
@@ -97,14 +97,34 @@ private void start() {
// add shutdown hook
Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(context, executor));
 
-   // start CLI
-   final CliClient cli = new CliClient(context, executor);
-   cli.open();
+   // do the actual work
+   openCli(context, executor);
} else {
throw new SqlClientException("Gateway mode is not supported yet.");
}
}
 
+   /**
+* Opens the CLI client for executing SQL statements.
+*
+* @param context session context
+* @param executor executor
+*/
+   private void openCli(SessionContext context, Executor executor) {
+   final CliClient cli = new CliClient(context, executor);
+   // interactive CLI mode
+   if (options.getUpdateStatement() == null) {
+   cli.open();
+   }
+   // execute single update statement
+   else {
+   final boolean success = cli.submitUpdate(options.getUpdateStatement());
--- End diff --

we do not wait for the query to complete?


> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The current design of SQL Client embedded mode doesn't support long running 
> queries. It would be useful for simple jobs that can be expressed in a single 
> sql statement if we can submit sql statements stored in files as long running 
> queries. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202407651
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -173,55 +180,92 @@ public void open() {
if (line == null || line.equals("")) {
continue;
}
+   parseAndCall(line);
+   }
+   }
 
-   final SqlCommandCall cmdCall = SqlCommandParser.parse(line);
+   /**
+* Submits a SQL update statement and prints status information and/or errors on the terminal.
+*
+* @param statement SQL update statement
+* @return flag to indicate if the submission was successful or not
+*/
+   public boolean submitUpdate(String statement) {
+   terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
+   terminal.writer().println(new AttributedString(statement).toString());
+   terminal.flush();
 
-   if (cmdCall == null) {
-   terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL));
-   continue;
-   }
+   final Optional<SqlCommandCall> parsedStatement = SqlCommandParser.parse(statement);
--- End diff --

deduplicate parsing call and `if` check - there is already a bug here, 
either missing `flush` or unnecessary `flush`


---

