[GitHub] [flink] HuangZhenQiu commented on issue #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…

2019-12-07 Thread GitBox
HuangZhenQiu commented on issue #10484: [FLINK-14906][table] create and drop 
temp system functions from DDL t…
URL: https://github.com/apache/flink/pull/10484#issuecomment-562922102
 
 
   @bowenli86 Thanks for the feedback. I agree with your suggestion. We should 
distinguish system and catalog functions from the very beginning. Thus, different 
types of operations should be created for system and catalog functions. 
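   For illustration, a minimal sketch of what such a split could look like. The 
class and field names below are hypothetical, not necessarily what this PR ends 
up with (FLINK-15127 later settles on CreateCatalogFunctionOperation for the 
catalog side):

    // Hypothetical sketch only; each class would live in its own file and
    // implement org.apache.flink.table.operations.Operation.
    // Temp system functions carry a bare name; catalog functions a full path.
    class CreateTempSystemFunctionOperation {
        private final String functionName; // no catalog/database qualifier
        private final String className;

        CreateTempSystemFunctionOperation(String functionName, String className) {
            this.functionName = functionName;
            this.className = className;
        }
    }

    class CreateCatalogFunctionOperation {
        private final ObjectIdentifier identifier; // catalog.database.name
        private final CatalogFunction function;
        private final boolean isTemporary;

        CreateCatalogFunctionOperation(
                ObjectIdentifier identifier, CatalogFunction function, boolean isTemporary) {
            this.identifier = identifier;
            this.function = function;
            this.isTemporary = isTemporary;
        }
    }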




[GitHub] [flink] HuangZhenQiu commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…

2019-12-07 Thread GitBox
HuangZhenQiu commented on a change in pull request #10484: [FLINK-14906][table] 
create and drop temp system functions from DDL t…
URL: https://github.com/apache/flink/pull/10484#discussion_r355167003
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
 ##
 @@ -33,16 +33,22 @@
private final String className; // Fully qualified class name of the 
function
private final FunctionLanguage functionLanguage;
private final boolean isTemporary;
+   private final boolean isSystem;
 
 Review comment:
   Removed




[GitHub] [flink] flinkbot edited a comment on issue #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10329: [FLINK-12785][StateBackend] RocksDB 
savepoint recovery can use a lot of unmanaged memory
URL: https://github.com/apache/flink/pull/10329#issuecomment-558970068
 
 
   
   ## CI report:
   
   * e3aa8e5a567a13f07598ee6fe0e8fb8e72dfb986 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138360657)
   * 86fcd30935fbbcdbd4b976c0fc3f17194876b4de : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139353188)
   * f1dda3798762bb41163d3fde9a45ce1b5eb14b0d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139470678)
   * ebf6b89063a2acd1ff9765b76088c41aead0a5d9 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139719201)
   * 19e4b4c864f95e2b355a09742af684c453daa223 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139844119)
   * e2941014c7fcf0fcb2e0e89361328370641d1d50 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139940394)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] Li-Aihua commented on issue #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
Li-Aihua commented on issue #10436: [FLINK-14920] [flink-end-to-end-perf-tests] 
Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#issuecomment-562920893
 
 
   Thanks @hequn8128 for your comments. I fixed all the places mentioned in your 
comments. Now this e2e perf test is running in the cluster; I will add the results later.




[GitHub] [flink] Li-Aihua commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
Li-Aihua commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355165876
 
 

 ##
 File path: tools/jenkins/run_case.py
 ##
 @@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+# This file is run by Jenkins to execute the e2e perf tests.
+# Params:
+# am_server_address: the master machine's IP of the standalone environment
+# scenario_file: the file which contains the test scenarios
+# flink_home: the path of the Flink distribution
+# inter_nums: how many times each scenario is run, default value is 10
+# wait_minute: the interval between two QPS samples, default value is 10s
+#
+
+
+import sys
+import time
+
+if sys.version_info < (3, 5):
+    print("Python versions prior to 3.5 are not supported.")
+    sys.exit(-1)
+
+from logger import logger
+from utils import run_command
+from restapi_common import get_avg_qps_by_restful_interface
+
+
+def start_server(flink_home):
+    cmd = "bash %s/bin/start-cluster.sh" % flink_home
+    status, output = run_command(cmd)
+    if status and output.find("Exception") < 0:
+        return True
+    else:
+        return False
+
+
+def end_server(flink_home):
+    cmd = "bash %s/bin/stop_yarn.sh" % flink_home
+    status, output = run_command(cmd)
+    if status and output.find("Exception") < 0:
+        return True
+    else:
+        return False
+
+
+def get_scenarios(scenario_file_name, test_jar):
+    """
+    Parse the file which contains several scenarios; its content looks like this:
+    testClassPath   scenarioName  jobparam1 jobparam2
+    org.apache.Test testScenario1 aaa       bbb
+    ...
+    :param scenario_file_name: the scenario file
+    :param test_jar: the jar containing the test classes
+    :return: status code, list of scenario commands, list of scenario names
+    """
+    params_name = []
+    scenarios = []
+    scenario_names = []
+    linenum = 0
+    with open(scenario_file_name) as file:
+        # iterate over all lines; a single readline() would only see the header
+        for data in file:
+            data = data.strip()
+            if not (data.startswith("#") or data == ""):
+                linenum = linenum + 1
+                cmd = ""
+                scenario_name = ""
+                if linenum == 1:
+                    params_name = data.split(" ")
+                    for index in range(0, len(params_name)):
+                        # strip whitespace (the original loop here was a no-op)
+                        params_name[index] = params_name[index].strip()
+                    if "testClassPath" not in params_name:
+                        return 1, [], []  # keep the same arity as the success return
+                else:
+                    params_value = data.split(" ")
+                    for index in range(0, len(params_name)):
+                        param = params_name[index]
+                        if param == "testClassPath":
+                            cmd = "-c %s %s %s" % (params_value[index], test_jar, cmd)
+                        else:
+                            # original checked `param == ""`, which never matched
+                            if cmd == "":
+                                cmd = "--%s %s" % (param, params_value[index])
+                            else:
+                                cmd = "%s --%s %s" % (cmd, param, params_value[index])
+                            scenario_name = "%s_%s" % (scenario_name, param)
+                    scenario_names.append(scenario_name[1:])
+                    scenarios.append(cmd)
+    return 0, scenarios, scenario_names
+
+
+def get_avg(values):
+    if len(values) == 0:
+        return 0.0
+    else:
+        return sum(values) * 1.0 / len(values)
+
+
+# Note: a test_jar parameter is added here so that get_scenarios receives the
+# jar path (the original called get_scenarios(scenario_file_name) with a
+# missing argument).
+def run_cases(scenario_file_name, flink_home, am_server_address, test_jar, inter_nums=10, wait_minute=10):
+    status, scenarios, scenario_names = get_scenarios(scenario_file_name, test_jar)
+    for scenario_index in range(0, len(scenarios)):
+        scenario = scenarios[scenario_index]  # lists have no .get()
+        scenario_name = scenario_names[scenario_index]
+        total_qps = []
+        status = start_server(flink_home)
 
 Review comment:
   Yes, it's OK to reuse the cluster; I updated it.



[GitHub] [flink] flinkbot edited a comment on issue #9973: [FLINK-9955] Add Kubernetes ClusterDescriptor to support deploying session cluster.

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #9973: [FLINK-9955] Add Kubernetes 
ClusterDescriptor to support deploying session cluster.
URL: https://github.com/apache/flink/pull/9973#issuecomment-545325448
 
 
   
   ## CI report:
   
   * 450cd1029219372cdc4f05f4bf6bc3ff6e43dc90 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133141964)
   * 6285ff3752b3d5aa9e3237601d77fdd44af17874 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133366937)
   * 614834429a8548e253adae3f73fd6d0cae01a3b5 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/133998662)
   * baba3f140349fef37cdbd1fd0002619a8063b1db : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134010535)
   * 1f1ec2e29f04ef06e9cc05bca8a406d9eee5a957 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/137145239)
   * 62d628cda95e2fee7f74435bffe0bd6885490f5c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138017618)
   * e351cd251218ab155de872cea5df6c9e9e242c8e : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139116860)
   * bce759ca103b5ca92a259438f40f0877c1c9483a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139256976)
   * 63bc2b0acc6e3db11243d5c81ffe5565aa26d9ab : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139464342)
   * 336f18f2bb935306e7d1088691d624158d0e5de0 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139677734)
   * 5bfa111036c999b352b7c003f2ffae036b968f3f : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139843038)
   * e8267bcdcbe0458d8eb3f055f1a261acd9027995 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139844138)
   * b0f46f54460039468e55c6feee2f0fa9df625cbe : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139847929)
   * 1bd7fa188ecc6814435d18f409a5e3629c717365 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10480: [FLINK-15095] [table-planner-blink] bridge table schema's primary key to metadata handler in blink planner

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10480: [FLINK-15095] [table-planner-blink] 
bridge table schema's primary key to metadata handler in blink planner
URL: https://github.com/apache/flink/pull/10480#issuecomment-562858870
 
 
   
   ## CI report:
   
   * f2cc63f3495dab7e6a1bb09f8663265739e41ffd : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139862381)
   * ce6f2c3b92121d7c36803595bb2665992d0c3ab2 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139940590)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10228: [FLINK-14816] Add thread dump feature for taskmanager

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10228: [FLINK-14816] Add thread dump 
feature for taskmanager
URL: https://github.com/apache/flink/pull/10228#issuecomment-554599522
 
 
   
   ## CI report:
   
   * 8c07f8927fcbb96ae2e6f7b155ebc89287e2dbe9 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/136797904)
   * 32f7901560ffd6635e5cdc4463327906574eceda : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/136883965)
   * 57b530f5e6228e59c3899f475c212c4dcf52d56a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/137310623)
   * 551b4df0f88a31a0989e3428ac4e89423514d4f9 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/137496433)
   * 53cbeb683f8fb71b2eed0eb1e0833fdfbe00379e : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/140014987)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10474: [FLINK-15116] Make JobClient stateless, remove AutoCloseable

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10474: [FLINK-15116] Make JobClient 
stateless, remove AutoCloseable
URL: https://github.com/apache/flink/pull/10474#issuecomment-562698759
 
 
   
   ## CI report:
   
   * b29a8cc494e7033b9c8fb21ab982060b57aa90f3 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139804062)
   * 013f138b677b2761458f591c2ea31ee8d166ce0f : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139808162)
   * 11f28d34c651b6629217d379b52de05e68bbf33e : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139823571)
   * 434805415d38683f31eddb46029b9c0124a8d130 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139827112)
   * 386a099b31f4d34d403bf1fd2158f2a294f57079 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139832995)
   * 43376b79e27f937041e74995ce3de5bb4d7808c7 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139850916)
   * 547c156d4068dcecf85ea91a1d58057824c09763 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139854101)
   * c0262593c93b0f66be2ddcd415130e45f6408faf : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139880387)
   * ba1b25eabbc824c8c6186edf179c97846bd8fbee : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139886739)
   * e28f2d30d556ca8d38165a1eb5a7f22da43bbb2a : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[jira] [Closed] (FLINK-15020) Support timestamp type in hive

2019-12-07 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-15020.
--
  Assignee: Rui Li
Resolution: Fixed

master: 57bb952e3a1732caa5acb640a1712bcdd55d813d

> Support timestamp type in hive
> --
>
> Key: FLINK-15020
> URL: https://issues.apache.org/jira/browse/FLINK-15020
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Jingsong Lee
>Assignee: Rui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Now that FLINK-14599 is finished, the blink planner has the ability to support 
> the timestamp type with precision 9. So we can support the timestamp type in Hive now.
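As a quick illustration of the planner-side type involved, a minimal sketch 
using Flink's public DataTypes API (nothing Hive-specific, just the TIMESTAMP(9) 
type this issue relies on):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    public class TimestampPrecisionExample {
        public static void main(String[] args) {
            // Hive's TIMESTAMP has nanosecond precision, which corresponds to
            // Flink's TIMESTAMP(9), supported since FLINK-14599.
            DataType hiveTimestamp = DataTypes.TIMESTAMP(9);
            System.out.println(hiveTimestamp); // prints TIMESTAMP(9)
        }
    }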





[GitHub] [flink] KurtYoung closed pull request #10401: [FLINK-15020][hive] Support timestamp type in hive

2019-12-07 Thread GitBox
KurtYoung closed pull request #10401: [FLINK-15020][hive] Support timestamp 
type in hive
URL: https://github.com/apache/flink/pull/10401
 
 
   




[GitHub] [flink] KurtYoung closed pull request #10480: [FLINK-15095] [table-planner-blink] bridge table schema's primary key to metadata handler in blink planner

2019-12-07 Thread GitBox
KurtYoung closed pull request #10480: [FLINK-15095] [table-planner-blink] 
bridge table schema's primary key to metadata handler in blink planner
URL: https://github.com/apache/flink/pull/10480
 
 
   




[jira] [Closed] (FLINK-15095) bridge table schema's primary key to metadata handler in blink planner

2019-12-07 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-15095.
--
Resolution: Fixed

master: d4bd2d6ee64ba83c42981e4e62374fb962bb71bf

> bridge table schema's primary key to metadata handler in blink planner
> --
>
> Key: FLINK-15095
> URL: https://issues.apache.org/jira/browse/FLINK-15095
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>






[GitHub] [flink] flinkbot edited a comment on issue #10228: [FLINK-14816] Add thread dump feature for taskmanager

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10228: [FLINK-14816] Add thread dump 
feature for taskmanager
URL: https://github.com/apache/flink/pull/10228#issuecomment-554599522
 
 
   
   ## CI report:
   
   * 8c07f8927fcbb96ae2e6f7b155ebc89287e2dbe9 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/136797904)
   * 32f7901560ffd6635e5cdc4463327906574eceda : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/136883965)
   * 57b530f5e6228e59c3899f475c212c4dcf52d56a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/137310623)
   * 551b4df0f88a31a0989e3428ac4e89423514d4f9 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/137496433)
   * 53cbeb683f8fb71b2eed0eb1e0833fdfbe00379e : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10481: [FLINK-14824][table] Improve schema derivation for formats

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10481: [FLINK-14824][table] Improve schema 
derivation for formats
URL: https://github.com/apache/flink/pull/10481#issuecomment-562867016
 
 
   
   ## CI report:
   
   * 70e7764be333154ec2efa0ad0a6e8d8510624628 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139866904)
   * b8d5ca6c35448661731186e78a4d1dc753e337fe : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139919996)
   * b7b2a59fb22d9a2d8b6e003a9624a89bbb8d656c : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/140009245)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] JingsongLi commented on a change in pull request #10481: [FLINK-14824][table] Improve schema derivation for formats

2019-12-07 Thread GitBox
JingsongLi commented on a change in pull request #10481: [FLINK-14824][table] 
Improve schema derivation for formats
URL: https://github.com/apache/flink/pull/10481#discussion_r355163918
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -1321,12 +1322,12 @@ The CSV format can be used as follows:
 .withFormat(
   new Csv()
 
-// required: define the schema either by using type information
-.schema(Type.ROW(...))
-
-// or use the table's schema
+// optional: use the table's schema as format schema, this is enabled by 
default
 .deriveSchema()
 
 Review comment:
   Maybe we can remove it from the doc now?
   Including both `.deriveSchema()` and `.schema(Type.ROW(...))`?




[GitHub] [flink] zhijiangW merged pull request #9986: [FLINK-10933][kubernetes] Implement KubernetesSessionClusterEntrypoint.

2019-12-07 Thread GitBox
zhijiangW merged pull request #9986: [FLINK-10933][kubernetes] Implement 
KubernetesSessionClusterEntrypoint.
URL: https://github.com/apache/flink/pull/9986
 
 
   




[GitHub] [flink] flinkbot edited a comment on issue #10481: [FLINK-14824][table] Improve schema derivation for formats

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10481: [FLINK-14824][table] Improve schema 
derivation for formats
URL: https://github.com/apache/flink/pull/10481#issuecomment-562867016
 
 
   
   ## CI report:
   
   * 70e7764be333154ec2efa0ad0a6e8d8510624628 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139866904)
   * b8d5ca6c35448661731186e78a4d1dc753e337fe : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139919996)
   * b7b2a59fb22d9a2d8b6e003a9624a89bbb8d656c : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10371: [FLINK-14953][formats] use table 
type to build parquet FilterPredicate
URL: https://github.com/apache/flink/pull/10371#issuecomment-560045651
 
 
   
   ## CI report:
   
   * 24326ffa979a4f9ed56265c0db11a874d3c4d2a2 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138813356)
   * f7d69ecf50a0af52d04af9aafcfd18d03545e6e9 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138816941)
   * 43a4b42d5163054336fd56cc2e6e5e5e9b387bfb : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/140002197)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] lamber-ken commented on issue #10228: [FLINK-14816] Add thread dump feature for taskmanager

2019-12-07 Thread GitBox
lamber-ken commented on issue #10228: [FLINK-14816] Add thread dump feature for 
taskmanager
URL: https://github.com/apache/flink/pull/10228#issuecomment-562916680
 
 
   > Now that the old Web UI is dropped, can we rebase this on the latest 
master?
   
   Done.




[GitHub] [flink] flinkbot edited a comment on issue #9986: [FLINK-10933][kubernetes] Implement KubernetesSessionClusterEntrypoint.

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #9986: [FLINK-10933][kubernetes] Implement 
KubernetesSessionClusterEntrypoint.
URL: https://github.com/apache/flink/pull/9986#issuecomment-545891630
 
 
   
   ## CI report:
   
   * 593bf42620faf09c1accbd692494646194e3d574 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133371151)
   * bb9fbd1d51a478793f63ae8b6d6e92b6a5a53775 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133940616)
   * bc25d444faeeaa773f040b14159aafe5a6a5a975 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133953929)
   * d8819bf3615c497b501399bc476de889c17dc239 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133998596)
   * d1430642ae91d8ed58479fb9d1492c433312a9b2 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134010391)
   * 17ffc7f7d12f2115eb6b4c86af2c627ce1ad68aa : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/137136929)
   * 0b0fb6b154fc91f944fa8972266b72578cd4e765 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138017441)
   * 8a58370a6aa7b67ee294af764c25ced0e24ba364 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138990632)
   * 6a72f32c09cf42b6ed473f17b31f5fbc18b720f8 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139116907)
   * 69fb531209587ab73fae38c574017c4ea19ac759 : UNKNOWN
   * 5c31b05a3aa53614934466638d25aea1e4d0fc9a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139888646)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10484: [FLINK-14906][table] create and drop 
temp system functions from DDL t…
URL: https://github.com/apache/flink/pull/10484#issuecomment-562914317
 
 
   
   ## CI report:
   
   * c604c9f8704578c33884b2faf8d163a7482795c4 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139991088)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…

2019-12-07 Thread GitBox
bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] 
create and drop temp system functions from DDL t…
URL: https://github.com/apache/flink/pull/10484#discussion_r355162107
 
 

 ##
 File path: flink-python/pyflink/table/catalog.py
 ##
 @@ -804,6 +804,13 @@ def is_temporary(self):
 """
 return self._j_catalog_function.isTemporary()
 
+def is_system(self):
 
 Review comment:
   no need for this in catalog function




[GitHub] [flink] bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…

2019-12-07 Thread GitBox
bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] 
create and drop temp system functions from DDL t…
URL: https://github.com/apache/flink/pull/10484#discussion_r355162113
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterFunction.java
 ##
 @@ -118,6 +118,10 @@ public boolean isTemporary() {
return isTemporary;
}
 
+   public boolean isSystemFunction() {
 
 Review comment:
   no need for this




[GitHub] [flink] bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…

2019-12-07 Thread GitBox
bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] 
create and drop temp system functions from DDL t…
URL: https://github.com/apache/flink/pull/10484#discussion_r355162115
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java
 ##
 @@ -33,16 +33,22 @@
private final String className; // Fully qualified class name of the 
function
private final FunctionLanguage functionLanguage;
private final boolean isTemporary;
+   private final boolean isSystem;
 
 Review comment:
   no need for this in catalog function




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355162114
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* 
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static List 
validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+   final List hostList = 
new ArrayList<>();
+
+   final String hostsStr = 
descriptorProperties.getString(CONNECTOR_HOSTS);
+   if (null == hostsStr || hostsStr.length() == 0) {
+   throw new ValidationException("Properties '" + 
CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr);
+   }
+
+   final String[] hosts = hostsStr.split(";");
 
 Review comment:
   `descriptorProperties.getString` already ensures that hostsStr is not empty, so 
`hosts.size > 0` is always true here. Should we validate it again?
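   As a side note, a minimal, self-contained sketch of the kind of parsing under 
discussion: splitting a `connector.hosts` value on `;` and validating each entry. 
The names here are illustrative only; the PR's actual method works on 
DescriptorProperties and Flink's own Host/ValidationException types instead:

    import java.net.URI;
    import java.util.ArrayList;
    import java.util.List;

    public class HostsParserSketch {
        // Parses "http://host1:9092;http://host2:9093" into a list of URIs.
        public static List<URI> parseHosts(String hostsStr) {
            if (hostsStr == null || hostsStr.isEmpty()) {
                throw new IllegalArgumentException("'connector.hosts' must not be empty.");
            }
            List<URI> hosts = new ArrayList<>();
            for (String host : hostsStr.split(";")) {
                URI uri = URI.create(host);
                if (uri.getScheme() == null || uri.getHost() == null || uri.getPort() == -1) {
                    throw new IllegalArgumentException(
                            "Invalid host '" + host + "'; expected format 'scheme://host:port'.");
                }
                hosts.add(uri);
            }
            return hosts;
        }

        public static void main(String[] args) {
            System.out.println(parseHosts("http://host1:9092;http://host2:9093"));
        }
    }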




[GitHub] [flink] bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…

2019-12-07 Thread GitBox
bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] 
create and drop temp system functions from DDL t…
URL: https://github.com/apache/flink/pull/10484#discussion_r355162109
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_catalog.py
 ##
 @@ -69,6 +69,7 @@ def check_catalog_function_equals(self, f1, f2):
 self.assertEqual(f1.get_class_name(), f2.get_class_name())
 self.assertEqual(f1.is_generic(), f2.is_generic())
 self.assertEqual(f1.is_temporary(), f2.is_temporary())
+self.assertEqual(f1.is_system(), f2.is_system())
 
 Review comment:
   no need for this in catalog function




[GitHub] [flink] xuefuz commented on a change in pull request #10381: [FLINK-14513][hive] Implement listPartitionsByFilter to HiveCatalog

2019-12-07 Thread GitBox
xuefuz commented on a change in pull request #10381: [FLINK-14513][hive] 
Implement listPartitionsByFilter to HiveCatalog
URL: https://github.com/apache/flink/pull/10381#discussion_r355161669
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
 ##
 @@ -175,4 +186,93 @@ public static boolean requireRelyConstraint(byte trait) {
return (trait & HIVE_CONSTRAINT_RELY) != 0;
}
 
+   /**
+* Generates a filter string for partition columns from the given 
filter expressions.
+*
+* @param numNonPartCol The number of non-partition columns -- used to 
shift field reference index
+* @param partColNames The names of all partition columns
+* @param expressions  The filter expressions in CNF form
+* @return an Optional filter string equivalent to the expressions, 
which is empty if the expressions can't be handled
+*/
+   public static Optional makePartitionFilter(int numNonPartCol, 
List partColNames, List expressions) {
 
 Review comment:
   Nit: what about renaming numNonPartCol to partColOffset?
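   For readers following along, a much-reduced sketch of the idea behind 
makePartitionFilter: render partition-column predicates as a Hive-style filter 
string, and return empty when something cannot be expressed. The real method 
walks Flink Expression trees in CNF form; the map-of-equality-predicates input 
here is a simplification:

    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    public class PartitionFilterSketch {
        // Turns simple equality predicates on partition columns into a filter
        // string like "p1 = 2019 and p2 = 'a'"; empty if any predicate touches
        // a non-partition column and therefore cannot be pushed down.
        public static Optional<String> makeFilter(
                Map<String, Object> predicates, List<String> partColNames) {
            StringBuilder filter = new StringBuilder();
            for (Map.Entry<String, Object> e : predicates.entrySet()) {
                if (!partColNames.contains(e.getKey())) {
                    return Optional.empty();
                }
                if (filter.length() > 0) {
                    filter.append(" and ");
                }
                Object v = e.getValue();
                String literal = (v instanceof String) ? "'" + v + "'" : String.valueOf(v);
                filter.append(e.getKey()).append(" = ").append(literal);
            }
            return filter.length() == 0 ? Optional.empty() : Optional.of(filter.toString());
        }
    }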




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355162022
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
 ##
 @@ -177,13 +157,112 @@ public void testTableSource() {

assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
}
 
+   @Test
+   @SuppressWarnings("unchecked")
+   public void testTableSourceWithLegacyProperties() {
+   // prepare parameters for Kafka table source
+   final TableSchema schema = TableSchema.builder()
+   .field(FRUIT_NAME, DataTypes.STRING())
+   .field(COUNT, DataTypes.DECIMAL(10, 3))
+   .field(EVENT_TIME, DataTypes.TIMESTAMP(3))
+   .field(PROC_TIME, DataTypes.TIMESTAMP(3))
+   .build();
+
+   final List 
rowtimeAttributeDescriptors = Collections.singletonList(
+   new RowtimeAttributeDescriptor(EVENT_TIME, new 
ExistingField(TIME), new AscendingTimestamps()));
+
+   final Map fieldMapping = new HashMap<>();
+   fieldMapping.put(FRUIT_NAME, NAME);
+   fieldMapping.put(NAME, NAME);
+   fieldMapping.put(COUNT, COUNT);
+   fieldMapping.put(TIME, TIME);
+
+   final Map specificOffsets = new 
HashMap<>();
+   specificOffsets.put(new KafkaTopicPartition(TOPIC, 
PARTITION_0), OFFSET_0);
+   specificOffsets.put(new KafkaTopicPartition(TOPIC, 
PARTITION_1), OFFSET_1);
+
+   final TestDeserializationSchema deserializationSchema = new 
TestDeserializationSchema(
+   TableSchema.builder()
+   .field(NAME, DataTypes.STRING())
+   .field(COUNT, 
DataTypes.DECIMAL(10, 3))
+   .field(TIME, 
DataTypes.TIMESTAMP(3))
+   .build()
+   .toRowType()
+   );
+
+   final KafkaTableSourceBase expected = 
getExpectedKafkaTableSource(
+   schema,
+   Optional.of(PROC_TIME),
+   rowtimeAttributeDescriptors,
+   fieldMapping,
+   TOPIC,
+   KAFKA_PROPERTIES,
+   deserializationSchema,
+   StartupMode.SPECIFIC_OFFSETS,
+   specificOffsets);
+
+   TableSourceValidation.validateTableSource(expected);
+
+   // construct table source using descriptors and table source 
factory
+   final Map legacyPropertiesMap = new HashMap<>();
+   legacyPropertiesMap.putAll(createKafkaSourceProperties());
+
+   // use legacy properties
+   legacyPropertiesMap.remove("connector.specific-offsets");
+   
legacyPropertiesMap.remove("connector.properties.zookeeper.connect");
+   
legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
+   legacyPropertiesMap.remove("connector.properties.group.id");
+
+   
legacyPropertiesMap.put("connector.specific-offsets.0.partition", "0");
+   legacyPropertiesMap.put("connector.specific-offsets.0.offset", 
"100");
+   
legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
+   legacyPropertiesMap.put("connector.specific-offsets.1.offset", 
"123");
+   legacyPropertiesMap.put("connector.properties.0.key", 
"zookeeper.connect");
+   legacyPropertiesMap.put("connector.properties.0.value", 
"dummy");
+   legacyPropertiesMap.put("connector.properties.1.key", 
"bootstrap.servers");
+   legacyPropertiesMap.put("connector.properties.1.value", 
"dummy");
+   legacyPropertiesMap.put("connector.properties.2.key", 
"group.id");
+   legacyPropertiesMap.put("connector.properties.2.value", 
"dummy");
+
+   final TableSource actualSource = 
TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap)
+   .createStreamTableSource(legacyPropertiesMap);
+
+   assertEquals(expected, actualSource);
+
+   // test Kafka consumer
+   final KafkaTableSourceBase actualKafkaSource = 
(KafkaTableSourceBase) actualSource;
+   final StreamExecutionEnvironmentMock mock

[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161955
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode 
startupMode) {
}
throw new IllegalArgumentException("Invalid startup mode.");
}
+
+   /**
+* Parse SpecificOffsets String to Map.
+*
+* SpecificOffsets String format was given as following:
+*
+* 
+* connector.specific-offsets = 
partition:0,offset:42;partition:1,offset:300
+* 
+* @param descriptorProperties
+* @return SpecificOffsets with map format, key is partition, and value 
is offset.
+*/
+   public static Map 
validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) {
+   final Map offsetMap = new HashMap<>();
+
+   final String parseSpecificOffsetsStr = 
descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS);
+   if (parseSpecificOffsetsStr.isEmpty()) {
+   throw new ValidationException("Properties '" + 
CONNECTOR_SPECIFIC_OFFSETS + "' can not be empty.");
+   }
 
 Review comment:
   ok
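   For context, a minimal, self-contained sketch of a parser for the format 
quoted above. Names are illustrative; the PR's actual method reads the value 
from DescriptorProperties and throws Flink's ValidationException instead:

    import java.util.HashMap;
    import java.util.Map;

    public class SpecificOffsetsParser {
        // Parses "partition:0,offset:42;partition:1,offset:300" into partition -> offset.
        public static Map<Integer, Long> parse(String specificOffsets) {
            if (specificOffsets == null || specificOffsets.isEmpty()) {
                throw new IllegalArgumentException(
                        "'connector.specific-offsets' must not be empty.");
            }
            Map<Integer, Long> offsetMap = new HashMap<>();
            for (String pair : specificOffsets.split(";")) {
                String[] fields = pair.split(",");
                if (fields.length != 2
                        || !fields[0].startsWith("partition:")
                        || !fields[1].startsWith("offset:")) {
                    throw new IllegalArgumentException("Invalid specific-offsets entry: " + pair);
                }
                int partition = Integer.parseInt(fields[0].substring("partition:".length()));
                long offset = Long.parseLong(fields[1].substring("offset:".length()));
                offsetMap.put(partition, offset);
            }
            return offsetMap;
        }

        public static void main(String[] args) {
            System.out.println(parse("partition:0,offset:42;partition:1,offset:300"));
        }
    }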




[jira] [Created] (FLINK-15127) rename CreateFunctionOperation, DropFunctionOperation, AlterFunctionOperation to CreateCatalogFunctionOperation, DropCatalogFunctionOperation, AlterCatalogFunctionOperat

2019-12-07 Thread Bowen Li (Jira)
Bowen Li created FLINK-15127:


 Summary: rename CreateFunctionOperation, DropFunctionOperation, 
AlterFunctionOperation to CreateCatalogFunctionOperation, 
DropCatalogFunctionOperation, AlterCatalogFunctionOperation
 Key: FLINK-15127
 URL: https://issues.apache.org/jira/browse/FLINK-15127
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Bowen Li
Assignee: Zhenqiu Huang


rename these operations since they should only support operations related to 
catalog functions (both temp and persistent)





[GitHub] [flink] flinkbot edited a comment on issue #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10371: [FLINK-14953][formats] use table 
type to build parquet FilterPredicate
URL: https://github.com/apache/flink/pull/10371#issuecomment-560045651
 
 
   
   ## CI report:
   
   * 24326ffa979a4f9ed56265c0db11a874d3c4d2a2 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138813356)
   * f7d69ecf50a0af52d04af9aafcfd18d03545e6e9 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138816941)
   * 43a4b42d5163054336fd56cc2e6e5e5e9b387bfb : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161941
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
 ##
 @@ -265,24 +259,22 @@ public Kafka sinkPartitionerCustom(Class partit
}
 
if (specificOffsets != null) {
-   final List> values = new ArrayList<>();
+   final StringBuilder stringBuilder = new StringBuilder();
+   int i = 0;
for (Map.Entry specificOffset : 
specificOffsets.entrySet()) {
-   
values.add(Arrays.asList(specificOffset.getKey().toString(), 
specificOffset.getValue().toString()));
+   if (i != 0) {
+   stringBuilder.append(';');
+   }
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(',');
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue());
+   i++;
}
-   properties.putIndexedFixedProperties(
-   CONNECTOR_SPECIFIC_OFFSETS,
-   
Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, 
CONNECTOR_SPECIFIC_OFFSETS_OFFSET),
-   values);
+   properties.putString(CONNECTOR_SPECIFIC_OFFSETS, 
stringBuilder.toString());
}
 
if (kafkaProperties != null) {
-   properties.putIndexedFixedProperties(
-   CONNECTOR_PROPERTIES,
-   Arrays.asList(CONNECTOR_PROPERTIES_KEY, 
CONNECTOR_PROPERTIES_VALUE),
-   this.kafkaProperties.entrySet().stream()
-   .map(e -> Arrays.asList(e.getKey(), 
e.getValue()))
-   .collect(Collectors.toList())
-   );
+   this.kafkaProperties.entrySet().forEach(entry ->
+   
properties.putString(CONNECTOR_PROPERTIES + '.' + entry.getKey(), 
entry.getValue()));
 
 Review comment:
   ok
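   And the serialization direction shown in the quoted diff, as a plain-Java 
reduction: joining partition/offset pairs with ';'. SpecificOffsetsWriter is an 
illustrative name; the PR's actual code builds the string inside the Kafka 
descriptor with a StringBuilder:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class SpecificOffsetsWriter {
        // Inverse of the parser: renders {0=42, 1=300} as
        // "partition:0,offset:42;partition:1,offset:300".
        public static String write(Map<Integer, Long> specificOffsets) {
            return specificOffsets.entrySet().stream()
                    .map(e -> "partition:" + e.getKey() + ",offset:" + e.getValue())
                    .collect(Collectors.joining(";"));
        }

        public static void main(String[] args) {
            Map<Integer, Long> offsets = new LinkedHashMap<>();
            offsets.put(0, 42L);
            offsets.put(1, 300L);
            System.out.println(write(offsets));
        }
    }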




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161944
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode 
startupMode) {
}
throw new IllegalArgumentException("Invalid startup mode.");
}
+
+   /**
+* Parse SpecificOffsets String to Map.
+*
+* SpecificOffsets String format was given as following:
+*
+* 
+* connector.specific-offsets = 
partition:0,offset:42;partition:1,offset:300
+* 
+* @param descriptorProperties
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161898
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* 
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static List 
validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+   final List hostList = 
new ArrayList<>();
+
+   final String hostsStr = 
descriptorProperties.getString(CONNECTOR_HOSTS);
+   if (hostsStr.isEmpty()) {
+   throw new ValidationException("Properties '" + 
CONNECTOR_HOSTS + "' can not be empty.");
+   }
 
 Review comment:
   ok




[GitHub] [flink] flinkbot edited a comment on issue #10364: [FLINK-14993][metrics] FrontMetricGroup passes reporter-specific parameters

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10364: [FLINK-14993][metrics] 
FrontMetricGroup passes reporter-specific parameters
URL: https://github.com/apache/flink/pull/10364#issuecomment-559827376
 
 
   
   ## CI report:
   
   * bf9a31483c0d55c968f65b8ca4a11557f52de456 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138727067)
   * 8492919a37ecd85428c327d9893fb0ca1fa07734 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139364349)
   * 72284c361a6ff957c3f8d831d49dc4c93ac8f282 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139878424)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161876
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* 
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static List 
validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
 
 Review comment:
   OK, I think validateAndParseHostsString would be better.




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161864
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* 
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static List 
validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
 
 Review comment:
   OK, I think validateAndGetHosts would be better.




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161869
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode 
startupMode) {
}
throw new IllegalArgumentException("Invalid startup mode.");
}
+
+   /**
+* Parse SpecificOffsets String to Map.
+*
+* SpecificOffsets String format was given as following:
+*
+* 
+* connector.specific-offsets = 
partition:0,offset:42;partition:1,offset:300
+* 
+* @param descriptorProperties
+* @return SpecificOffsets with map format, key is partition, and value 
is offset.
+*/
+   public static Map 
validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) {
 
 Review comment:
   ok





[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161822
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* <pre>
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* </pre>
+* @param descriptorProperties
+* @return
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-15126) migrate "show functions" from sql cli to sql parser

2019-12-07 Thread Bowen Li (Jira)
Bowen Li created FLINK-15126:


 Summary: migrate "show functions" from sql cli to sql parser
 Key: FLINK-15126
 URL: https://issues.apache.org/jira/browse/FLINK-15126
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Client, Table SQL / Planner
Reporter: Bowen Li
Assignee: Zhenqiu Huang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] lamber-ken edited a comment on issue #9751: [FLINK-14177] bump curator from 2.12.0 to 4.2.0

2019-12-07 Thread GitBox
lamber-ken edited a comment on issue #9751: [FLINK-14177] bump curator from 
2.12.0 to 4.2.0
URL: https://github.com/apache/flink/pull/9751#issuecomment-562915008
 
 
   > I wouldn't want to increment such a crucial dependency so close to a 
release.
   
   Okay, we may have already spent a lot of time on this, because this issue was opened on 24 Sep. I often need to check whether there are conflicting files.
   
   Btw, https://github.com/apache/flink/pull/9158 is blocked by this issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161802
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java
 ##
 @@ -308,13 +304,7 @@ public Elasticsearch connectionPathPrefix(String 
pathPrefix) {
final DescriptorProperties properties = new 
DescriptorProperties();
properties.putProperties(internalProperties);
 
-   final List<List<String>> hostValues = hosts.stream()
-   .map(host -> Arrays.asList(host.hostname, 
String.valueOf(host.port), host.protocol))
-   .collect(Collectors.toList());
-   properties.putIndexedFixedProperties(
-   CONNECTOR_HOSTS,
-   Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, 
CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL),
-   hostValues);
+   properties.putString(CONNECTOR_HOSTS, 
hosts.stream().map(Host::toString).collect(Collectors.joining(";")));
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lamber-ken commented on issue #9751: [FLINK-14177] bump curator from 2.12.0 to 4.2.0

2019-12-07 Thread GitBox
lamber-ken commented on issue #9751: [FLINK-14177] bump curator from 2.12.0 to 
4.2.0
URL: https://github.com/apache/flink/pull/9751#issuecomment-562915008
 
 
   > I wouldn't want to increment such a crucial dependency so close to a 
release.
   
   Okay, we may have already spent a lot of time on this, because this issue was opened on 24 Sep. I often need to check whether there are conflicting files.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161756
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -247,10 +247,8 @@ tables:
   topic: test-input
   startup-mode: earliest-offset
   properties:
-- key: zookeeper.connect
-  value: localhost:2181
-- key: bootstrap.servers
-  value: localhost:9092
+zookeeper.connect: localhost:2181,localhost:2182
 
 Review comment:
   But in FLIP-86 we defined the example as follows:
   `'connector.properties.zookeeper.connect'='localhost:2181,localhost:2182',
   'connector.properties.bootstrap.servers'='localhost:9092,localhost:9093',
   'connector.properties.group.id'='testGroup'
   `
   so I think either keeping this or using 'hostname1:2181,host_name2:2181' makes sense. @JingsongLi 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
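
For illustration, flattening a property map under the FLIP-86 style `connector.properties.` prefix is straightforward; a sketch with made-up values:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public final class PropertyFlattener {
    public static void main(String[] args) {
        final Map<String, String> kafkaProperties = new LinkedHashMap<>();
        kafkaProperties.put("zookeeper.connect", "localhost:2181,localhost:2182");
        kafkaProperties.put("bootstrap.servers", "localhost:9092,localhost:9093");
        kafkaProperties.put("group.id", "testGroup");

        // Flatten each entry under the 'connector.properties.' prefix,
        // matching the FLIP-86 style keys quoted above.
        final Map<String, String> flattened = new LinkedHashMap<>();
        kafkaProperties.forEach((k, v) -> flattened.put("connector.properties." + k, v));

        flattened.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```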


[GitHub] [flink] bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…

2019-12-07 Thread GitBox
bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] 
create and drop temp system functions from DDL t…
URL: https://github.com/apache/flink/pull/10484#discussion_r355161575
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 ##
 @@ -204,6 +204,15 @@ public boolean 
hasTemporaryCatalogFunction(ObjectIdentifier functionIdentifier)
return tempCatalogFunctions.containsKey(normalizedIdentifier);
}
 
+   /**
+* Check whether a temporary system function is already registered.
+* @param functionName the name of the function
+* @return whether the temporary system function exists in the function 
catalog
+*/
+   public boolean hasSystemCatalogFunction(String functionName) {
 
 Review comment:
   should be `hasTempSystemFunction`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161716
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -247,10 +247,8 @@ tables:
   topic: test-input
   startup-mode: earliest-offset
   properties:
-- key: zookeeper.connect
-  value: localhost:2181
-- key: bootstrap.servers
-  value: localhost:9092
+zookeeper.connect: localhost:2181,localhost:2182
 
 Review comment:
   I'm fine with keeping the previous example; it is a Kafka property, and Kafka users should know it or refer to the Kafka docs for multiple hosts.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161684
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -87,7 +93,12 @@ private void validateHosts(DescriptorProperties properties) 
{
hostsValidators.put(CONNECTOR_HOSTS_HOSTNAME, (key) -> 
properties.validateString(key, false, 1));
hostsValidators.put(CONNECTOR_HOSTS_PORT, (key) -> 
properties.validateInt(key, false, 0, 65535));
hostsValidators.put(CONNECTOR_HOSTS_PROTOCOL, (key) -> 
properties.validateString(key, false, 1));
-   properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, 
false, hostsValidators);
+
+   if (properties.containsKey(CONNECTOR_HOSTS)) {
+   validateAndGetHostsStr(properties);
 
 Review comment:
   We'd better parse and validate at the same time, because `DescriptorProperties` does not support a `Consumer` that can validate a `String` value like `partition:0,offset:42;partition:1,offset:300`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161661
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
 ##
 @@ -265,24 +259,24 @@ public Kafka sinkPartitionerCustom(Class partit
}
 
if (specificOffsets != null) {
-   final List<List<String>> values = new ArrayList<>();
+   final StringBuilder stringBuilder = new StringBuilder();
+   int i = 0;
for (Map.Entry<Integer, Long> specificOffset : specificOffsets.entrySet()) {
-   
values.add(Arrays.asList(specificOffset.getKey().toString(), 
specificOffset.getValue().toString()));
+   if (i != 0) {
+   stringBuilder.append(';');
+   }
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(',');
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue());
 
 Review comment:
   Please break the method chain if it is too long.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161115
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode 
startupMode) {
}
throw new IllegalArgumentException("Invalid startup mode.");
}
+
+   /**
+* Parse SpecificOffsets String to Map.
+*
+* SpecificOffsets String format was given as following:
+*
+* 
+* connector.specific-offsets = 
partition:0,offset:42;partition:1,offset:300
+* 
+* @param descriptorProperties
 
 Review comment:
   Remove the `@param` if there is no documentation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355158905
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -87,7 +93,12 @@ private void validateHosts(DescriptorProperties properties) 
{
hostsValidators.put(CONNECTOR_HOSTS_HOSTNAME, (key) -> 
properties.validateString(key, false, 1));
hostsValidators.put(CONNECTOR_HOSTS_PORT, (key) -> 
properties.validateInt(key, false, 0, 65535));
hostsValidators.put(CONNECTOR_HOSTS_PROTOCOL, (key) -> 
properties.validateString(key, false, 1));
-   properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, 
false, hostsValidators);
+
+   if (properties.containsKey(CONNECTOR_HOSTS)) {
+   validateAndGetHostsStr(properties);
+   } else {
+   
properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, false, 
hostsValidators);
 
 Review comment:
   You can put the initialization of `hostsValidators` in this `else` clause.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355159036
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* <pre>
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* </pre>
+* @param descriptorProperties
+* @return
+*/
+   public static List<ElasticsearchUpsertTableSinkBase.Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
 
 Review comment:
   1. You can `import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host;` to remove the `ElasticsearchUpsertTableSinkBase.` prefix. 
   2. Should the method name be `validateAndParseHostsString` or `validateAndGetHosts`? Because the return value is not a string.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355158623
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java
 ##
 @@ -308,13 +304,7 @@ public Elasticsearch connectionPathPrefix(String 
pathPrefix) {
final DescriptorProperties properties = new 
DescriptorProperties();
properties.putProperties(internalProperties);
 
-   final List> hostValues = hosts.stream()
-   .map(host -> Arrays.asList(host.hostname, 
String.valueOf(host.port), host.protocol))
-   .collect(Collectors.toList());
-   properties.putIndexedFixedProperties(
-   CONNECTOR_HOSTS,
-   Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, 
CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL),
-   hostValues);
+   properties.putString(CONNECTOR_HOSTS, 
hosts.stream().map(Host::toString).collect(Collectors.joining(";")));
 
 Review comment:
   The method chain is too long. Please break it. 
   
   ```java
   properties.putString(
CONNECTOR_HOSTS,
hosts.stream()
.map(Host::toString)
.collect(Collectors.joining(";")));
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zjffdu commented on issue #10474: [FLINK-15116] Make JobClient stateless, remove AutoCloseable

2019-12-07 Thread GitBox
zjffdu commented on issue #10474: [FLINK-15116] Make JobClient stateless, 
remove AutoCloseable
URL: https://github.com/apache/flink/pull/10474#issuecomment-562914776
 
 
   @aljoscha This PR works for Zeppelin. Considering the feature freeze, let's move the discussion of a general FlinkClient to the future.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161561
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
 ##
 @@ -177,13 +157,112 @@ public void testTableSource() {

assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
}
 
+   @Test
+   @SuppressWarnings("unchecked")
+   public void testTableSourceWithLegacyProperties() {
+   // prepare parameters for Kafka table source
+   final TableSchema schema = TableSchema.builder()
+   .field(FRUIT_NAME, DataTypes.STRING())
+   .field(COUNT, DataTypes.DECIMAL(10, 3))
+   .field(EVENT_TIME, DataTypes.TIMESTAMP(3))
+   .field(PROC_TIME, DataTypes.TIMESTAMP(3))
+   .build();
+
+   final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList(
+   new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps()));
+
+   final Map<String, String> fieldMapping = new HashMap<>();
+   fieldMapping.put(FRUIT_NAME, NAME);
+   fieldMapping.put(NAME, NAME);
+   fieldMapping.put(COUNT, COUNT);
+   fieldMapping.put(TIME, TIME);
+
+   final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+   specificOffsets.put(new KafkaTopicPartition(TOPIC, 
PARTITION_0), OFFSET_0);
+   specificOffsets.put(new KafkaTopicPartition(TOPIC, 
PARTITION_1), OFFSET_1);
+
+   final TestDeserializationSchema deserializationSchema = new 
TestDeserializationSchema(
+   TableSchema.builder()
+   .field(NAME, DataTypes.STRING())
+   .field(COUNT, 
DataTypes.DECIMAL(10, 3))
+   .field(TIME, 
DataTypes.TIMESTAMP(3))
+   .build()
+   .toRowType()
+   );
+
+   final KafkaTableSourceBase expected = 
getExpectedKafkaTableSource(
+   schema,
+   Optional.of(PROC_TIME),
+   rowtimeAttributeDescriptors,
+   fieldMapping,
+   TOPIC,
+   KAFKA_PROPERTIES,
+   deserializationSchema,
+   StartupMode.SPECIFIC_OFFSETS,
+   specificOffsets);
+
+   TableSourceValidation.validateTableSource(expected);
+
+   // construct table source using descriptors and table source 
factory
+   final Map<String, String> legacyPropertiesMap = new HashMap<>();
+   legacyPropertiesMap.putAll(createKafkaSourceProperties());
+
+   // use legacy properties
+   legacyPropertiesMap.remove("connector.specific-offsets");
+   
legacyPropertiesMap.remove("connector.properties.zookeeper.connect");
+   
legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
+   legacyPropertiesMap.remove("connector.properties.group.id");
+
+   
legacyPropertiesMap.put("connector.specific-offsets.0.partition", "0");
+   legacyPropertiesMap.put("connector.specific-offsets.0.offset", 
"100");
+   
legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
+   legacyPropertiesMap.put("connector.specific-offsets.1.offset", 
"123");
+   legacyPropertiesMap.put("connector.properties.0.key", 
"zookeeper.connect");
+   legacyPropertiesMap.put("connector.properties.0.value", 
"dummy");
+   legacyPropertiesMap.put("connector.properties.1.key", 
"bootstrap.servers");
+   legacyPropertiesMap.put("connector.properties.1.value", 
"dummy");
+   legacyPropertiesMap.put("connector.properties.2.key", 
"group.id");
+   legacyPropertiesMap.put("connector.properties.2.value", 
"dummy");
+
+   final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap)
+   .createStreamTableSource(legacyPropertiesMap);
+
+   assertEquals(expected, actualSource);
+
+   // test Kafka consumer
+   final KafkaTableSourceBase actualKafkaSource = 
(KafkaTableSourceBase) actualSource;
+   final StreamExecutionEnvironmentMock mock = n

[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355159149
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* <pre>
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* </pre>
+* @param descriptorProperties
+* @return
+*/
+   public static List<ElasticsearchUpsertTableSinkBase.Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+   final List<ElasticsearchUpsertTableSinkBase.Host> hostList = new ArrayList<>();
+
+   final String hostsStr = 
descriptorProperties.getString(CONNECTOR_HOSTS);
+   if (hostsStr.isEmpty()) {
+   throw new ValidationException("Properties '" + 
CONNECTOR_HOSTS + "' can not be empty.");
+   }
+
+   final String[] hosts = hostsStr.split(";");
+   for (String host : hosts) {
+   try {
+   final URL url = new URL(host);
+   final String protocol = url.getProtocol();
+   final String hostName = url.getHost();
+   final int hostPort = url.getPort();
+   hostList.add(new ElasticsearchUpsertTableSinkBase.Host(hostName, hostPort, protocol));
+   } catch (MalformedURLException e) {
+   throw new ValidationException("Properties '" + 
CONNECTOR_HOSTS + "' format should follow the " +
+   "format 
'http://host_name:9092', but is:" + host , e);
 
 Review comment:
   ```suggestion
"format 
'http://host_name:port', but is '" + host + "'.", e);
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355159119
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* <pre>
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* </pre>
+* @param descriptorProperties
+* @return
+*/
+   public static List<ElasticsearchUpsertTableSinkBase.Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+   final List<ElasticsearchUpsertTableSinkBase.Host> hostList = new ArrayList<>();
+
+   final String hostsStr = 
descriptorProperties.getString(CONNECTOR_HOSTS);
+   if (hostsStr.isEmpty()) {
+   throw new ValidationException("Properties '" + 
CONNECTOR_HOSTS + "' can not be empty.");
+   }
 
 Review comment:
   Can be optimized into `descriptorProperties.validateString(CONNECTOR_HOSTS, 
false, 1);`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355159387
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* <pre>
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* </pre>
+* @param descriptorProperties
+* @return
+*/
+   public static List<ElasticsearchUpsertTableSinkBase.Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+   final List<ElasticsearchUpsertTableSinkBase.Host> hostList = new ArrayList<>();
+
+   final String hostsStr = 
descriptorProperties.getString(CONNECTOR_HOSTS);
+   if (hostsStr.isEmpty()) {
+   throw new ValidationException("Properties '" + 
CONNECTOR_HOSTS + "' can not be empty.");
+   }
+
+   final String[] hosts = hostsStr.split(";");
+   for (String host : hosts) {
+   try {
+   final URL url = new URL(host);
+   final String protocol = url.getProtocol();
+   final String hostName = url.getHost();
+   final int hostPort = url.getPort();
 
 Review comment:
   Please check that `protocol` and `hostName` are not null, and that `hostPort` is not `-1`, because if the URL string is invalid the parsing may still succeed; e.g. `http:abc` doesn't throw an exception. 
   Please also add some unit tests for this util method.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
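
The leniency of `java.net.URL` that this comment points out is easy to reproduce; a small demonstration of why the extra host and port checks are needed:

```java
import java.net.MalformedURLException;
import java.net.URL;

public final class UrlLeniencyDemo {
    public static void main(String[] args) throws MalformedURLException {
        // 'http:abc' is accepted because a handler exists for the 'http'
        // protocol; no MalformedURLException is thrown.
        final URL url = new URL("http:abc");
        System.out.println("protocol = " + url.getProtocol());   // http
        System.out.println("host     = '" + url.getHost() + "'"); // '' (empty)
        System.out.println("port     = " + url.getPort());        // -1 (unset)
        // Hence the extra checks: reject an empty host and port == -1
        // instead of relying on the URL constructor alone.
    }
}
```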


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161152
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode 
startupMode) {
}
throw new IllegalArgumentException("Invalid startup mode.");
}
+
+   /**
+* Parse SpecificOffsets String to Map.
+*
+* SpecificOffsets String format was given as following:
+*
+* <pre>
+* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300
+* </pre>
+* @param descriptorProperties
+* @return SpecificOffsets with map format, key is partition, and value is offset.
+*/
+   public static Map<Integer, Long> validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) {
+   final Map<Integer, Long> offsetMap = new HashMap<>();
+
+   final String parseSpecificOffsetsStr = 
descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS);
+   if (parseSpecificOffsetsStr.isEmpty()) {
+   throw new ValidationException("Properties '" + 
CONNECTOR_SPECIFIC_OFFSETS + "' can not be empty.");
+   }
 
 Review comment:
   Can use `descriptorProperties.validateString(CONNECTOR_SPECIFIC_OFFSETS, 
false, 1);`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355160813
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode 
startupMode) {
}
throw new IllegalArgumentException("Invalid startup mode.");
}
+
+   /**
+* Parse SpecificOffsets String to Map.
+*
+* SpecificOffsets String format was given as following:
+*
+* <pre>
+* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300
+* </pre>
+* @param descriptorProperties
+* @return SpecificOffsets with map format, key is partition, and value is offset.
+*/
+   public static Map<Integer, Long> validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) {
 
 Review comment:
   `validateAndParseSpecificOffsetsString`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355158956
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* <pre>
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* </pre>
+* @param descriptorProperties
+* @return
 
 Review comment:
   Remove the undocumented `@param` and `@return`, or document them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355159144
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* <pre>
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* </pre>
+* @param descriptorProperties
+* @return
+*/
+   public static List<ElasticsearchUpsertTableSinkBase.Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+   final List<ElasticsearchUpsertTableSinkBase.Host> hostList = new ArrayList<>();
+
+   final String hostsStr = 
descriptorProperties.getString(CONNECTOR_HOSTS);
+   if (null == hostsStr || hostsStr.length() == 0) {
+   throw new ValidationException("Properties '" + 
CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr);
+   }
+
+   final String[] hosts = hostsStr.split(";");
 
 Review comment:
   The validation that `hosts.length > 0` is still missing?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355160903
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
 ##
 @@ -265,24 +259,22 @@ public Kafka sinkPartitionerCustom(Class partit
}
 
if (specificOffsets != null) {
-   final List<List<String>> values = new ArrayList<>();
+   final StringBuilder stringBuilder = new StringBuilder();
+   int i = 0;
for (Map.Entry<Integer, Long> specificOffset : specificOffsets.entrySet()) {
-   
values.add(Arrays.asList(specificOffset.getKey().toString(), 
specificOffset.getValue().toString()));
+   if (i != 0) {
+   stringBuilder.append(';');
+   }
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(',');
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue());
+   i++;
}
-   properties.putIndexedFixedProperties(
-   CONNECTOR_SPECIFIC_OFFSETS,
-   
Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, 
CONNECTOR_SPECIFIC_OFFSETS_OFFSET),
-   values);
+   properties.putString(CONNECTOR_SPECIFIC_OFFSETS, 
stringBuilder.toString());
}
 
if (kafkaProperties != null) {
-   properties.putIndexedFixedProperties(
-   CONNECTOR_PROPERTIES,
-   Arrays.asList(CONNECTOR_PROPERTIES_KEY, 
CONNECTOR_PROPERTIES_VALUE),
-   this.kafkaProperties.entrySet().stream()
-   .map(e -> Arrays.asList(e.getKey(), 
e.getValue()))
-   .collect(Collectors.toList())
-   );
+   this.kafkaProperties.entrySet().forEach(entry ->
+   
properties.putString(CONNECTOR_PROPERTIES + '.' + entry.getKey(), 
entry.getValue()));
 
 Review comment:
   simplify to 
   
   ```java
   this.kafkaProperties.forEach((key, value) ->
   properties.putString(CONNECTOR_PROPERTIES + '.' + key, value));
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
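
For comparison, the counter-based `StringBuilder` loop quoted above could also be written as a stream join; a sketch, not what the PR uses:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public final class OffsetsFormatter {
    public static void main(String[] args) {
        final Map<Integer, Long> specificOffsets = new LinkedHashMap<>();
        specificOffsets.put(0, 42L);
        specificOffsets.put(1, 300L);

        // Join "partition:<k>,offset:<v>" entries with ';' instead of
        // tracking a loop counter to decide when to append the separator.
        final String value = specificOffsets.entrySet().stream()
            .map(e -> "partition:" + e.getKey() + ",offset:" + e.getValue())
            .collect(Collectors.joining(";"));

        System.out.println(value); // partition:0,offset:42;partition:1,offset:300
    }
}
```

This keeps the separator logic in one place instead of threading a counter through the loop.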


[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / 
api] Flatten all the connector properties keys to make it easy to configure in 
DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355158593
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
 ##
 @@ -200,15 +201,21 @@ private DescriptorProperties 
getValidatedProperties(Map properti
}
 
private List<Host> getHosts(DescriptorProperties descriptorProperties) {
-   final List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
-   CONNECTOR_HOSTS,
-   Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, 
CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL));
-   return hosts.stream()
-   .map(host -> new Host(
-   
descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)),
-   
descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)),
-   
descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL
-   .collect(Collectors.toList());
+   final List<Host> hostList = new ArrayList<>();
+   if (descriptorProperties.containsKey(CONNECTOR_HOSTS)) {
+   return validateAndGetHostsStr(descriptorProperties);
+   } else {
+   final List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
+   CONNECTOR_HOSTS,
+   Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, 
CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL));
+   hosts.stream()
+   .forEach(host -> hostList.add(new Host(
+   
descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)),
+   
descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)),
+   
descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL)))
+   ));
+   }
+   return hostList;
 
 Review comment:
   If you return in an `if` clause, then it would be better to return in the `else` clause too.
   
   suggestion:
   
   ```java
   if (descriptorProperties.containsKey(CONNECTOR_HOSTS)) {
return validateAndGetHostsStr(descriptorProperties);
} else {
final List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
CONNECTOR_HOSTS,
Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, 
CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL));
return hosts.stream()
.map(host -> new Host(

descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)),

descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)),

descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL
.collect(Collectors.toList());
}
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] HuangZhenQiu commented on issue #9689: [FLINK-7151] add a basic function ddl

2019-12-07 Thread GitBox
HuangZhenQiu commented on issue #9689: [FLINK-7151] add a basic function ddl
URL: https://github.com/apache/flink/pull/9689#issuecomment-562914670
 
 
   This PR has been split into multiple smaller PRs, so it is no longer needed. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] HuangZhenQiu closed pull request #9689: [FLINK-7151] add a basic function ddl

2019-12-07 Thread GitBox
HuangZhenQiu closed pull request #9689: [FLINK-7151] add a basic function ddl
URL: https://github.com/apache/flink/pull/9689
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] HuangZhenQiu commented on issue #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate

2019-12-07 Thread GitBox
HuangZhenQiu commented on issue #10371: [FLINK-14953][formats] use table type 
to build parquet FilterPredicate
URL: https://github.com/apache/flink/pull/10371#issuecomment-562914586
 
 
   @JingsongLi Thanks for the feedback. I have changed it accordingly. Please take another look.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate

2019-12-07 Thread GitBox
HuangZhenQiu commented on a change in pull request #10371: 
[FLINK-14953][formats] use table type to build parquet FilterPredicate
URL: https://github.com/apache/flink/pull/10371#discussion_r355161398
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
 ##
 @@ -200,6 +204,8 @@ private ParquetTableSource(String path, MessageType 
parquetSchema, Configuration
for (FilterPredicate converted : 
convertedPredicates.subList(1, convertedPredicates.size())) {
parquetPredicate = 
FilterApi.and(parquetPredicate, converted);
}
+   parquetPredicate = 
LogicalInverseRewriter.rewrite(parquetPredicate);
+   SchemaCompatibilityValidator.validate(parquetPredicate, 
parquetSchema);
 
 Review comment:
   It was added when i do local test. it is not needed. Reverted back.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…

2019-12-07 Thread GitBox
flinkbot commented on issue #10484: [FLINK-14906][table] create and drop temp 
system functions from DDL t…
URL: https://github.com/apache/flink/pull/10484#issuecomment-562914317
 
 
   
   ## CI report:
   
   * c604c9f8704578c33884b2faf8d163a7482795c4 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add 
Operator Coordinators and Events
URL: https://github.com/apache/flink/pull/10483#issuecomment-562912876
 
 
   
   ## CI report:
   
   * f7d3793fc60c8700f99adf52ed760e9665f42a90 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139976585)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate

2019-12-07 Thread GitBox
HuangZhenQiu commented on a change in pull request #10371: 
[FLINK-14953][formats] use table type to build parquet FilterPredicate
URL: https://github.com/apache/flink/pull/10371#discussion_r355160821
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
 ##
 @@ -447,8 +453,16 @@ private String getColumnName(BinaryComparison comp) {
 
@Nullable
private Tuple2<Column, Comparable> extractColumnAndLiteral(BinaryComparison comp) {
-   TypeInformation<?> typeInfo = getLiteralType(comp);
String columnName = getColumnName(comp);
+   TypeInformation<?> typeInfo = null;
+   try {
+   String[] columnPath = columnName.split("\\.");
+   Type type = parquetSchema.getType(columnPath);
+   typeInfo = 
ParquetSchemaConverter.convertParquetTypeToTypeInfo(type);
+   } catch (InvalidRecordException e) {
 
 Review comment:
   It is not needed if users use SQL. But as the applyPredicate function is public, it is possible for a user to specify an undefined field in an expression; ParquetTableSourceTest also covers these scenarios. I catch the exception here to make sure the original test cases can pass.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10460: [FLINK-14574] Fixing writing with plugin s3 filesystem.

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10460: [FLINK-14574] Fixing writing with 
plugin s3 filesystem. 
URL: https://github.com/apache/flink/pull/10460#issuecomment-562508393
 
 
   
   ## CI report:
   
   * 61e387e9665125032c92ed23afe138c4973b7eeb : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139672737)
   * 1696c5aaacdc526c8b615bd1e68ae42af23599a8 : UNKNOWN
   * 10ffc91acaba9964019f0301c0b0a1ecd7797732 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139677707)
   * 09f02dc709e92105087ee74572618734cedf2f92 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139719249)
   * e83f45cda365e2b668c99fadd7a055ca8048b9e9 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139859320)
   * 8fcd386c8ae470dfcc9a3e4a0c2ff971e88156e2 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139860794)
   * 2da3b996050b105b54f53f4f6453431c779da9bc : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139871817)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161099
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
 ##
 @@ -265,24 +259,24 @@ public Kafka sinkPartitionerCustom(Class partit
}
 
if (specificOffsets != null) {
-   final List> values = new ArrayList<>();
+   final StringBuilder stringBuilder = new StringBuilder();
+   int i = 0;
for (Map.Entry specificOffset : 
specificOffsets.entrySet()) {
-   
values.add(Arrays.asList(specificOffset.getKey().toString(), 
specificOffset.getValue().toString()));
+   if (i != 0) {
+   stringBuilder.append(';');
+   }
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(',');
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue());
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161074
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -247,10 +247,8 @@ tables:
   topic: test-input
   startup-mode: earliest-offset
   properties:
-- key: zookeeper.connect
-  value: localhost:2181
-- key: bootstrap.servers
-  value: localhost:9092
+zookeeper.connect: localhost:2181,localhost:2182
 
 Review comment:
   Just to specify the multi-host format in a string here; maybe 'hostname1:2181,host_name2:2181' would be better. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate

2019-12-07 Thread GitBox
HuangZhenQiu commented on a change in pull request #10371: 
[FLINK-14953][formats] use table type to build parquet FilterPredicate
URL: https://github.com/apache/flink/pull/10371#discussion_r355160831
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
 ##
 @@ -465,6 +479,10 @@ private String getColumnName(BinaryComparison comp) {
typeInfo == BasicTypeInfo.INT_TYPE_INFO) {
return new Tuple2<>(FilterApi.intColumn(columnName), 
(Integer) value);
} else if (typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+   if (value instanceof Integer) {
+   Integer intValue = (Integer) value;
+   return new 
Tuple2<>(FilterApi.longColumn(columnName), intValue.longValue());
+   }
return new Tuple2<>(FilterApi.longColumn(columnName), 
(Long) value);
 
 Review comment:
   Agree. It is more readable. Changed accordingly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate

2019-12-07 Thread GitBox
HuangZhenQiu commented on a change in pull request #10371: 
[FLINK-14953][formats] use table type to build parquet FilterPredicate
URL: https://github.com/apache/flink/pull/10371#discussion_r355160821
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
 ##
 @@ -447,8 +453,16 @@ private String getColumnName(BinaryComparison comp) {
 
@Nullable
private Tuple2<Column, Comparable> extractColumnAndLiteral(BinaryComparison comp) {
-   TypeInformation<?> typeInfo = getLiteralType(comp);
String columnName = getColumnName(comp);
+   TypeInformation<?> typeInfo = null;
+   try {
+   String[] columnPath = columnName.split("\\.");
+   Type type = parquetSchema.getType(columnPath);
+   typeInfo = 
ParquetSchemaConverter.convertParquetTypeToTypeInfo(type);
+   } catch (InvalidRecordException e) {
 
 Review comment:
   It is not needed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate

2019-12-07 Thread GitBox
HuangZhenQiu commented on a change in pull request #10371: 
[FLINK-14953][formats] use table type to build parquet FilterPredicate
URL: https://github.com/apache/flink/pull/10371#discussion_r355160800
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
 ##
 @@ -447,8 +453,16 @@ private String getColumnName(BinaryComparison comp) {
 
@Nullable
private Tuple2 
extractColumnAndLiteral(BinaryComparison comp) {
-   TypeInformation typeInfo = getLiteralType(comp);
String columnName = getColumnName(comp);
+   TypeInformation typeInfo = null;
+   try {
+   String[] columnPath = columnName.split("\\.");
 
 Review comment:
   Thanks for the suggestion. Changed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong closed pull request #10213: [FLINK-12846][table-common] Carry primary key information in TableSchema

2019-12-07 Thread GitBox
wuchong closed pull request #10213: [FLINK-12846][table-common] Carry primary 
key information in TableSchema
URL: https://github.com/apache/flink/pull/10213
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #10245: [FLINK-10936][kubernetes] Implement Kubernetes command line tools to support session cluster.

2019-12-07 Thread GitBox
zhuzhurk commented on a change in pull request #10245: 
[FLINK-10936][kubernetes] Implement Kubernetes command line tools to support 
session cluster.
URL: https://github.com/apache/flink/pull/10245#discussion_r355160547
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
 ##
 @@ -42,6 +47,8 @@
  */
 public abstract class AbstractCustomCommandLine implements CustomCommandLine {
 
+   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractCustomCommandLine.class);
 
 Review comment:
   Maybe we can make this logger `protected final Logger LOG = 
LoggerFactory.getLogger(getClass());` for all its subclasses to use, so that 
we do not need to create multiple loggers.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #10245: [FLINK-10936][kubernetes] Implement Kubernetes command line tools to support session cluster.

2019-12-07 Thread GitBox
zhuzhurk commented on a change in pull request #10245: 
[FLINK-10936][kubernetes] Implement Kubernetes command line tools to support 
session cluster.
URL: https://github.com/apache/flink/pull/10245#discussion_r355160571
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
 ##
 @@ -89,4 +96,80 @@ public Configuration 
applyCommandLineOptionsToConfiguration(CommandLine commandL
 
return resultingConfiguration;
}
+
+   protected void printUsage() {
+   System.out.println("Usage:");
+   HelpFormatter formatter = new HelpFormatter();
+   formatter.setWidth(200);
+   formatter.setLeftPadding(5);
+
+   formatter.setSyntaxPrefix("   Optional");
+   Options options = new Options();
+   addGeneralOptions(options);
+   addRunOptions(options);
+   formatter.printHelp(" ", options);
+   }
+
+   protected static int handleCliArgsException(CliArgsException e) {
+   LOG.error("Could not parse the command line arguments.", e);
+
+   System.out.println(e.getMessage());
+   System.out.println();
+   System.out.println("Use the help option (-h or --help) to get 
help on the command.");
+   return 1;
+   }
+
+   protected static int handleError(Throwable t) {
+   LOG.error("Error while running the Flink session.", t);
+
+   System.err.println();
+   
System.err.println("");
+   System.err.println(" The program finished with the following 
exception:");
+   System.err.println();
+
+   t.printStackTrace();
+   return 1;
+   }
+
+   /**
+* Read-Evaluate-Print step for the REPL.
+*
+* @param in to read from
+* @param readConsoleInput true if console input has to be read
+* @param clientPollingIntervalMs wait until clientPollingIntervalMs is 
over or the user entered something.
+* @param helpMessage help message
+* @return true if the REPL shall be continued, otherwise false
+*/
+   protected static boolean repStep(
+   BufferedReader in,
+   boolean readConsoleInput,
+   long clientPollingIntervalMs,
+   String helpMessage) throws IOException, InterruptedException {
+
+   long startTime = System.currentTimeMillis();
+   while ((System.currentTimeMillis() - startTime) < 
clientPollingIntervalMs
 
 Review comment:
   Why not also make `CLIENT_POLLING_INTERVAL_MS` common to all clients?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-15096) Do not use GlobalJobParameters to pass system configuration

2019-12-07 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-15096.
---
Resolution: Fixed

> Do not use GlobalJobParameters to pass system configuration
> ---
>
> Key: FLINK-15096
> URL: https://issues.apache.org/jira/browse/FLINK-15096
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.10.0
>Reporter: Dawid Wysakowicz
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GlobalJobParameters is a user only configuration that should not be used to 
> ship system specific settings.
> Right now python uses it to ship information about custom archives, files, 
> executables etc.
> A solution would be to pass required configuration when instantiating the 
> operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15096) Do not use GlobalJobParameters to pass system configuration

2019-12-07 Thread Hequn Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16990721#comment-16990721
 ] 

Hequn Cheng commented on FLINK-15096:
-

Resolved in 1.10.0 via 0d7c15703d0dd304d49203c163e9a1397e4e0d9e

> Do not use GlobalJobParameters to pass system configuration
> ---
>
> Key: FLINK-15096
> URL: https://issues.apache.org/jira/browse/FLINK-15096
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.10.0
>Reporter: Dawid Wysakowicz
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GlobalJobParameters is a user only configuration that should not be used to 
> ship system specific settings.
> Right now python uses it to ship information about custom archives, files, 
> executables etc.
> A solution would be to pass required configuration when instantiating the 
> operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] hequn8128 closed pull request #10477: [FLINK-15096][python] Pass python configuration to the python operators via table config instead of global job parameters.

2019-12-07 Thread GitBox
hequn8128 closed pull request #10477: [FLINK-15096][python] Pass python 
configuration to the python operators via table config instead of global job 
parameters.
URL: https://github.com/apache/flink/pull/10477
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events

2019-12-07 Thread GitBox
flinkbot commented on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add 
Operator Coordinators and Events
URL: https://github.com/apache/flink/pull/10483#issuecomment-562912876
 
 
   
   ## CI report:
   
   * f7d3793fc60c8700f99adf52ed760e9665f42a90 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10462: [FLINK-15031][runtime] Calculate required shuffle memory before allocating slots if resources are specified

2019-12-07 Thread GitBox
flinkbot edited a comment on issue #10462: [FLINK-15031][runtime] Calculate 
required shuffle memory before allocating slots if resources are specified
URL: https://github.com/apache/flink/pull/10462#issuecomment-562542464
 
 
   
   ## CI report:
   
   * 708583f6acbff12da90bc28269d924433c446e32 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139686969)
   * 2f4aee047ff40997c3b940fd1b9e7c7145ed9903 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139869261)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
hequn8128 commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355157648
 
 

 ##
 File path: tools/jenkins/run_case.py
 ##
 @@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This file will be run by Jenkins to run the e2e perf test
+# Params:
+# am_seserver_dddress: the master machine's IP of the standalone environment
+# scenario_file: the file which contains the test scenarios
+# flink_home: the path of Flink
+# inter_nums: the number of runs for each scenario, default value is 10
+# wait_minute: the interval between two qps samples, default value is 10s
+#
+
+
+import sys
+import time
+
+if sys.version_info < (3, 5):
+print("Python versions prior to 3.5 are not supported.")
+sys.exit(-1)
+
+from logger import logger
+from utils import run_command
+from restapi_common import get_avg_qps_by_restful_interface
+
+
+def start_server(flink_home):
+cmd = "bash %s/bin/start-cluster.sh" % flink_home
+status, output = run_command(cmd)
+if status and output.find("Exception") < 0:
+return True
+else:
+return False
+
+
+def end_server(flink_home):
+cmd = "bash %s/bin/stop_yarn.sh" % flink_home
+status, output = run_command(cmd)
+if status and output.find("Exception") < 0:
+return True
+else:
+return False
+
+
+def get_scenarios(scenario_file_name, test_jar):
+"""
+parser file which contains serval scenarios,it's content likes this:
+classPath   scenarioName  jobparam1 jobparams2
+org.apache.Test testScenario1 aaa   bbb
+……
+:param scenario_file_name: scenario's file
+:param test_jar:
+:return: list of scenarios
+"""
+params_name = []
+scenarios = []
+scenario_names = []
+linenum = 0
+with open(scenario_file_name) as file:
+data = file.readline()
+if not (data.startswith("#") or data == ""):
+linenum = linenum + 1
+cmd = ""
+scenario_name = ""
+if linenum == 1:
+params_name = data.split(" ")
+for index in range(0, len(params_name)):
+params_name[index] = params_name[index]
+if not "testClassPath" in params_name:
+return 1, []
+else:
+params_value = data.split(" ")
+for index in range(0, len(params_name)):
+param = params_name[index]
+if param == "testClassPath":
+cmd = "-c %s %s %s" % (params_value[index], test_jar,  
cmd)
+else:
+if param == "":
+cmd = "--%s %s" % (param, params_value[index])
+else:
+cmd = "%s --%s %s" % (cmd, param, 
params_value[index])
+scenario_name = "%s_%s" % (scenario_name, param)
+scenario_names.append(scenario_name[1:])
+scenarios.append(cmd)
+return 0, scenarios, scenario_names
+
+
+def get_avg(values):
+if len(values) == 0:
+return 0.0
+else:
+return sum(values) * 1.0 / len(values)
+
+
+def run_cases(scenario_file_name, flink_home, am_seserver_dddress, 
inter_nums=10, wait_minute=10):
+status, scenarios, scenario_names = get_scenarios(scenario_file_name)
+for scenario_index in range(0, len(scenarios)):
+scenario = scenarios.get(scenario_index)
+scenario_name = scenario_names[scenario_index]
+total_qps = []
+status = start_server(flink_home)
+if not status:
+logger.info("start server failed")
+return 1
+for inter_index in range(0, inter_nums):
+cmd = "bash %s/bin/flink run %s" % (flink_home, scenario)
+status, output = run_command(cmd)
+if status:
+

[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
hequn8128 commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355156657
 
 

 ##
 File path: tools/jenkins/restapi_common.py
 ##
 @@ -0,0 +1,85 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This common file provides the functions for getting qps from the Flink REST API
+#
+# Note by AiHua Li:
 
 Review comment:
   Can we remove this line? It is not common practice in Flink to leave an 
author name in the file.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
hequn8128 commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355155525
 
 

 ##
 File path: tools/jenkins/init_env.py
 ##
 @@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Because the end-to-end performance test runs in the cluster, this init_env.py only covers Flink on a
+# standalone env; it does not set up the Maven, Java, and Jenkins environments. By default, the Maven,
+# Java, and Jenkins environments are assumed to be ready in the cluster
+#
+
+import os
+from logger import logger
+from utils import run_command
+from restapi_common import execute_get
+
+
+def init_standalone_env(host_list, user, source_path, dest_path):
+for host in host_list:
+cmd = "sudo su %s -c 'ssh %s \" rm -rf %s\"; scp -r %s %s@%s:%s'" % 
(user, host, dest_path, source_path,
+ 
user, host, dest_path)
+logger.info("init_standalone_env  cmd:%s" % cmd)
+run_command(cmd)
+
+
+def get_host_list(slave_file):
+hostlist = []
+with open(slave_file) as file:
+data = file.readline()
+if not(data == "" or data.startswith("#")):
+hostlist.append(data)
+return hostlist
+
+
+def package(flink_home):
+cmd = "cd %s; mvn clean install -B -U -DskipTests -Drat.skip=true 
-Dcheckstyle.skip=true " % flink_home
+status, output = run_command(cmd)
+if status and output.find("BUILD SUCCESS") > 0:
+return True
+else:
+return False
+
+
+def get_target(flink_home):
+cmd = "ls -lt %s/flink-dist/target/flink-*-bin/ |grep -v tar.gz"
+status, output = run_command(cmd)
+if status:
+target_file = output.split("\n")[0]
+return target_file, "%s/flink-dist/target/%s-bin/%s" % (flink_home, 
target_file, target_file)
+else:
+return "", ""
+
+
+def update_conf_slaves(dest_path, slave_file):
+cmd = "cp %s %s/conf/" % (slave_file, dest_path)
+run_command(cmd)
+
+
+def init_env():
+flink_home = os.getcwd()
 
 Review comment:
   The returned value may not be the Flink home path. How about getting the 
path of the current file and then navigating to the Flink home?
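   A minimal sketch of what that could look like (assuming the script keeps 
living under `tools/jenkins` inside the Flink source tree):

   import os

   def get_flink_home():
       # This file lives at <flink_home>/tools/jenkins/init_env.py,
       # so the Flink home is two directories above it.
       script_dir = os.path.dirname(os.path.abspath(__file__))
       return os.path.abspath(os.path.join(script_dir, os.pardir, os.pardir))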


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
hequn8128 commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355157825
 
 

 ##
 File path: tools/jenkins/run_case.py
 ##
 @@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This file will be run by Jenkins to run the e2e perf test
+# Params:
+# am_seserver_dddress: the master machine's IP of the standalone environment
+# scenario_file: the file which contains the test scenarios
+# flink_home: the path of Flink
+# inter_nums: the number of runs for each scenario, default value is 10
+# wait_minute: the interval between two qps samples, default value is 10s
+#
+
+
+import sys
+import time
+
+if sys.version_info < (3, 5):
+print("Python versions prior to 3.5 are not supported.")
+sys.exit(-1)
+
+from logger import logger
+from utils import run_command
+from restapi_common import get_avg_qps_by_restful_interface
+
+
+def start_server(flink_home):
+cmd = "bash %s/bin/start-cluster.sh" % flink_home
+status, output = run_command(cmd)
+if status and output.find("Exception") < 0:
+return True
+else:
+return False
+
+
+def end_server(flink_home):
+cmd = "bash %s/bin/stop_yarn.sh" % flink_home
+status, output = run_command(cmd)
+if status and output.find("Exception") < 0:
+return True
+else:
+return False
+
+
+def get_scenarios(scenario_file_name, test_jar):
+"""
+parser file which contains serval scenarios,it's content likes this:
 
 Review comment:
   Parse file which contains several scenarios. Its content looks like the 
following examples.
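   Applied to the function, the docstring could read, for example:

   def get_scenarios(scenario_file_name, test_jar):
       """
       Parse a file which contains several scenarios. Its content looks like
       the following example:
           classPath        scenarioName   jobparam1  jobparam2
           org.apache.Test  testScenario1  aaa        bbb
       :param scenario_file_name: the scenario file
       :param test_jar: the test jar to submit
       :return: status, list of scenarios, list of scenario names
       """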


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
hequn8128 commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355155587
 
 

 ##
 File path: tools/jenkins/init_env.py
 ##
 @@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Because the end-to-end performance test runs in the cluster, this init_env.py only covers Flink on a
+# standalone env; it does not set up the Maven, Java, and Jenkins environments. By default, the Maven,
+# Java, and Jenkins environments are assumed to be ready in the cluster
+#
+
+import os
+from logger import logger
+from utils import run_command
+from restapi_common import execute_get
+
+
+def init_standalone_env(host_list, user, source_path, dest_path):
+for host in host_list:
+cmd = "sudo su %s -c 'ssh %s \" rm -rf %s\"; scp -r %s %s@%s:%s'" % 
(user, host, dest_path, source_path,
+ 
user, host, dest_path)
+logger.info("init_standalone_env  cmd:%s" % cmd)
+run_command(cmd)
+
+
+def get_host_list(slave_file):
+hostlist = []
+with open(slave_file) as file:
+data = file.readline()
+if not(data == "" or data.startswith("#")):
+hostlist.append(data)
+return hostlist
+
+
+def package(flink_home):
+cmd = "cd %s; mvn clean install -B -U -DskipTests -Drat.skip=true 
-Dcheckstyle.skip=true " % flink_home
+status, output = run_command(cmd)
+if status and output.find("BUILD SUCCESS") > 0:
+return True
+else:
+return False
+
+
+def get_target(flink_home):
+cmd = "ls -lt %s/flink-dist/target/flink-*-bin/ |grep -v tar.gz"
+status, output = run_command(cmd)
+if status:
+target_file = output.split("\n")[0]
+return target_file, "%s/flink-dist/target/%s-bin/%s" % (flink_home, 
target_file, target_file)
+else:
+return "", ""
+
+
+def update_conf_slaves(dest_path, slave_file):
+cmd = "cp %s %s/conf/" % (slave_file, dest_path)
+run_command(cmd)
+
+
+def init_env():
+flink_home = os.getcwd()
+package_result = package(flink_home)
+if not package_result:
+logger.error("package error")
+return False
+slave_file = "%s/tool/jenkins/slaves" % flink_home
 
 Review comment:
   It seems the script would fail, as there is no `tool/jenkins/slaves` in the 
current Flink home folder (the directory is `tools/jenkins`, note the `s`)?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
hequn8128 commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355157525
 
 

 ##
 File path: tools/jenkins/restapi_common.py
 ##
 @@ -0,0 +1,85 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This common file provides the functions for getting qps from the Flink REST API
+#
+# Note by AiHua Li:
+
+import urllib
+import urllib.request
+import json
+from logger import logger
+
+
+def execute_get(url):
+"""
+Send a get request
+:param url:
+:return: content from the request
+"""
+try:
+result = urllib.request.urlopen(url, timeout=300).read()
+if result != "":
+return json.loads(result)
+else:
+return {}
+except Exception as e:
+logger.error(e)
+return {}
+
+
+def get_source_node(plan):
+"""
+get the source nodes from job's json plan
+:param plan:
+:return: list of source nodes
+"""
+source_nodes = []
+nodes = plan.get("nodes", [])
+for node in nodes:
+inputs = node.get("inputs", [])
+if len(inputs) == 0:
+source_nodes.append(node["id"])
+return source_nodes
+
+
+def get_avg_qps_by_restful_interface(am_seserver_dddress, job_id):
+url = "http://%s/jobs/%s"; % (am_seserver_dddress, job_id)
+result = execute_get(url)
+vertices = result.get("vertices", "")
+plan = result.get("plan", "")
+source_nodes = get_source_node(plan)
+totaltps = 0
+for vertice in vertices:
+id = vertice.get("id", "")
+if id in source_nodes:
+url = "http://%s/jobs/%s/vertices/%s/subtasks/metrics?agg=avg"; % 
(am_seserver_dddress, job_id, id)
+keyresult = execute_get(url)
+for key in keyresult:
+metrics_name = key.get("id", "")
+if metrics_name.endswith("numBuffersOutPerSecond"):
 
 Review comment:
   Not sure whether we should use numBuffersOutPerSecond here, since the 
buffer size can change. Should we use numRecordsOutPerSecond instead?
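   A sketch of how such a query could look (assuming the task-level 
`numRecordsOutPerSecond` metric and the standard aggregated-subtask-metrics 
endpoint with `get`/`agg` query parameters):

   import json
   import urllib.request

   def get_avg_records_out_per_second(am_seserver_dddress, job_id, vertex_id):
       # Average numRecordsOutPerSecond over all subtasks of the given vertex
       url = ("http://%s/jobs/%s/vertices/%s/subtasks/metrics"
              "?get=numRecordsOutPerSecond&agg=avg"
              % (am_seserver_dddress, job_id, vertex_id))
       result = json.loads(
           urllib.request.urlopen(url, timeout=300).read().decode("utf-8"))
       return result[0].get("avg", 0.0) if result else 0.0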


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
hequn8128 commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355156159
 
 

 ##
 File path: tools/jenkins/init_env.py
 ##
 @@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Because the end-to-end performance test runs in the cluster, this init_env.py only covers Flink on a
+# standalone env; it does not set up the Maven, Java, and Jenkins environments. By default, the Maven,
+# Java, and Jenkins environments are assumed to be ready in the cluster
+#
+
+import os
+from logger import logger
+from utils import run_command
+from restapi_common import execute_get
 
 Review comment:
   Unused import
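   That is, the import section would presumably shrink to:

   import os
   from logger import logger
   from utils import run_command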


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
hequn8128 commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355157613
 
 

 ##
 File path: tools/jenkins/run_case.py
 ##
 @@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This file will be run by Jenkins to run the e2e perf test
+# Params:
+# am_seserver_dddress: the master machine's IP of the standalone environment
+# scenario_file: the file which contains the test scenarios
+# flink_home: the path of Flink
+# inter_nums: the number of runs for each scenario, default value is 10
+# wait_minute: the interval between two qps samples, default value is 10s
+#
+
+
+import sys
+import time
+
+if sys.version_info < (3, 5):
+print("Python versions prior to 3.5 are not supported.")
+sys.exit(-1)
+
+from logger import logger
+from utils import run_command
+from restapi_common import get_avg_qps_by_restful_interface
+
+
+def start_server(flink_home):
+cmd = "bash %s/bin/start-cluster.sh" % flink_home
+status, output = run_command(cmd)
+if status and output.find("Exception") < 0:
+return True
+else:
+return False
+
+
+def end_server(flink_home):
+cmd = "bash %s/bin/stop_yarn.sh" % flink_home
+status, output = run_command(cmd)
+if status and output.find("Exception") < 0:
+return True
+else:
+return False
+
+
+def get_scenarios(scenario_file_name, test_jar):
+"""
+parser file which contains serval scenarios,it's content likes this:
+classPath   scenarioName  jobparam1 jobparams2
+org.apache.Test testScenario1 aaa   bbb
+……
+:param scenario_file_name: scenario's file
+:param test_jar:
+:return: list of scenarios
+"""
+params_name = []
+scenarios = []
+scenario_names = []
+linenum = 0
+with open(scenario_file_name) as file:
+data = file.readline()
+if not (data.startswith("#") or data == ""):
+linenum = linenum + 1
+cmd = ""
+scenario_name = ""
+if linenum == 1:
+params_name = data.split(" ")
+for index in range(0, len(params_name)):
+params_name[index] = params_name[index]
+if not "testClassPath" in params_name:
+return 1, []
+else:
+params_value = data.split(" ")
+for index in range(0, len(params_name)):
+param = params_name[index]
+if param == "testClassPath":
+cmd = "-c %s %s %s" % (params_value[index], test_jar,  
cmd)
+else:
+if param == "":
+cmd = "--%s %s" % (param, params_value[index])
+else:
+cmd = "%s --%s %s" % (cmd, param, 
params_value[index])
+scenario_name = "%s_%s" % (scenario_name, param)
+scenario_names.append(scenario_name[1:])
+scenarios.append(cmd)
+return 0, scenarios, scenario_names
+
+
+def get_avg(values):
+if len(values) == 0:
+return 0.0
+else:
+return sum(values) * 1.0 / len(values)
+
+
+def run_cases(scenario_file_name, flink_home, am_seserver_dddress, 
inter_nums=10, wait_minute=10):
+status, scenarios, scenario_names = get_scenarios(scenario_file_name)
+for scenario_index in range(0, len(scenarios)):
+scenario = scenarios.get(scenario_index)
+scenario_name = scenario_names[scenario_index]
+total_qps = []
+status = start_server(flink_home)
+if not status:
+logger.info("start server failed")
+return 1
+for inter_index in range(0, inter_nums):
+cmd = "bash %s/bin/flink run %s" % (flink_home, scenario)
+status, output = run_command(cmd)
+if status:
+

[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
hequn8128 commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355156233
 
 

 ##
 File path: tools/jenkins/init_env.py
 ##
 @@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Because the end-to-end performance test runs in the cluster, this init_env.py only covers Flink on a
+# standalone env; it does not set up the Maven, Java, and Jenkins environments. By default, the Maven,
+# Java, and Jenkins environments are assumed to be ready in the cluster
+#
+
+import os
+from logger import logger
+from utils import run_command
+from restapi_common import execute_get
+
+
+def init_standalone_env(host_list, user, source_path, dest_path):
+for host in host_list:
+cmd = "sudo su %s -c 'ssh %s \" rm -rf %s\"; scp -r %s %s@%s:%s'" % 
(user, host, dest_path, source_path,
+ 
user, host, dest_path)
+logger.info("init_standalone_env  cmd:%s" % cmd)
+run_command(cmd)
+
+
+def get_host_list(slave_file):
+hostlist = []
+with open(slave_file) as file:
+data = file.readline()
+if not(data == "" or data.startswith("#")):
+hostlist.append(data)
+return hostlist
+
+
+def package(flink_home):
+cmd = "cd %s; mvn clean install -B -U -DskipTests -Drat.skip=true 
-Dcheckstyle.skip=true " % flink_home
+status, output = run_command(cmd)
+if status and output.find("BUILD SUCCESS") > 0:
+return True
+else:
+return False
+
+
+def get_target(flink_home):
+cmd = "ls -lt %s/flink-dist/target/flink-*-bin/ |grep -v tar.gz"
+status, output = run_command(cmd)
+if status:
+target_file = output.split("\n")[0]
+return target_file, "%s/flink-dist/target/%s-bin/%s" % (flink_home, 
target_file, target_file)
+else:
+return "", ""
+
+
+def update_conf_slaves(dest_path, slave_file):
+cmd = "cp %s %s/conf/" % (slave_file, dest_path)
+run_command(cmd)
+
+
+def init_env():
+flink_home = os.getcwd()
+package_result = package(flink_home)
+if not package_result:
+logger.error("package error")
+return False
+slave_file = "%s/tool/jenkins/slaves" % flink_home
+host_list = get_host_list(slave_file)
+flink_path, source_path = get_target(flink_home)
+dest_path = "/home/admin/%s" % flink_path
 
 Review comment:
   Maybe we should not hard-code the path. All paths should be relative to the 
flink_home, so that anyone can run this script anywhere.
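   For example (a sketch; the `deploy` sub-directory name is just an 
assumption):

   import os

   def build_dest_path(flink_home, flink_path):
       # Deploy under a directory derived from flink_home instead of the
       # hard-coded /home/admin, so the script is not tied to one user.
       return os.path.join(flink_home, "deploy", flink_path)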


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
hequn8128 commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355156382
 
 

 ##
 File path: tools/jenkins/init_env.py
 ##
 @@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Because the end-to-end performance test runs in the cluster, this init_env.py only covers Flink on a
+# standalone env; it does not set up the Maven, Java, and Jenkins environments. By default, the Maven,
+# Java, and Jenkins environments are assumed to be ready in the cluster
+#
+
+import os
+from logger import logger
+from utils import run_command
+from restapi_common import execute_get
+
+
+def init_standalone_env(host_list, user, source_path, dest_path):
+for host in host_list:
+cmd = "sudo su %s -c 'ssh %s \" rm -rf %s\"; scp -r %s %s@%s:%s'" % 
(user, host, dest_path, source_path,
+ 
user, host, dest_path)
+logger.info("init_standalone_env  cmd:%s" % cmd)
+run_command(cmd)
+
+
+def get_host_list(slave_file):
+hostlist = []
+with open(slave_file) as file:
+data = file.readline()
+if not(data == "" or data.startswith("#")):
+hostlist.append(data)
+return hostlist
+
+
+def package(flink_home):
+cmd = "cd %s; mvn clean install -B -U -DskipTests -Drat.skip=true 
-Dcheckstyle.skip=true " % flink_home
+status, output = run_command(cmd)
+if status and output.find("BUILD SUCCESS") > 0:
+return True
+else:
+return False
+
+
+def get_target(flink_home):
+cmd = "ls -lt %s/flink-dist/target/flink-*-bin/ |grep -v tar.gz"
+status, output = run_command(cmd)
+if status:
+target_file = output.split("\n")[0]
+return target_file, "%s/flink-dist/target/%s-bin/%s" % (flink_home, 
target_file, target_file)
+else:
+return "", ""
+
+
+def update_conf_slaves(dest_path, slave_file):
+cmd = "cp %s %s/conf/" % (slave_file, dest_path)
+run_command(cmd)
+
+
+def init_env():
+flink_home = os.getcwd()
+package_result = package(flink_home)
+if not package_result:
+logger.error("package error")
+return False
+slave_file = "%s/tool/jenkins/slaves" % flink_home
+host_list = get_host_list(slave_file)
+flink_path, source_path = get_target(flink_home)
+dest_path = "/home/admin/%s" % flink_path
+if source_path != "":
+update_conf_slaves(source_path, slave_file)
+init_standalone_env(host_list, source_path, dest_path)
 
 Review comment:
   This does not work: the method takes 4 parameters while only 3 are passed 
here.
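   Presumably the deploy user should be passed through as well, e.g.:

   # Assuming `user` is read from configuration instead of being dropped
   init_standalone_env(host_list, user, source_path, dest_path)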


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
hequn8128 commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355158259
 
 

 ##
 File path: tools/jenkins/run_case.py
 ##
 @@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This file will be run by Jenkins to run the e2e perf test
+# Params:
+# am_seserver_dddress: the master machine's IP of the standalone environment
+# scenario_file: the file which contains the test scenarios
+# flink_home: the path of Flink
+# inter_nums: the number of runs for each scenario, default value is 10
+# wait_minute: the interval between two qps samples, default value is 10s
+#
+
+
+import sys
+import time
+
+if sys.version_info < (3, 5):
+print("Python versions prior to 3.5 are not supported.")
+sys.exit(-1)
+
+from logger import logger
+from utils import run_command
+from restapi_common import get_avg_qps_by_restful_interface
+
+
+def start_server(flink_home):
+cmd = "bash %s/bin/start-cluster.sh" % flink_home
+status, output = run_command(cmd)
+if status and output.find("Exception") < 0:
+return True
+else:
+return False
+
+
+def end_server(flink_home):
+cmd = "bash %s/bin/stop_yarn.sh" % flink_home
+status, output = run_command(cmd)
+if status and output.find("Exception") < 0:
+return True
+else:
+return False
+
+
+def get_scenarios(scenario_file_name, test_jar):
+"""
+parser file which contains serval scenarios,it's content likes this:
+classPath   scenarioName  jobparam1 jobparams2
+org.apache.Test testScenario1 aaa   bbb
+……
+:param scenario_file_name: scenario's file
+:param test_jar:
+:return: list of scenarios
+"""
+params_name = []
+scenarios = []
+scenario_names = []
+linenum = 0
+with open(scenario_file_name) as file:
+data = file.readline()
+if not (data.startswith("#") or data == ""):
+linenum = linenum + 1
+cmd = ""
+scenario_name = ""
+if linenum == 1:
+params_name = data.split(" ")
+for index in range(0, len(params_name)):
+params_name[index] = params_name[index]
+if not "testClassPath" in params_name:
+return 1, []
+else:
+params_value = data.split(" ")
+for index in range(0, len(params_name)):
+param = params_name[index]
+if param == "testClassPath":
+cmd = "-c %s %s %s" % (params_value[index], test_jar,  
cmd)
+else:
+if param == "":
+cmd = "--%s %s" % (param, params_value[index])
+else:
+cmd = "%s --%s %s" % (cmd, param, 
params_value[index])
+scenario_name = "%s_%s" % (scenario_name, param)
+scenario_names.append(scenario_name[1:])
+scenarios.append(cmd)
+return 0, scenarios, scenario_names
+
+
+def get_avg(values):
+if len(values) == 0:
+return 0.0
+else:
+return sum(values) * 1.0 / len(values)
+
+
+def run_cases(scenario_file_name, flink_home, am_seserver_dddress, 
inter_nums=10, wait_minute=10):
+status, scenarios, scenario_names = get_scenarios(scenario_file_name)
+for scenario_index in range(0, len(scenarios)):
+scenario = scenarios.get(scenario_index)
+scenario_name = scenario_names[scenario_index]
+total_qps = []
+status = start_server(flink_home)
 
 Review comment:
   Why start a cluster for each scenario? Can we start the cluster once and 
reuse it? See the sketch below.
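   A sketch of the restructured loop (start once, shut down in a `finally` 
block; per-scenario details elided):

   def run_cases(scenario_file_name, flink_home, am_seserver_dddress,
                 inter_nums=10, wait_minute=10):
       status, scenarios, scenario_names = get_scenarios(scenario_file_name)
       # Start the standalone cluster once and reuse it for every scenario.
       if not start_server(flink_home):
           logger.info("start server failed")
           return 1
       try:
           for scenario_index in range(0, len(scenarios)):
               ...  # submit the job and collect qps as before
       finally:
           # Always shut the cluster down, even if a scenario fails.
           end_server(flink_home)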


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests

2019-12-07 Thread GitBox
hequn8128 commented on a change in pull request #10436: [FLINK-14920] 
[flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
URL: https://github.com/apache/flink/pull/10436#discussion_r355158354
 
 

 ##
 File path: tools/jenkins/run_case.py
 ##
 @@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This file will be run by Jenkins to run the e2e perf test
+# Params:
+# am_seserver_dddress: the master machine's IP of the standalone environment
+# scenario_file: the file which contains the test scenarios
+# flink_home: the path of Flink
+# inter_nums: the number of runs for each scenario, default value is 10
+# wait_minute: the interval between two qps samples, default value is 10s
+#
+
+
+import sys
+import time
+
+if sys.version_info < (3, 5):
+print("Python versions prior to 3.5 are not supported.")
+sys.exit(-1)
+
+from logger import logger
+from utils import run_command
+from restapi_common import get_avg_qps_by_restful_interface
+
+
+def start_server(flink_home):
+cmd = "bash %s/bin/start-cluster.sh" % flink_home
+status, output = run_command(cmd)
+if status and output.find("Exception") < 0:
+return True
+else:
+return False
+
+
+def end_server(flink_home):
 
 Review comment:
   This method is never called. We should shut down the cluster correctly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] HuangZhenQiu closed pull request #10476: [FLINK-14911] [table] register and drop temp catalog functions from D…

2019-12-07 Thread GitBox
HuangZhenQiu closed pull request #10476: [FLINK-14911] [table] register and 
drop temp catalog functions from D…
URL: https://github.com/apache/flink/pull/10476
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

