[GitHub] [flink] HuangZhenQiu commented on issue #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…
HuangZhenQiu commented on issue #10484: [FLINK-14906][table] create and drop temp system functions from DDL t… URL: https://github.com/apache/flink/pull/10484#issuecomment-562922102

@bowenli86 Thanks for the feedback. I agree with your suggestion. We should distinguish between system and catalog functions from the very beginning; thus, different operation types should be created for system and catalog functions.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] HuangZhenQiu commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…
HuangZhenQiu commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t… URL: https://github.com/apache/flink/pull/10484#discussion_r355167003

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java ##

@@ -33,16 +33,22 @@
 	private final String className; // Fully qualified class name of the function
 	private final FunctionLanguage functionLanguage;
 	private final boolean isTemporary;
+	private final boolean isSystem;

Review comment: Removed
[GitHub] [flink] flinkbot edited a comment on issue #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory
flinkbot edited a comment on issue #10329: [FLINK-12785][StateBackend] RocksDB savepoint recovery can use a lot of unmanaged memory URL: https://github.com/apache/flink/pull/10329#issuecomment-558970068

## CI report:

* e3aa8e5a567a13f07598ee6fe0e8fb8e72dfb986 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138360657)
* 86fcd30935fbbcdbd4b976c0fc3f17194876b4de : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139353188)
* f1dda3798762bb41163d3fde9a45ce1b5eb14b0d : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139470678)
* ebf6b89063a2acd1ff9765b76088c41aead0a5d9 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139719201)
* 19e4b4c864f95e2b355a09742af684c453daa223 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139844119)
* e2941014c7fcf0fcb2e0e89361328370641d1d50 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139940394)

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] Li-Aihua commented on issue #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
Li-Aihua commented on issue #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#issuecomment-562920893

Thanks @hequn8128 for your comments. I fixed all the places mentioned in your comments. Now this e2e perf test is running in the cluster; I will add the result later.
[GitHub] [flink] Li-Aihua commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
Li-Aihua commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355165876

## File path: tools/jenkins/run_case.py ##

@@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This file will be runned by jenkis to run the e2e perf test
+# Params:
+#   am_seserver_dddress: master machines's ip of standalone environment
+#   scenario_file: the file which contains test scenarios
+#   flink_home: the path of flink
+#   inter_nums: the num of every scenario's running, default value is 10
+#   wait_minute: interval time of two elections of qps, default value is 10s
+
+
+import sys
+import time
+
+if sys.version_info < (3, 5):
+    print("Python versions prior to 3.5 are not supported.")
+    sys.exit(-1)
+
+from logger import logger
+from utils import run_command
+from restapi_common import get_avg_qps_by_restful_interface
+
+
+def start_server(flink_home):
+    cmd = "bash %s/bin/start-cluster.sh" % flink_home
+    status, output = run_command(cmd)
+    if status and output.find("Exception") < 0:
+        return True
+    else:
+        return False
+
+
+def end_server(flink_home):
+    cmd = "bash %s/bin/stop_yarn.sh" % flink_home
+    status, output = run_command(cmd)
+    if status and output.find("Exception") < 0:
+        return True
+    else:
+        return False
+
+
+def get_scenarios(scenario_file_name, test_jar):
+    """
+    parser file which contains serval scenarios, it's content likes this:
+        classPath scenarioName jobparam1 jobparams2
+        org.apache.Test testScenario1 aaa bbb
+        ……
+    :param scenario_file_name: scenario's file
+    :param test_jar:
+    :return: list of scenarios
+    """
+    params_name = []
+    scenarios = []
+    scenario_names = []
+    linenum = 0
+    with open(scenario_file_name) as file:
+        data = file.readline()
+        if not (data.startswith("#") or data == ""):
+            linenum = linenum + 1
+            cmd = ""
+            scenario_name = ""
+            if linenum == 1:
+                params_name = data.split(" ")
+                for index in range(0, len(params_name)):
+                    params_name[index] = params_name[index]
+                if not "testClassPath" in params_name:
+                    return 1, []
+            else:
+                params_value = data.split(" ")
+                for index in range(0, len(params_name)):
+                    param = params_name[index]
+                    if param == "testClassPath":
+                        cmd = "-c %s %s %s" % (params_value[index], test_jar, cmd)
+                    else:
+                        if param == "":
+                            cmd = "--%s %s" % (param, params_value[index])
+                        else:
+                            cmd = "%s --%s %s" % (cmd, param, params_value[index])
+                        scenario_name = "%s_%s" % (scenario_name, param)
+                scenario_names.append(scenario_name[1:])
+                scenarios.append(cmd)
+    return 0, scenarios, scenario_names
+
+
+def get_avg(values):
+    if len(values) == 0:
+        return 0.0
+    else:
+        return sum(values) * 1.0 / len(values)
+
+
+def run_cases(scenario_file_name, flink_home, am_seserver_dddress, inter_nums=10, wait_minute=10):
+    status, scenarios, scenario_names = get_scenarios(scenario_file_name)
+    for scenario_index in range(0, len(scenarios)):
+        scenario = scenarios.get(scenario_index)
+        scenario_name = scenario_names[scenario_index]
+        total_qps = []
+        status = start_server(flink_home)

Review comment: yes, it's ok to reuse cluster, i updated it
[GitHub] [flink] flinkbot edited a comment on issue #9973: [FLINK-9955] Add Kubernetes ClusterDescriptor to support deploying session cluster.
flinkbot edited a comment on issue #9973: [FLINK-9955] Add Kubernetes ClusterDescriptor to support deploying session cluster. URL: https://github.com/apache/flink/pull/9973#issuecomment-545325448

## CI report:

* 450cd1029219372cdc4f05f4bf6bc3ff6e43dc90 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133141964)
* 6285ff3752b3d5aa9e3237601d77fdd44af17874 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133366937)
* 614834429a8548e253adae3f73fd6d0cae01a3b5 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/133998662)
* baba3f140349fef37cdbd1fd0002619a8063b1db : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134010535)
* 1f1ec2e29f04ef06e9cc05bca8a406d9eee5a957 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/137145239)
* 62d628cda95e2fee7f74435bffe0bd6885490f5c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138017618)
* e351cd251218ab155de872cea5df6c9e9e242c8e : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139116860)
* bce759ca103b5ca92a259438f40f0877c1c9483a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139256976)
* 63bc2b0acc6e3db11243d5c81ffe5565aa26d9ab : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139464342)
* 336f18f2bb935306e7d1088691d624158d0e5de0 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139677734)
* 5bfa111036c999b352b7c003f2ffae036b968f3f : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139843038)
* e8267bcdcbe0458d8eb3f055f1a261acd9027995 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139844138)
* b0f46f54460039468e55c6feee2f0fa9df625cbe : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139847929)
* 1bd7fa188ecc6814435d18f409a5e3629c717365 : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #10480: [FLINK-15095] [table-planner-blink] bridge table schema's primary key to metadata handler in blink planner
flinkbot edited a comment on issue #10480: [FLINK-15095] [table-planner-blink] bridge table schema's primary key to metadata handler in blink planner URL: https://github.com/apache/flink/pull/10480#issuecomment-562858870

## CI report:

* f2cc63f3495dab7e6a1bb09f8663265739e41ffd : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139862381)
* ce6f2c3b92121d7c36803595bb2665992d0c3ab2 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139940590)
[GitHub] [flink] flinkbot edited a comment on issue #10228: [FLINK-14816] Add thread dump feature for taskmanager
flinkbot edited a comment on issue #10228: [FLINK-14816] Add thread dump feature for taskmanager URL: https://github.com/apache/flink/pull/10228#issuecomment-554599522

## CI report:

* 8c07f8927fcbb96ae2e6f7b155ebc89287e2dbe9 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/136797904)
* 32f7901560ffd6635e5cdc4463327906574eceda : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/136883965)
* 57b530f5e6228e59c3899f475c212c4dcf52d56a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/137310623)
* 551b4df0f88a31a0989e3428ac4e89423514d4f9 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/137496433)
* 53cbeb683f8fb71b2eed0eb1e0833fdfbe00379e : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/140014987)
[GitHub] [flink] flinkbot edited a comment on issue #10474: [FLINK-15116] Make JobClient stateless, remove AutoCloseable
flinkbot edited a comment on issue #10474: [FLINK-15116] Make JobClient stateless, remove AutoCloseable URL: https://github.com/apache/flink/pull/10474#issuecomment-562698759

## CI report:

* b29a8cc494e7033b9c8fb21ab982060b57aa90f3 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139804062)
* 013f138b677b2761458f591c2ea31ee8d166ce0f : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139808162)
* 11f28d34c651b6629217d379b52de05e68bbf33e : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139823571)
* 434805415d38683f31eddb46029b9c0124a8d130 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139827112)
* 386a099b31f4d34d403bf1fd2158f2a294f57079 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139832995)
* 43376b79e27f937041e74995ce3de5bb4d7808c7 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139850916)
* 547c156d4068dcecf85ea91a1d58057824c09763 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139854101)
* c0262593c93b0f66be2ddcd415130e45f6408faf : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139880387)
* ba1b25eabbc824c8c6186edf179c97846bd8fbee : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139886739)
* e28f2d30d556ca8d38165a1eb5a7f22da43bbb2a : UNKNOWN
[jira] [Closed] (FLINK-15020) Support timestamp type in hive
[ https://issues.apache.org/jira/browse/FLINK-15020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kurt Young closed FLINK-15020.
------------------------------
    Assignee: Rui Li
    Resolution: Fixed

master: 57bb952e3a1732caa5acb640a1712bcdd55d813d

> Support timestamp type in hive
> ------------------------------
>
>                 Key: FLINK-15020
>                 URL: https://issues.apache.org/jira/browse/FLINK-15020
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Hive
>            Reporter: Jingsong Lee
>            Assignee: Rui Li
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.10.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Now that FLINK-14599 is finished, the blink planner has the ability to support
> timestamp type with precision 9. So we can support timestamp type in hive now.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] KurtYoung closed pull request #10401: [FLINK-15020][hive] Support timestamp type in hive
KurtYoung closed pull request #10401: [FLINK-15020][hive] Support timestamp type in hive URL: https://github.com/apache/flink/pull/10401
[GitHub] [flink] KurtYoung closed pull request #10480: [FLINK-15095] [table-planner-blink] bridge table schema's primary key to metadata handler in blink planner
KurtYoung closed pull request #10480: [FLINK-15095] [table-planner-blink] bridge table schema's primary key to metadata handler in blink planner URL: https://github.com/apache/flink/pull/10480
[jira] [Closed] (FLINK-15095) bridge table schema's primary key to metadata handler in blink planner
[ https://issues.apache.org/jira/browse/FLINK-15095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kurt Young closed FLINK-15095.
------------------------------
    Resolution: Fixed

master: d4bd2d6ee64ba83c42981e4e62374fb962bb71bf

> bridge table schema's primary key to metadata handler in blink planner
> ----------------------------------------------------------------------
>
>                 Key: FLINK-15095
>                 URL: https://issues.apache.org/jira/browse/FLINK-15095
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.10.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
[GitHub] [flink] flinkbot edited a comment on issue #10228: [FLINK-14816] Add thread dump feature for taskmanager
flinkbot edited a comment on issue #10228: [FLINK-14816] Add thread dump feature for taskmanager URL: https://github.com/apache/flink/pull/10228#issuecomment-554599522

## CI report:

* 8c07f8927fcbb96ae2e6f7b155ebc89287e2dbe9 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/136797904)
* 32f7901560ffd6635e5cdc4463327906574eceda : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/136883965)
* 57b530f5e6228e59c3899f475c212c4dcf52d56a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/137310623)
* 551b4df0f88a31a0989e3428ac4e89423514d4f9 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/137496433)
* 53cbeb683f8fb71b2eed0eb1e0833fdfbe00379e : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #10481: [FLINK-14824][table] Improve schema derivation for formats
flinkbot edited a comment on issue #10481: [FLINK-14824][table] Improve schema derivation for formats URL: https://github.com/apache/flink/pull/10481#issuecomment-562867016

## CI report:

* 70e7764be333154ec2efa0ad0a6e8d8510624628 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139866904)
* b8d5ca6c35448661731186e78a4d1dc753e337fe : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139919996)
* b7b2a59fb22d9a2d8b6e003a9624a89bbb8d656c : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/140009245)
[GitHub] [flink] JingsongLi commented on a change in pull request #10481: [FLINK-14824][table] Improve schema derivation for formats
JingsongLi commented on a change in pull request #10481: [FLINK-14824][table] Improve schema derivation for formats URL: https://github.com/apache/flink/pull/10481#discussion_r355163918

## File path: docs/dev/table/connect.md ##

@@ -1321,12 +1322,12 @@ The CSV format can be used as follows:
   .withFormat(
     new Csv()
 
-      // required: define the schema either by using type information
-      .schema(Type.ROW(...))
-
-      // or use the table's schema
+      // optional: use the table's schema as format schema, this is enabled by default
       .deriveSchema()

Review comment: Maybe we can remove it from doc now? Including `.deriveSchema()` and `.schema(Type.ROW(...))`?
[GitHub] [flink] zhijiangW merged pull request #9986: [FLINK-10933][kubernetes] Implement KubernetesSessionClusterEntrypoint.
zhijiangW merged pull request #9986: [FLINK-10933][kubernetes] Implement KubernetesSessionClusterEntrypoint. URL: https://github.com/apache/flink/pull/9986
[GitHub] [flink] flinkbot edited a comment on issue #10481: [FLINK-14824][table] Improve schema derivation for formats
flinkbot edited a comment on issue #10481: [FLINK-14824][table] Improve schema derivation for formats URL: https://github.com/apache/flink/pull/10481#issuecomment-562867016

## CI report:

* 70e7764be333154ec2efa0ad0a6e8d8510624628 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139866904)
* b8d5ca6c35448661731186e78a4d1dc753e337fe : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139919996)
* b7b2a59fb22d9a2d8b6e003a9624a89bbb8d656c : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate
flinkbot edited a comment on issue #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate URL: https://github.com/apache/flink/pull/10371#issuecomment-560045651

## CI report:

* 24326ffa979a4f9ed56265c0db11a874d3c4d2a2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138813356)
* f7d69ecf50a0af52d04af9aafcfd18d03545e6e9 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138816941)
* 43a4b42d5163054336fd56cc2e6e5e5e9b387bfb : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/140002197)
[GitHub] [flink] lamber-ken commented on issue #10228: [FLINK-14816] Add thread dump feature for taskmanager
lamber-ken commented on issue #10228: [FLINK-14816] Add thread dump feature for taskmanager URL: https://github.com/apache/flink/pull/10228#issuecomment-562916680

> Now that the old Web UI is dropped, can we rebase this on the latest master?

Done.
[GitHub] [flink] flinkbot edited a comment on issue #9986: [FLINK-10933][kubernetes] Implement KubernetesSessionClusterEntrypoint.
flinkbot edited a comment on issue #9986: [FLINK-10933][kubernetes] Implement KubernetesSessionClusterEntrypoint. URL: https://github.com/apache/flink/pull/9986#issuecomment-545891630

## CI report:

* 593bf42620faf09c1accbd692494646194e3d574 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133371151)
* bb9fbd1d51a478793f63ae8b6d6e92b6a5a53775 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133940616)
* bc25d444faeeaa773f040b14159aafe5a6a5a975 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133953929)
* d8819bf3615c497b501399bc476de889c17dc239 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133998596)
* d1430642ae91d8ed58479fb9d1492c433312a9b2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134010391)
* 17ffc7f7d12f2115eb6b4c86af2c627ce1ad68aa : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/137136929)
* 0b0fb6b154fc91f944fa8972266b72578cd4e765 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138017441)
* 8a58370a6aa7b67ee294af764c25ced0e24ba364 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138990632)
* 6a72f32c09cf42b6ed473f17b31f5fbc18b720f8 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139116907)
* 69fb531209587ab73fae38c574017c4ea19ac759 : UNKNOWN
* 5c31b05a3aa53614934466638d25aea1e4d0fc9a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139888646)
[GitHub] [flink] flinkbot edited a comment on issue #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…
flinkbot edited a comment on issue #10484: [FLINK-14906][table] create and drop temp system functions from DDL t… URL: https://github.com/apache/flink/pull/10484#issuecomment-562914317

## CI report:

* c604c9f8704578c33884b2faf8d163a7482795c4 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139991088)
[GitHub] [flink] bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…
bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t… URL: https://github.com/apache/flink/pull/10484#discussion_r355162107

## File path: flink-python/pyflink/table/catalog.py ##

@@ -804,6 +804,13 @@
         """
         return self._j_catalog_function.isTemporary()
 
+    def is_system(self):

Review comment: no need for this in catalog function
[GitHub] [flink] bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…
bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t… URL: https://github.com/apache/flink/pull/10484#discussion_r355162113

## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterFunction.java ##

@@ -118,6 +118,10 @@
 	public boolean isTemporary() {
 		return isTemporary;
 	}
 
+	public boolean isSystemFunction() {

Review comment: no need for this
[GitHub] [flink] bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…
bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t… URL: https://github.com/apache/flink/pull/10484#discussion_r355162115

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogFunctionImpl.java ##

@@ -33,16 +33,22 @@
 	private final String className; // Fully qualified class name of the function
 	private final FunctionLanguage functionLanguage;
 	private final boolean isTemporary;
+	private final boolean isSystem;

Review comment: no need for this in catalog function
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355162114

## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ##

@@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) {
 		properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1);
 		properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true);
 	}
+
+	/**
+	 * Parse Hosts String to list.
+	 *
+	 * Hosts String format was given as following:
+	 *
+	 *     connector.hosts = http://host_name:9092;http://host_name:9093
+	 *
+	 * @param descriptorProperties
+	 * @return
+	 */
+	public static List<String> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+		final List<String> hostList = new ArrayList<>();
+
+		final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS);
+		if (null == hostsStr || hostsStr.length() == 0) {
+			throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty, but is: " + hostsStr);
+		}
+
+		final String[] hosts = hostsStr.split(";");

Review comment: `descriptorProperties.getString` already ensures hostsStr is not empty, so `hosts.size > 0` is always true here. Should we validate it again?
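The reviewer's observation — that a string already validated as non-empty can never split into zero parts — can be checked in isolation. The sketch below is a hypothetical standalone version of the parsing logic under review; the class and method names (`HostsParserSketch`, `parseHosts`) are illustrative, not the PR's API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HostsParserSketch {

    // Illustrative stand-in for the PR's validateAndGetHostsStr: once the
    // input is known to be non-empty, String.split(";") returns at least one
    // element, so a separate "hosts.length > 0" check would be redundant.
    static List<String> parseHosts(String hostsStr) {
        if (hostsStr == null || hostsStr.isEmpty()) {
            throw new IllegalArgumentException("'connector.hosts' must not be empty");
        }
        return new ArrayList<>(Arrays.asList(hostsStr.split(";")));
    }

    public static void main(String[] args) {
        List<String> hosts = parseHosts("http://host_name:9092;http://host_name:9093");
        if (hosts.isEmpty()) {
            throw new AssertionError("split of a non-empty string produced no parts");
        }
        System.out.println(hosts);
    }
}
```

This supports the review question: the only way `parseHosts` can return is with a list of size at least one, because `split` on a non-empty input with no matches returns the whole input as a single element.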
[GitHub] [flink] bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…
bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t… URL: https://github.com/apache/flink/pull/10484#discussion_r355162109

## File path: flink-python/pyflink/table/tests/test_catalog.py ##

@@ -69,6 +69,7 @@ def check_catalog_function_equals(self, f1, f2):
     self.assertEqual(f1.get_class_name(), f2.get_class_name())
     self.assertEqual(f1.is_generic(), f2.is_generic())
     self.assertEqual(f1.is_temporary(), f2.is_temporary())
+    self.assertEqual(f1.is_system(), f2.is_system())

Review comment: no need for this in catalog function
[GitHub] [flink] xuefuz commented on a change in pull request #10381: [FLINK-14513][hive] Implement listPartitionsByFilter to HiveCatalog
xuefuz commented on a change in pull request #10381: [FLINK-14513][hive] Implement listPartitionsByFilter to HiveCatalog URL: https://github.com/apache/flink/pull/10381#discussion_r355161669 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java ## @@ -175,4 +186,93 @@ public static boolean requireRelyConstraint(byte trait) { return (trait & HIVE_CONSTRAINT_RELY) != 0; } + /** +* Generates a filter string for partition columns from the given filter expressions. +* +* @param numNonPartCol The number of non-partition columns -- used to shift field reference index +* @param partColNames The names of all partition columns +* @param expressions The filter expressions in CNF form +* @return an Optional filter string equivalent to the expressions, which is empty if the expressions can't be handled +*/ + public static Optional<String> makePartitionFilter(int numNonPartCol, List<String> partColNames, List<Expression> expressions) { Review comment: Nit: what about rename numNonPartCol to partColOffset? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355162022 ## File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java ## @@ -177,13 +157,112 @@ public void testTableSource() { assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass())); } + @Test + @SuppressWarnings("unchecked") + public void testTableSourceWithLegacyProperties() { + // prepare parameters for Kafka table source + final TableSchema schema = TableSchema.builder() + .field(FRUIT_NAME, DataTypes.STRING()) + .field(COUNT, DataTypes.DECIMAL(10, 3)) + .field(EVENT_TIME, DataTypes.TIMESTAMP(3)) + .field(PROC_TIME, DataTypes.TIMESTAMP(3)) + .build(); + + final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList( + new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps())); + + final Map<String, String> fieldMapping = new HashMap<>(); + fieldMapping.put(FRUIT_NAME, NAME); + fieldMapping.put(NAME, NAME); + fieldMapping.put(COUNT, COUNT); + fieldMapping.put(TIME, TIME); + + final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); + specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0); + specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1); + + final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema( + TableSchema.builder() + .field(NAME, DataTypes.STRING()) + .field(COUNT, DataTypes.DECIMAL(10, 3)) + .field(TIME, DataTypes.TIMESTAMP(3)) + .build() + .toRowType() + ); + + final KafkaTableSourceBase expected = getExpectedKafkaTableSource( + schema, + Optional.of(PROC_TIME), + rowtimeAttributeDescriptors, + fieldMapping, + TOPIC, + KAFKA_PROPERTIES, + deserializationSchema, + StartupMode.SPECIFIC_OFFSETS, + specificOffsets); + + TableSourceValidation.validateTableSource(expected); + + // construct table source using descriptors and table source factory + final Map<String, String> legacyPropertiesMap = new HashMap<>(); + legacyPropertiesMap.putAll(createKafkaSourceProperties()); + + // use legacy properties + legacyPropertiesMap.remove("connector.specific-offsets"); + legacyPropertiesMap.remove("connector.properties.zookeeper.connect"); + legacyPropertiesMap.remove("connector.properties.bootstrap.servers"); + legacyPropertiesMap.remove("connector.properties.group.id"); + + legacyPropertiesMap.put("connector.specific-offsets.0.partition", "0"); + legacyPropertiesMap.put("connector.specific-offsets.0.offset", "100"); + legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1"); + legacyPropertiesMap.put("connector.specific-offsets.1.offset", "123"); + legacyPropertiesMap.put("connector.properties.0.key", "zookeeper.connect"); + legacyPropertiesMap.put("connector.properties.0.value", "dummy"); + legacyPropertiesMap.put("connector.properties.1.key", "bootstrap.servers"); + legacyPropertiesMap.put("connector.properties.1.value", "dummy"); + legacyPropertiesMap.put("connector.properties.2.key", "group.id"); + legacyPropertiesMap.put("connector.properties.2.value", "dummy"); + + final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap) + .createStreamTableSource(legacyPropertiesMap); + + assertEquals(expected, actualSource); + + // test Kafka consumer + final KafkaTableSourceBase actualKafkaSource = (KafkaTableSourceBase) actualSource; + final StreamExecutionEnvironmentMock mock
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161955 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties +* @return SpecificOffsets with map format, key is partition, and value is offset. +*/ + public static Map<Integer, Long> validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) { + final Map<Integer, Long> offsetMap = new HashMap<>(); + + final String parseSpecificOffsetsStr = descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS); + if (parseSpecificOffsetsStr.isEmpty()) { + throw new ValidationException("Properties '" + CONNECTOR_SPECIFIC_OFFSETS + "' can not be empty."); + } Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
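The specific-offsets format discussed in this thread (`connector.specific-offsets = partition:0,offset:42;partition:1,offset:300`) can be parsed into a partition-to-offset map with a sketch like the following. `SpecificOffsetsParser` and `parseSpecificOffsets` are hypothetical names for illustration; the actual Flink method reads from `DescriptorProperties` and throws `ValidationException`.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of parsing a 'connector.specific-offsets' string; names are hypothetical, not Flink's API. */
public class SpecificOffsetsParser {

    public static Map<Integer, Long> parseSpecificOffsets(String offsetsStr) {
        if (offsetsStr == null || offsetsStr.isEmpty()) {
            throw new IllegalArgumentException("'connector.specific-offsets' can not be empty.");
        }
        final Map<Integer, Long> offsetMap = new HashMap<>();
        // expected format: partition:0,offset:42;partition:1,offset:300
        for (String pair : offsetsStr.split(";")) {
            String[] kvs = pair.split(",");
            if (kvs.length != 2
                    || !kvs[0].startsWith("partition:")
                    || !kvs[1].startsWith("offset:")) {
                throw new IllegalArgumentException(
                    "Invalid entry '" + pair + "', expected 'partition:<int>,offset:<long>'.");
            }
            int partition = Integer.parseInt(kvs[0].substring("partition:".length()));
            long offset = Long.parseLong(kvs[1].substring("offset:".length()));
            offsetMap.put(partition, offset);
        }
        return offsetMap;
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets =
            parseSpecificOffsets("partition:0,offset:42;partition:1,offset:300");
        System.out.println(offsets.get(0)); // offset for partition 0
        System.out.println(offsets.get(1)); // offset for partition 1
    }
}
```

Parsing and validating in one pass, as the reviewers discuss, means a malformed entry fails fast with a message naming the offending fragment.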
[jira] [Created] (FLINK-15127) rename CreateFunctionOperation, DropFunctionOperation, AlterFunctionOperation to CreateCatalogFunctionOperation, DropCatalogFunctionOperation, AlterCatalogFunctionOperat
Bowen Li created FLINK-15127: Summary: rename CreateFunctionOperation, DropFunctionOperation, AlterFunctionOperation to CreateCatalogFunctionOperation, DropCatalogFunctionOperation, AlterCatalogFunctionOperation Key: FLINK-15127 URL: https://issues.apache.org/jira/browse/FLINK-15127 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Bowen Li Assignee: Zhenqiu Huang rename these operations since they should only support operations related to catalog functions (both temp and persistent) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate
flinkbot edited a comment on issue #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate URL: https://github.com/apache/flink/pull/10371#issuecomment-560045651 ## CI report: * 24326ffa979a4f9ed56265c0db11a874d3c4d2a2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138813356) * f7d69ecf50a0af52d04af9aafcfd18d03545e6e9 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138816941) * 43a4b42d5163054336fd56cc2e6e5e5e9b387bfb : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161941 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java ## @@ -265,24 +259,22 @@ public Kafka sinkPartitionerCustom(Class<? extends FlinkKafkaPartitioner<Row>> partitionerClass) { } if (specificOffsets != null) { - final List<List<String>> values = new ArrayList<>(); + final StringBuilder stringBuilder = new StringBuilder(); + int i = 0; for (Map.Entry<Integer, Long> specificOffset : specificOffsets.entrySet()) { - values.add(Arrays.asList(specificOffset.getKey().toString(), specificOffset.getValue().toString())); + if (i != 0) { + stringBuilder.append(';'); + } + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(','); + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue()); + i++; } - properties.putIndexedFixedProperties( - CONNECTOR_SPECIFIC_OFFSETS, - Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET), - values); + properties.putString(CONNECTOR_SPECIFIC_OFFSETS, stringBuilder.toString()); } if (kafkaProperties != null) { - properties.putIndexedFixedProperties( - CONNECTOR_PROPERTIES, - Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE), - this.kafkaProperties.entrySet().stream() - .map(e -> Arrays.asList(e.getKey(), e.getValue())) - .collect(Collectors.toList()) - ); + this.kafkaProperties.entrySet().forEach(entry -> + properties.putString(CONNECTOR_PROPERTIES + '.' + entry.getKey(), entry.getValue())); Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
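The StringBuilder loop in the diff above serializes a partition-to-offset map into the flattened `partition:P,offset:O;...` string. The same logic can be written more compactly with `Collectors.joining`; this is a sketch only (the `OffsetsSerializer` name is hypothetical), not the PR's actual code:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch of serializing specific offsets into the flattened single-key string form. */
public class OffsetsSerializer {

    public static String toSpecificOffsetsString(Map<Integer, Long> specificOffsets) {
        // each entry becomes "partition:P,offset:O"; entries are joined with ';'
        return specificOffsets.entrySet().stream()
            .map(e -> "partition:" + e.getKey() + ",offset:" + e.getValue())
            .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output string is deterministic
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        offsets.put(0, 42L);
        offsets.put(1, 300L);
        System.out.println(toSpecificOffsetsString(offsets));
        // partition:0,offset:42;partition:1,offset:300
    }
}
```

A stream-and-join form also sidesteps wuchong's later comment about overly long method chains, since each entry is formatted in a single short lambda.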
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161944 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161898 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { + final List hostList = new ArrayList<>(); + + final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); + if (hostsStr.isEmpty()) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty."); + } Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10364: [FLINK-14993][metrics] FrontMetricGroup passes reporter-specific parameters
flinkbot edited a comment on issue #10364: [FLINK-14993][metrics] FrontMetricGroup passes reporter-specific parameters URL: https://github.com/apache/flink/pull/10364#issuecomment-559827376 ## CI report: * bf9a31483c0d55c968f65b8ca4a11557f52de456 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/138727067) * 8492919a37ecd85428c327d9893fb0ca1fa07734 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139364349) * 72284c361a6ff957c3f8d831d49dc4c93ac8f282 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139878424) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161876 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { Review comment: ok ,I think validateAndParseHostsString will be better. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161864 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { Review comment: ok, I think validateAndGetHosts will be better. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161869 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties +* @return SpecificOffsets with map format, key is partition, and value is offset. +*/ + public static Map validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) { Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161822 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-15126) migrate "show functions" from sql cli to sql parser
Bowen Li created FLINK-15126: Summary: migrate "show functions" from sql cli to sql parser Key: FLINK-15126 URL: https://issues.apache.org/jira/browse/FLINK-15126 Project: Flink Issue Type: Sub-task Components: Table SQL / Client, Table SQL / Planner Reporter: Bowen Li Assignee: Zhenqiu Huang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] lamber-ken edited a comment on issue #9751: [FLINK-14177] bump curator from 2.12.0 to 4.2.0
lamber-ken edited a comment on issue #9751: [FLINK-14177] bump curator from 2.12.0 to 4.2.0 URL: https://github.com/apache/flink/pull/9751#issuecomment-562915008 > I wouldn't want to increment such a crucial dependency so close to a release. Okay, but we may have already spent a lot of time on this, because this issue was opened on 24 Sep, and I need to check often whether there are conflicting files. Btw, https://github.com/apache/flink/pull/9158 is blocked by this issue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161802 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java ## @@ -308,13 +304,7 @@ public Elasticsearch connectionPathPrefix(String pathPrefix) { final DescriptorProperties properties = new DescriptorProperties(); properties.putProperties(internalProperties); - final List> hostValues = hosts.stream() - .map(host -> Arrays.asList(host.hostname, String.valueOf(host.port), host.protocol)) - .collect(Collectors.toList()); - properties.putIndexedFixedProperties( - CONNECTOR_HOSTS, - Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL), - hostValues); + properties.putString(CONNECTOR_HOSTS, hosts.stream().map(Host::toString).collect(Collectors.joining(";"))); Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161756 ## File path: docs/dev/table/connect.md ## @@ -247,10 +247,8 @@ tables: topic: test-input startup-mode: earliest-offset properties: -- key: zookeeper.connect - value: localhost:2181 -- key: bootstrap.servers - value: localhost:9092 +zookeeper.connect: localhost:2181,localhost:2182 Review comment: but in FLIP-86, we defined an example as follows: `'connector.properties.zookeeper.connect'='localhost:2181,localhost:2182', 'connector.properties.bootstrap.servers'='localhost:9092,localhost:9093', 'connector.properties.group.id'='testGroup' ` so I think either keeping this or using 'hostname1:2181,host_name2:2181' makes sense. @JingsongLi This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…
bowenli86 commented on a change in pull request #10484: [FLINK-14906][table] create and drop temp system functions from DDL t… URL: https://github.com/apache/flink/pull/10484#discussion_r355161575 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java ## @@ -204,6 +204,15 @@ public boolean hasTemporaryCatalogFunction(ObjectIdentifier functionIdentifier) return tempCatalogFunctions.containsKey(normalizedIdentifier); } + /** +* Check whether a temporary system function is already registered. +* @param functionName the name of the function +* @return whether the temporary system function exists in the function catalog +*/ + public boolean hasSystemCatalogFunction(String functionName) { Review comment: should be `hasTempSystemFunction` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161716 ## File path: docs/dev/table/connect.md ## @@ -247,10 +247,8 @@ tables: topic: test-input startup-mode: earliest-offset properties: -- key: zookeeper.connect - value: localhost:2181 -- key: bootstrap.servers - value: localhost:9092 +zookeeper.connect: localhost:2181,localhost:2182 Review comment: I'm fine with keeping the previous example; it is a Kafka property, and Kafka users should know it or refer to the Kafka docs for multiple hosts. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161684 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -87,7 +93,12 @@ private void validateHosts(DescriptorProperties properties) { hostsValidators.put(CONNECTOR_HOSTS_HOSTNAME, (key) -> properties.validateString(key, false, 1)); hostsValidators.put(CONNECTOR_HOSTS_PORT, (key) -> properties.validateInt(key, false, 0, 65535)); hostsValidators.put(CONNECTOR_HOSTS_PROTOCOL, (key) -> properties.validateString(key, false, 1)); - properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, false, hostsValidators); + + if (properties.containsKey(CONNECTOR_HOSTS)) { + validateAndGetHostsStr(properties); Review comment: we'd better parse and validate at the same time, because `DescriptorProperties` does not support a `Consumer` that can validate a `String` value like `partition:0,offset:42;partition:1,offset:300`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161661 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java ## @@ -265,24 +259,24 @@ public Kafka sinkPartitionerCustom(Class partit } if (specificOffsets != null) { - final List<List<String>> values = new ArrayList<>(); + final StringBuilder stringBuilder = new StringBuilder(); + int i = 0; for (Map.Entry<Integer, Long> specificOffset : specificOffsets.entrySet()) { - values.add(Arrays.asList(specificOffset.getKey().toString(), specificOffset.getValue().toString())); + if (i != 0) { + stringBuilder.append(';'); + } + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(','); + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue()); Review comment: Please break the method chain if it is too long.
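The serialization the diff performs can also be written as a stream join, which keeps each line short as the review asks. This is a hedged sketch outside Flink: the `format` helper and the hard-coded `partition`/`offset` key names are hypothetical stand-ins for the real `CONNECTOR_SPECIFIC_OFFSETS_PARTITION`/`CONNECTOR_SPECIFIC_OFFSETS_OFFSET` constants.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class SpecificOffsetsBuilder {
    // Hypothetical stand-ins for CONNECTOR_SPECIFIC_OFFSETS_PARTITION / _OFFSET.
    static final String PARTITION = "partition";
    static final String OFFSET = "offset";

    // Serializes {0=42, 1=300} to "partition:0,offset:42;partition:1,offset:300".
    static String format(Map<Integer, Long> specificOffsets) {
        return specificOffsets.entrySet().stream()
            .map(e -> PARTITION + ':' + e.getKey() + ',' + OFFSET + ':' + e.getValue())
            .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        offsets.put(0, 42L);
        offsets.put(1, 300L);
        System.out.println(format(offsets));
        // prints: partition:0,offset:42;partition:1,offset:300
    }
}
```

Compared to the `StringBuilder`-with-counter version in the diff, `Collectors.joining(";")` handles the separator placement itself, so there is no index variable to get wrong.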
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161115 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties Review comment: Remove `@param` if no documentation.
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355158905 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -87,7 +93,12 @@ private void validateHosts(DescriptorProperties properties) { hostsValidators.put(CONNECTOR_HOSTS_HOSTNAME, (key) -> properties.validateString(key, false, 1)); hostsValidators.put(CONNECTOR_HOSTS_PORT, (key) -> properties.validateInt(key, false, 0, 65535)); hostsValidators.put(CONNECTOR_HOSTS_PROTOCOL, (key) -> properties.validateString(key, false, 1)); - properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, false, hostsValidators); + + if (properties.containsKey(CONNECTOR_HOSTS)) { + validateAndGetHostsStr(properties); + } else { + properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, false, hostsValidators); Review comment: You can put the initialization of `hostsValidators` in this `else` clause.
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355159036 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List<Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) { Review comment: 1. You can `import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host;` to remove the `ElasticsearchUpsertTableSinkBase.` prefix. 2. The method name should be `validateAndParseHostsString` or `validateAndGetHosts`? Because the return value is not a string.
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355158623 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java ## @@ -308,13 +304,7 @@ public Elasticsearch connectionPathPrefix(String pathPrefix) { final DescriptorProperties properties = new DescriptorProperties(); properties.putProperties(internalProperties); - final List<List<String>> hostValues = hosts.stream() - .map(host -> Arrays.asList(host.hostname, String.valueOf(host.port), host.protocol)) - .collect(Collectors.toList()); - properties.putIndexedFixedProperties( - CONNECTOR_HOSTS, - Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL), - hostValues); + properties.putString(CONNECTOR_HOSTS, hosts.stream().map(Host::toString).collect(Collectors.joining(";"))); Review comment: The method chain is too long. Please break it. ```java properties.putString( CONNECTOR_HOSTS, hosts.stream() .map(Host::toString) .collect(Collectors.joining(";"))); ```
[GitHub] [flink] zjffdu commented on issue #10474: [FLINK-15116] Make JobClient stateless, remove AutoCloseable
zjffdu commented on issue #10474: [FLINK-15116] Make JobClient stateless, remove AutoCloseable URL: https://github.com/apache/flink/pull/10474#issuecomment-562914776 @aljoscha This PR works for Zeppelin. Considering the feature freeze, let's move the discussion of a general FlinkClient to the future.
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161561 ## File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java ## @@ -177,13 +157,112 @@ public void testTableSource() { assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass())); } + @Test + @SuppressWarnings("unchecked") + public void testTableSourceWithLegacyProperties() { + // prepare parameters for Kafka table source + final TableSchema schema = TableSchema.builder() + .field(FRUIT_NAME, DataTypes.STRING()) + .field(COUNT, DataTypes.DECIMAL(10, 3)) + .field(EVENT_TIME, DataTypes.TIMESTAMP(3)) + .field(PROC_TIME, DataTypes.TIMESTAMP(3)) + .build(); + + final List rowtimeAttributeDescriptors = Collections.singletonList( + new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps())); + + final Map fieldMapping = new HashMap<>(); + fieldMapping.put(FRUIT_NAME, NAME); + fieldMapping.put(NAME, NAME); + fieldMapping.put(COUNT, COUNT); + fieldMapping.put(TIME, TIME); + + final Map specificOffsets = new HashMap<>(); + specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0); + specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1); + + final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema( + TableSchema.builder() + .field(NAME, DataTypes.STRING()) + .field(COUNT, DataTypes.DECIMAL(10, 3)) + .field(TIME, DataTypes.TIMESTAMP(3)) + .build() + .toRowType() + ); + + final KafkaTableSourceBase expected = getExpectedKafkaTableSource( + schema, + Optional.of(PROC_TIME), + rowtimeAttributeDescriptors, + fieldMapping, + TOPIC, + KAFKA_PROPERTIES, + deserializationSchema, + StartupMode.SPECIFIC_OFFSETS, + 
specificOffsets); + + TableSourceValidation.validateTableSource(expected); + + // construct table source using descriptors and table source factory + final Map legacyPropertiesMap = new HashMap<>(); + legacyPropertiesMap.putAll(createKafkaSourceProperties()); + + // use legacy properties + legacyPropertiesMap.remove("connector.specific-offsets"); + legacyPropertiesMap.remove("connector.properties.zookeeper.connect"); + legacyPropertiesMap.remove("connector.properties.bootstrap.servers"); + legacyPropertiesMap.remove("connector.properties.group.id"); + + legacyPropertiesMap.put("connector.specific-offsets.0.partition", "0"); + legacyPropertiesMap.put("connector.specific-offsets.0.offset", "100"); + legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1"); + legacyPropertiesMap.put("connector.specific-offsets.1.offset", "123"); + legacyPropertiesMap.put("connector.properties.0.key", "zookeeper.connect"); + legacyPropertiesMap.put("connector.properties.0.value", "dummy"); + legacyPropertiesMap.put("connector.properties.1.key", "bootstrap.servers"); + legacyPropertiesMap.put("connector.properties.1.value", "dummy"); + legacyPropertiesMap.put("connector.properties.2.key", "group.id"); + legacyPropertiesMap.put("connector.properties.2.value", "dummy"); + + final TableSource actualSource = TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap) + .createStreamTableSource(legacyPropertiesMap); + + assertEquals(expected, actualSource); + + // test Kafka consumer + final KafkaTableSourceBase actualKafkaSource = (KafkaTableSourceBase) actualSource; + final StreamExecutionEnvironmentMock mock = n
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355159149 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List<Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) { + final List<Host> hostList = new ArrayList<>(); + + final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); + if (hostsStr.isEmpty()) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty."); + } + + final String[] hosts = hostsStr.split(";"); + for (String host : hosts) { + try { + final URL url = new URL(host); + final String protocol = url.getProtocol(); + final String hostNmae = url.getHost(); + final int hostPort = url.getPort(); + hostList.add(new ElasticsearchUpsertTableSinkBase.Host(hostNmae, hostPort, protocol)); + } catch (MalformedURLException e) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' format should follow the " + + "format 'http://host_name:9092', but is:" + host , e); Review comment: ```suggestion "format 'http://host_name:port', but is '" + host + "'.", e); ```
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355159119 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List<Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) { + final List<Host> hostList = new ArrayList<>(); + + final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); + if (hostsStr.isEmpty()) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty."); + } Review comment: Can be optimized into `descriptorProperties.validateString(CONNECTOR_HOSTS, false, 1);`
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355159387 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List<Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) { + final List<Host> hostList = new ArrayList<>(); + + final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); + if (hostsStr.isEmpty()) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty."); + } + + final String[] hosts = hostsStr.split(";"); + for (String host : hosts) { + try { + final URL url = new URL(host); + final String protocol = url.getProtocol(); + final String hostNmae = url.getHost(); + final int hostPort = url.getPort(); Review comment: Please check that `protocol` and `hostName` are not null, and that `hostPort` is not `-1`, because the parsing still succeeds for an invalid URL string, e.g. `http:abc` doesn't throw an exception. Please also add some unit tests to cover this util method.
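A runnable sketch of the extra checks the review asks for, with a simplified stand-in for `ElasticsearchUpsertTableSinkBase.Host` and a plain `IllegalArgumentException` in place of Flink's `ValidationException`. As the review notes, `new URL("http:abc")` parses without throwing, which is why the host and port must be checked explicitly after parsing:

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

public class HostsParser {
    // Simplified stand-in for ElasticsearchUpsertTableSinkBase.Host.
    static final class Host {
        final String hostname;
        final int port;
        final String protocol;

        Host(String hostname, int port, String protocol) {
            this.hostname = hostname;
            this.port = port;
            this.protocol = protocol;
        }
    }

    // Splits "http://host1:9092;http://host2:9093" on ';' and validates each part.
    // java.net.URL accepts strings like "http:abc" without throwing, so the host
    // and port must be checked explicitly.
    static List<Host> validateAndParseHosts(String hostsStr) {
        if (hostsStr == null || hostsStr.isEmpty()) {
            throw new IllegalArgumentException("'connector.hosts' must not be empty.");
        }
        List<Host> hostList = new ArrayList<>();
        for (String host : hostsStr.split(";")) {
            try {
                URL url = new URL(host);
                if (url.getHost() == null || url.getHost().isEmpty() || url.getPort() == -1) {
                    throw new IllegalArgumentException(
                        "'" + host + "' does not follow the format 'http://host_name:port'.");
                }
                hostList.add(new Host(url.getHost(), url.getPort(), url.getProtocol()));
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException(
                    "'" + host + "' does not follow the format 'http://host_name:port'.", e);
            }
        }
        return hostList;
    }

    public static void main(String[] args) {
        List<Host> hosts = validateAndParseHosts("http://localhost:9092;https://example.com:9093");
        System.out.println(hosts.size() + " hosts, second: "
            + hosts.get(1).protocol + "://" + hosts.get(1).hostname + ":" + hosts.get(1).port);
    }
}
```

The empty-string guard at the top corresponds to the `descriptorProperties.validateString(CONNECTOR_HOSTS, false, 1)` shortcut suggested elsewhere in the review.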
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161152 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties +* @return SpecificOffsets with map format, key is partition, and value is offset. +*/ + public static Map<Integer, Long> validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) { + final Map<Integer, Long> offsetMap = new HashMap<>(); + + final String parseSpecificOffsetsStr = descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS); + if (parseSpecificOffsetsStr.isEmpty()) { + throw new ValidationException("Properties '" + CONNECTOR_SPECIFIC_OFFSETS + "' can not be empty."); + } Review comment: Can use `descriptorProperties.validateString(CONNECTOR_SPECIFIC_OFFSETS, false, 1);`
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355160813 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties +* @return SpecificOffsets with map format, key is partition, and value is offset. +*/ + public static Map<Integer, Long> validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) { Review comment: `validateAndParseSpecificOffsetsString`?
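A standalone sketch of what the renamed `validateAndParseSpecificOffsetsString` could do, assuming only the `partition:<int>,offset:<long>` pair format described in the javadoc; the class name, error messages, and `IllegalArgumentException` (in place of Flink's `ValidationException`) are illustrative, not Flink's actual ones:

```java
import java.util.HashMap;
import java.util.Map;

public class SpecificOffsetsParser {
    // Parses "partition:0,offset:42;partition:1,offset:300" into {0=42, 1=300}.
    static Map<Integer, Long> validateAndParseSpecificOffsetsString(String offsetsStr) {
        if (offsetsStr == null || offsetsStr.isEmpty()) {
            throw new IllegalArgumentException("'connector.specific-offsets' must not be empty.");
        }
        Map<Integer, Long> offsetMap = new HashMap<>();
        for (String pair : offsetsStr.split(";")) {
            String[] kvs = pair.split(",");
            // Each ';'-separated entry must be exactly "partition:<int>,offset:<long>".
            if (kvs.length != 2
                    || !kvs[0].startsWith("partition:")
                    || !kvs[1].startsWith("offset:")) {
                throw new IllegalArgumentException(
                    "'" + pair + "' does not follow the format 'partition:0,offset:42'.");
            }
            int partition = Integer.parseInt(kvs[0].substring("partition:".length()));
            long offset = Long.parseLong(kvs[1].substring("offset:".length()));
            offsetMap.put(partition, offset);
        }
        return offsetMap;
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets =
            validateAndParseSpecificOffsetsString("partition:0,offset:42;partition:1,offset:300");
        System.out.println(offsets.get(0) + " " + offsets.get(1));
    }
}
```

Parsing and validating in one pass, as leonardBang argues above, means a malformed pair fails with a message naming the offending fragment rather than a later `NumberFormatException`.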
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355158956 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return Review comment: Remove the un-documented `@param`, `@return`, or document them.
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355159144 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List<Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) { + final List<Host> hostList = new ArrayList<>(); + + final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); + if (null == hostsStr || hostsStr.length() == 0) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr); + } + + final String[] hosts = hostsStr.split(";"); Review comment: The validation of hosts.size > 0 is still missing?
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355160903 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java ## @@ -265,24 +259,22 @@ public Kafka sinkPartitionerCustom(Class partit } if (specificOffsets != null) { - final List<List<String>> values = new ArrayList<>(); + final StringBuilder stringBuilder = new StringBuilder(); + int i = 0; for (Map.Entry<Integer, Long> specificOffset : specificOffsets.entrySet()) { - values.add(Arrays.asList(specificOffset.getKey().toString(), specificOffset.getValue().toString())); + if (i != 0) { + stringBuilder.append(';'); + } + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(','); + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue()); + i++; } - properties.putIndexedFixedProperties( - CONNECTOR_SPECIFIC_OFFSETS, - Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET), - values); + properties.putString(CONNECTOR_SPECIFIC_OFFSETS, stringBuilder.toString()); } if (kafkaProperties != null) { - properties.putIndexedFixedProperties( - CONNECTOR_PROPERTIES, - Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE), - this.kafkaProperties.entrySet().stream() - .map(e -> Arrays.asList(e.getKey(), e.getValue())) - .collect(Collectors.toList()) - ); + this.kafkaProperties.entrySet().forEach(entry -> + properties.putString(CONNECTOR_PROPERTIES + '.' + entry.getKey(), entry.getValue())); Review comment: simplify to ```java this.kafkaProperties.forEach((key, value) -> properties.putString(CONNECTOR_PROPERTIES + '.' + key, value)); ```
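The suggested `Map.forEach` one-liner, extracted into a tiny runnable example. The `flatten` helper and the literal `"connector.properties"` prefix are hypothetical stand-ins for the surrounding Kafka descriptor code and its `CONNECTOR_PROPERTIES` constant:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PropertiesFlattener {

    // Flattens {"zookeeper.connect": "localhost:2181"} under a prefix into
    // {"connector.properties.zookeeper.connect": "localhost:2181"}, using the
    // Map.forEach form the review suggests instead of entrySet().forEach.
    static Map<String, String> flatten(String prefix, Map<String, String> kafkaProperties) {
        Map<String, String> properties = new LinkedHashMap<>();
        kafkaProperties.forEach((key, value) ->
            properties.put(prefix + '.' + key, value));
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> kafkaProperties = new LinkedHashMap<>();
        kafkaProperties.put("zookeeper.connect", "localhost:2181");
        kafkaProperties.put("bootstrap.servers", "localhost:9092");
        flatten("connector.properties", kafkaProperties)
            .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

`Map.forEach((key, value) -> ...)` hands each key and value to the lambda directly, avoiding the `entrySet()` indirection and `entry.getKey()`/`entry.getValue()` noise of the diff's original form.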
[GitHub] [flink] wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
wuchong commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355158593 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java ## @@ -200,15 +201,21 @@ private DescriptorProperties getValidatedProperties(Map properti } private List<Host> getHosts(DescriptorProperties descriptorProperties) { - final List<List<String>> hosts = descriptorProperties.getFixedIndexedProperties( - CONNECTOR_HOSTS, - Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL)); - return hosts.stream() - .map(host -> new Host( - descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)), - descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)), - descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL)))) - .collect(Collectors.toList()); + final List<Host> hostList = new ArrayList<>(); + if (descriptorProperties.containsKey(CONNECTOR_HOSTS)) { + return validateAndGetHostsStr(descriptorProperties); + } else { + final List<List<String>> hosts = descriptorProperties.getFixedIndexedProperties( + CONNECTOR_HOSTS, + Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL)); + hosts.stream() + .forEach(host -> hostList.add(new Host( + descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)), + descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)), + descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL))) + )); + } + return hostList; Review comment: If you return in an `if` clause, then it would be better to return in the `else` clause too.
suggestion: ```java if (descriptorProperties.containsKey(CONNECTOR_HOSTS)) { return validateAndGetHostsStr(descriptorProperties); } else { final List<List<String>> hosts = descriptorProperties.getFixedIndexedProperties( CONNECTOR_HOSTS, Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL)); return hosts.stream() .map(host -> new Host( descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)), descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)), descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL)))) .collect(Collectors.toList()); } ```
[GitHub] [flink] HuangZhenQiu commented on issue #9689: [FLINK-7151] add a basic function ddl
HuangZhenQiu commented on issue #9689: [FLINK-7151] add a basic function ddl URL: https://github.com/apache/flink/pull/9689#issuecomment-562914670 This PR is split into multiple smaller PRs. It is no longer needed.
[GitHub] [flink] HuangZhenQiu closed pull request #9689: [FLINK-7151] add a basic function ddl
HuangZhenQiu closed pull request #9689: [FLINK-7151] add a basic function ddl URL: https://github.com/apache/flink/pull/9689
[GitHub] [flink] HuangZhenQiu commented on issue #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate
HuangZhenQiu commented on issue #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate URL: https://github.com/apache/flink/pull/10371#issuecomment-562914586 @JingsongLi Thanks for the feedback. I made the changes accordingly. Please take another look.
[GitHub] [flink] HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate
HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate URL: https://github.com/apache/flink/pull/10371#discussion_r355161398 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java ## @@ -200,6 +204,8 @@ private ParquetTableSource(String path, MessageType parquetSchema, Configuration for (FilterPredicate converted : convertedPredicates.subList(1, convertedPredicates.size())) { parquetPredicate = FilterApi.and(parquetPredicate, converted); } + parquetPredicate = LogicalInverseRewriter.rewrite(parquetPredicate); + SchemaCompatibilityValidator.validate(parquetPredicate, parquetSchema); Review comment: It was added during local testing; it is not needed. Reverted.
[GitHub] [flink] flinkbot commented on issue #10484: [FLINK-14906][table] create and drop temp system functions from DDL t…
flinkbot commented on issue #10484: [FLINK-14906][table] create and drop temp system functions from DDL t… URL: https://github.com/apache/flink/pull/10484#issuecomment-562914317 ## CI report: * c604c9f8704578c33884b2faf8d163a7482795c4 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] flinkbot edited a comment on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events
flinkbot edited a comment on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events URL: https://github.com/apache/flink/pull/10483#issuecomment-562912876 ## CI report: * f7d3793fc60c8700f99adf52ed760e9665f42a90 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139976585)
[GitHub] [flink] HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate
HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate URL: https://github.com/apache/flink/pull/10371#discussion_r355160821 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java ## @@ -447,8 +453,16 @@ private String getColumnName(BinaryComparison comp) { @Nullable private Tuple2 extractColumnAndLiteral(BinaryComparison comp) { - TypeInformation typeInfo = getLiteralType(comp); String columnName = getColumnName(comp); + TypeInformation typeInfo = null; + try { + String[] columnPath = columnName.split("\\."); + Type type = parquetSchema.getType(columnPath); + typeInfo = ParquetSchemaConverter.convertParquetTypeToTypeInfo(type); + } catch (InvalidRecordException e) { Review comment: It is not needed when users go through SQL, but since the applyPredicate method is public, it is possible for a user to specify an undefined field in an expression. ParquetTableSourceTest also covers these scenarios. I catch the exception here to make sure the original test cases still pass.
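The pattern discussed above — resolving a column's type by splitting its dotted name into a path, and tolerating undefined fields that reach the public applyPredicate — can be sketched in Python. This is a hypothetical analogue of the Java code, not the PR's implementation; the schema dict and helper name are illustrative:

```python
def resolve_column_type(schema, column_name):
    """Walk a dotted column path through a nested schema dict.

    Returns None for undefined fields instead of raising, mirroring how
    the Java code catches InvalidRecordException for columns a user
    passed through the public applyPredicate method.
    """
    node = schema
    try:
        for part in column_name.split("."):
            node = node[part]
    except (KeyError, TypeError):
        return None  # undefined field: predicate cannot be pushed down
    return node

# Illustrative nested schema: top-level "foo", nested "bar.spam".
schema = {"foo": "INT", "bar": {"spam": "LONG"}}
```

A caller would check for None and simply skip pushdown for that predicate rather than failing the whole query.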
[GitHub] [flink] flinkbot edited a comment on issue #10460: [FLINK-14574] Fixing writing with plugin s3 filesystem.
flinkbot edited a comment on issue #10460: [FLINK-14574] Fixing writing with plugin s3 filesystem. URL: https://github.com/apache/flink/pull/10460#issuecomment-562508393 ## CI report: * 61e387e9665125032c92ed23afe138c4973b7eeb : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139672737) * 1696c5aaacdc526c8b615bd1e68ae42af23599a8 : UNKNOWN * 10ffc91acaba9964019f0301c0b0a1ecd7797732 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139677707) * 09f02dc709e92105087ee74572618734cedf2f92 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139719249) * e83f45cda365e2b668c99fadd7a055ca8048b9e9 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139859320) * 8fcd386c8ae470dfcc9a3e4a0c2ff971e88156e2 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139860794) * 2da3b996050b105b54f53f4f6453431c779da9bc : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139871817)
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161099 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java ## @@ -265,24 +259,24 @@ public Kafka sinkPartitionerCustom(Class partit } if (specificOffsets != null) { - final List> values = new ArrayList<>(); + final StringBuilder stringBuilder = new StringBuilder(); + int i = 0; for (Map.Entry specificOffset : specificOffsets.entrySet()) { - values.add(Arrays.asList(specificOffset.getKey().toString(), specificOffset.getValue().toString())); + if (i != 0) { + stringBuilder.append(';'); + } + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(','); + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue()); Review comment: ok
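The StringBuilder loop in the diff above flattens the per-partition offsets map into a single property string of `partition:P,offset:O` pairs joined by `;`. A small Python sketch of that rendering (the helper name is hypothetical; the key names follow the `CONNECTOR_SPECIFIC_OFFSETS_*` constants in the diff):

```python
def flatten_specific_offsets(offsets):
    """Render a {partition: offset} map in the flattened DDL-friendly
    form, e.g. 'partition:0,offset:42;partition:1,offset:300'."""
    return ";".join(
        "partition:%d,offset:%d" % (partition, offset)
        for partition, offset in sorted(offsets.items())
    )
```

Sorting by partition just makes the output deterministic for the sketch; the Java code iterates the map's entry order.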
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161074 ## File path: docs/dev/table/connect.md ## @@ -247,10 +247,8 @@ tables: topic: test-input startup-mode: earliest-offset properties: -- key: zookeeper.connect - value: localhost:2181 -- key: bootstrap.servers - value: localhost:9092 +zookeeper.connect: localhost:2181,localhost:2182 Review comment: The intent here is just to show the multi-host format in a single string; maybe 'hostname1:2181,host_name2:2181' would read better. What do you think?
[GitHub] [flink] HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate
HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate URL: https://github.com/apache/flink/pull/10371#discussion_r355160831 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java ## @@ -465,6 +479,10 @@ private String getColumnName(BinaryComparison comp) { typeInfo == BasicTypeInfo.INT_TYPE_INFO) { return new Tuple2<>(FilterApi.intColumn(columnName), (Integer) value); } else if (typeInfo == BasicTypeInfo.LONG_TYPE_INFO) { + if (value instanceof Integer) { + Integer intValue = (Integer) value; + return new Tuple2<>(FilterApi.longColumn(columnName), intValue.longValue()); + } return new Tuple2<>(FilterApi.longColumn(columnName), (Long) value); Review comment: Agree. It is more readable. Changed accordingly.
[GitHub] [flink] HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate
HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate URL: https://github.com/apache/flink/pull/10371#discussion_r355160821 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java ## @@ -447,8 +453,16 @@ private String getColumnName(BinaryComparison comp) { @Nullable private Tuple2 extractColumnAndLiteral(BinaryComparison comp) { - TypeInformation typeInfo = getLiteralType(comp); String columnName = getColumnName(comp); + TypeInformation typeInfo = null; + try { + String[] columnPath = columnName.split("\\."); + Type type = parquetSchema.getType(columnPath); + typeInfo = ParquetSchemaConverter.convertParquetTypeToTypeInfo(type); + } catch (InvalidRecordException e) { Review comment: It is not needed.
[GitHub] [flink] HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate
HuangZhenQiu commented on a change in pull request #10371: [FLINK-14953][formats] use table type to build parquet FilterPredicate URL: https://github.com/apache/flink/pull/10371#discussion_r355160800 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java ## @@ -447,8 +453,16 @@ private String getColumnName(BinaryComparison comp) { @Nullable private Tuple2 extractColumnAndLiteral(BinaryComparison comp) { - TypeInformation typeInfo = getLiteralType(comp); String columnName = getColumnName(comp); + TypeInformation typeInfo = null; + try { + String[] columnPath = columnName.split("\\."); Review comment: Thanks for the suggestion. Changed.
[GitHub] [flink] wuchong closed pull request #10213: [FLINK-12846][table-common] Carry primary key information in TableSchema
wuchong closed pull request #10213: [FLINK-12846][table-common] Carry primary key information in TableSchema URL: https://github.com/apache/flink/pull/10213
[GitHub] [flink] zhuzhurk commented on a change in pull request #10245: [FLINK-10936][kubernetes] Implement Kubernetes command line tools to support session cluster.
zhuzhurk commented on a change in pull request #10245: [FLINK-10936][kubernetes] Implement Kubernetes command line tools to support session cluster. URL: https://github.com/apache/flink/pull/10245#discussion_r355160547 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java ## @@ -42,6 +47,8 @@ */ public abstract class AbstractCustomCommandLine implements CustomCommandLine { + private static final Logger LOG = LoggerFactory.getLogger(AbstractCustomCommandLine.class); Review comment: Maybe we can make this logger `protected final Logger LOG = LoggerFactory.getLogger(getClass());` for all its subclasses to use, so that we do not need to create multiple loggers.
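The suggestion is the classic one-logger-per-concrete-class pattern: `LoggerFactory.getLogger(getClass())` resolves to the runtime subclass, so each subclass gets its own named logger without declaring one. A hedged Python analogue of the same idea, using the standard `logging` module (class names here are illustrative, not from the PR):

```python
import logging


class AbstractCustomCommandLine:
    # One logger resolved per concrete subclass at access time — the
    # Python counterpart of `LoggerFactory.getLogger(getClass())`.
    @property
    def log(self):
        return logging.getLogger(type(self).__name__)


class KubernetesSessionCli(AbstractCustomCommandLine):
    pass
```

With this, `KubernetesSessionCli().log.name` is `"KubernetesSessionCli"`, so log lines are attributed to the concrete command line, not the abstract base.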
[GitHub] [flink] zhuzhurk commented on a change in pull request #10245: [FLINK-10936][kubernetes] Implement Kubernetes command line tools to support session cluster.
zhuzhurk commented on a change in pull request #10245: [FLINK-10936][kubernetes] Implement Kubernetes command line tools to support session cluster. URL: https://github.com/apache/flink/pull/10245#discussion_r355160571 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java ## @@ -89,4 +96,80 @@ public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandL return resultingConfiguration; } + + protected void printUsage() { + System.out.println("Usage:"); + HelpFormatter formatter = new HelpFormatter(); + formatter.setWidth(200); + formatter.setLeftPadding(5); + + formatter.setSyntaxPrefix(" Optional"); + Options options = new Options(); + addGeneralOptions(options); + addRunOptions(options); + formatter.printHelp(" ", options); + } + + protected static int handleCliArgsException(CliArgsException e) { + LOG.error("Could not parse the command line arguments.", e); + + System.out.println(e.getMessage()); + System.out.println(); + System.out.println("Use the help option (-h or --help) to get help on the command."); + return 1; + } + + protected static int handleError(Throwable t) { + LOG.error("Error while running the Flink session.", t); + + System.err.println(); + System.err.println(""); + System.err.println(" The program finished with the following exception:"); + System.err.println(); + + t.printStackTrace(); + return 1; + } + + /** +* Read-Evaluate-Print step for the REPL. +* +* @param in to read from +* @param readConsoleInput true if console input has to be read +* @param clientPollingIntervalMs wait until clientPollingIntervalMs is over or the user entered something. 
+* @param helpMessage help message +* @return true if the REPL shall be continued, otherwise false +*/ + protected static boolean repStep( + BufferedReader in, + boolean readConsoleInput, + long clientPollingIntervalMs, + String helpMessage) throws IOException, InterruptedException { + + long startTime = System.currentTimeMillis(); + while ((System.currentTimeMillis() - startTime) < clientPollingIntervalMs Review comment: why not also make `CLIENT_POLLING_INTERVAL_MS` common for all clients?
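The quoted `repStep` loop waits until either the polling interval elapses or the user provides input, then reports whether the REPL should continue. A minimal Python analogue of that wait loop (function name and the stop-callback are assumptions for illustration, standing in for the console-input check):

```python
import time


def rep_step(poll_interval_s, should_stop):
    """Wait up to poll_interval_s seconds; bail out early and stop the
    REPL if should_stop() reports that the user asked to quit."""
    start = time.monotonic()
    while time.monotonic() - start < poll_interval_s:
        if should_stop():
            return False  # user input ends the REPL
        time.sleep(0.01)  # avoid busy-spinning
    return True  # interval elapsed with no input: keep polling
```

Making the polling interval a shared constant, as the reviewer suggests, would mean every client passes the same `poll_interval_s` instead of defining its own.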
[jira] [Closed] (FLINK-15096) Do not use GlobalJobParameters to pass system configuration
[ https://issues.apache.org/jira/browse/FLINK-15096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng closed FLINK-15096. --- Resolution: Fixed > Do not use GlobalJobParameters to pass system configuration > --- > > Key: FLINK-15096 > URL: https://issues.apache.org/jira/browse/FLINK-15096 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.10.0 >Reporter: Dawid Wysakowicz >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > GlobalJobParameters is a user only configuration that should not be used to > ship system specific settings. > Right now python uses it to ship information about custom archives, files, > executables etc. > A solution would be to pass required configuration when instantiating the > operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15096) Do not use GlobalJobParameters to pass system configuration
[ https://issues.apache.org/jira/browse/FLINK-15096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16990721#comment-16990721 ] Hequn Cheng commented on FLINK-15096: - Resolved in 1.10.0 via 0d7c15703d0dd304d49203c163e9a1397e4e0d9e > Do not use GlobalJobParameters to pass system configuration > --- > > Key: FLINK-15096 > URL: https://issues.apache.org/jira/browse/FLINK-15096 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.10.0 >Reporter: Dawid Wysakowicz >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > GlobalJobParameters is a user only configuration that should not be used to > ship system specific settings. > Right now python uses it to ship information about custom archives, files, > executables etc. > A solution would be to pass required configuration when instantiating the > operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
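The fix direction described in the issue — pass the required settings to the operator when it is instantiated, rather than shipping them through the user-facing GlobalJobParameters — can be illustrated with a small Python sketch (class and key names are hypothetical, not Flink's actual API):

```python
class PythonOperator:
    """Hypothetical sketch: the operator receives its configuration
    directly at construction time, so user-only global job parameters
    stay free of system-internal settings like archives or the Python
    executable path."""

    def __init__(self, config):
        # System settings travel with the operator, not via a global map.
        self.archives = config.get("python.archives", [])
        self.executable = config.get("python.executable", "python3")
```

The user-visible global parameters then remain exactly what the user set, while each operator carries its own system configuration.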
[GitHub] [flink] hequn8128 closed pull request #10477: [FLINK-15096][python] Pass python configuration to the python operators via table config instead of global job parameters.
hequn8128 closed pull request #10477: [FLINK-15096][python] Pass python configuration to the python operators via table config instead of global job parameters. URL: https://github.com/apache/flink/pull/10477
[GitHub] [flink] flinkbot commented on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events
flinkbot commented on issue #10483: [FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events URL: https://github.com/apache/flink/pull/10483#issuecomment-562912876 ## CI report: * f7d3793fc60c8700f99adf52ed760e9665f42a90 : UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #10462: [FLINK-15031][runtime] Calculate required shuffle memory before allocating slots if resources are specified
flinkbot edited a comment on issue #10462: [FLINK-15031][runtime] Calculate required shuffle memory before allocating slots if resources are specified URL: https://github.com/apache/flink/pull/10462#issuecomment-562542464 ## CI report: * 708583f6acbff12da90bc28269d924433c446e32 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139686969) * 2f4aee047ff40997c3b940fd1b9e7c7145ed9903 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139869261)
[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355157648 ## File path: tools/jenkins/run_case.py ## @@ -0,0 +1,151 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +# This file will be runned by jenkis to run the e2e perf test +# Params: +# am_seserver_dddress: master machines's ip of standalone environment +# scenario_file: the file which contains test scenarios +# flink_home: the path of flink +# inter_nums:the num of every scenario's running, default value is 10 +# wait_minute: interval time of two elections of qps,default value is 10s +# + + +import sys +import time + +if sys.version_info < (3, 5): +print("Python versions prior to 3.5 are not supported.") +sys.exit(-1) + +from logger import logger +from utils import run_command +from restapi_common import get_avg_qps_by_restful_interface + + +def start_server(flink_home): +cmd = "bash %s/bin/start-cluster.sh" % flink_home +status, output = run_command(cmd) +if status and output.find("Exception") < 0: +return True +else: +return False + + +def end_server(flink_home): +cmd = "bash %s/bin/stop_yarn.sh" % flink_home +status, output = run_command(cmd) +if status and output.find("Exception") < 0: +return True +else: +return False + + +def get_scenarios(scenario_file_name, test_jar): +""" +parser file which contains serval scenarios,it's content likes this: +classPath scenarioName jobparam1 jobparams2 +org.apache.Test testScenario1 aaa bbb +…… +:param scenario_file_name: scenario's file +:param test_jar: +:return: list of scenarios +""" +params_name = [] +scenarios = [] +scenario_names = [] +linenum = 0 +with open(scenario_file_name) as file: +data = file.readline() +if not (data.startswith("#") or data == ""): +linenum = linenum + 1 +cmd = "" +scenario_name = "" +if linenum == 1: +params_name = data.split(" ") +for index in range(0, len(params_name)): +params_name[index] = params_name[index] +if not "testClassPath" in params_name: +return 1, [] +else: +params_value = data.split(" ") +for index in range(0, len(params_name)): +param = params_name[index] +if param == "testClassPath": +cmd = "-c %s %s %s" % (params_value[index], test_jar, cmd) +else: +if param == "": +cmd = 
"--%s %s" % (param, params_value[index]) +else: +cmd = "%s --%s %s" % (cmd, param, params_value[index]) +scenario_name = "%s_%s" % (scenario_name, param) +scenario_names.append(scenario_name[1:]) +scenarios.append(cmd) +return 0, scenarios, scenario_names + + +def get_avg(values): +if len(values) == 0: +return 0.0 +else: +return sum(values) * 1.0 / len(values) + + +def run_cases(scenario_file_name, flink_home, am_seserver_dddress, inter_nums=10, wait_minute=10): +status, scenarios, scenario_names = get_scenarios(scenario_file_name) +for scenario_index in range(0, len(scenarios)): +scenario = scenarios.get(scenario_index) +scenario_name = scenario_names[scenario_index] +total_qps = [] +status = start_server(flink_home) +if not status: +logger.info("start server failed") +return 1 +for inter_index in range(0, inter_nums): +cmd = "bash %s/bin/flink run %s" % (flink_home, scenario) +status, output = run_command(cmd) +if status: +
[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355156657 ## File path: tools/jenkins/restapi_common.py ## @@ -0,0 +1,85 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# This common file writes the functions of getting qps from flink restful api +# +# Note by AiHua Li: Review comment: Can we remove this line? It is not common practice in Flink to leave an author's name in a file.
[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355155525 ## File path: tools/jenkins/init_env.py ## @@ -0,0 +1,95 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Because of end-to-end-performance-test running in the cluster, so This init_env.py only contains flink on +# standalone env, doesn't contains the init environment of maven, java and jenkins. 
Defaultly maven, java and jenkins +# environment are ready in the cluster +# + +import os +from logger import logger +from utils import run_command +from restapi_common import execute_get + + +def init_standalone_env(host_list, user, source_path, dest_path): +for host in host_list: +cmd = "sudo su %s -c 'ssh %s \" rm -rf %s\"; scp -r %s %s@%s:%s'" % (user, host, dest_path, source_path, + user, host, dest_path) +logger.info("init_standalone_env cmd:%s" % cmd) +run_command(cmd) + + +def get_host_list(slave_file): +hostlist = [] +with open(slave_file) as file: +data = file.readline() +if not(data == "" or data.startswith("#")): +hostlist.append(data) +return hostlist + + +def package(flink_home): +cmd = "cd %s; mvn clean install -B -U -DskipTests -Drat.skip=true -Dcheckstyle.skip=true " % flink_home +status, output = run_command(cmd) +if status and output.find("BUILD SUCCESS") > 0: +return True +else: +return False + + +def get_target(flink_home): +cmd = "ls -lt %s/flink-dist/target/flink-*-bin/ |grep -v tar.gz" +status, output = run_command(cmd) +if status: +target_file = output.split("\n")[0] +return target_file, "%s/flink-dist/target/%s-bin/%s" % (flink_home, target_file, target_file) +else: +return "", "" + + +def update_conf_slaves(dest_path, slave_file): +cmd = "cp %s %s/conf/" % (slave_file, dest_path) +run_command(cmd) + + +def init_env(): +flink_home = os.getcwd() Review comment: The returned value may not be the path of the Flink home. How about getting the path of the current file and then navigating to the Flink home?
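The reviewer's suggestion — derive the repository root from the script's own location instead of `os.getcwd()` — can be sketched like this. Assuming the script lives under `tools/jenkins/` (as the review thread indicates), the Flink home is two directories above it; the helper name is hypothetical:

```python
import os


def flink_home_from_script(script_path):
    """tools/jenkins/<script>.py -> two directories up is the repo root
    (assumption: the script lives under tools/jenkins/)."""
    jenkins_dir = os.path.dirname(os.path.abspath(script_path))
    return os.path.dirname(os.path.dirname(jenkins_dir))
```

In the real script one would call it as `flink_home_from_script(__file__)`, which keeps working no matter which directory Jenkins invokes the job from.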
[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355157825 ## File path: tools/jenkins/run_case.py ## @@ -0,0 +1,151 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +# This file will be runned by jenkis to run the e2e perf test +# Params: +# am_seserver_dddress: master machines's ip of standalone environment +# scenario_file: the file which contains test scenarios +# flink_home: the path of flink +# inter_nums:the num of every scenario's running, default value is 10 +# wait_minute: interval time of two elections of qps,default value is 10s +# + + +import sys +import time + +if sys.version_info < (3, 5): +print("Python versions prior to 3.5 are not supported.") +sys.exit(-1) + +from logger import logger +from utils import run_command +from restapi_common import get_avg_qps_by_restful_interface + + +def start_server(flink_home): +cmd = "bash %s/bin/start-cluster.sh" % flink_home +status, output = run_command(cmd) +if status and output.find("Exception") < 0: +return True +else: +return False + + +def end_server(flink_home): +cmd = "bash %s/bin/stop_yarn.sh" % flink_home +status, output = run_command(cmd) +if status and output.find("Exception") < 0: +return True +else: +return False + + +def get_scenarios(scenario_file_name, test_jar): +""" +parser file which contains serval scenarios,it's content likes this: Review comment: Parse file which contains several scenarios. Its content looks like the following examples. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
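The scenario-file format described in the docstring under review — a header line naming the parameters (one of which must be `testClassPath`), followed by one value line per scenario — can be sketched as a small standalone parser. This is a hypothetical simplification of `get_scenarios`, not the PR's code:

```python
def parse_scenarios(lines, test_jar):
    """Parse a scenario file: the first non-comment line names the
    parameters, and every following line supplies the values for one
    scenario, rendered as a `flink run` argument string."""
    header = None
    commands = []
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        if header is None:
            header = line.split(" ")
            if "testClassPath" not in header:
                raise ValueError("scenario file must declare testClassPath")
            continue
        values = dict(zip(header, line.split(" ")))
        # The main class and test jar come first; everything else
        # becomes a --name value job parameter.
        cmd = "-c %s %s" % (values.pop("testClassPath"), test_jar)
        for name, value in values.items():
            cmd += " --%s %s" % (name, value)
        commands.append(cmd)
    return commands
```

For the example content in the docstring (`org.apache.Test testScenario1 aaa bbb` under a matching header), each scenario line yields one ready-to-run argument string.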
[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355155587

## File path: tools/jenkins/init_env.py ##

@@ -0,0 +1,95 @@
+# Because the end-to-end performance tests run in the cluster, this init_env.py only sets up flink in a
+# standalone env; it does not set up the maven, java and jenkins environments. By default the maven,
+# java and jenkins environments are assumed to be ready in the cluster.
+#
+
+import os
+from logger import logger
+from utils import run_command
+from restapi_common import execute_get
+
+
+def init_standalone_env(host_list, user, source_path, dest_path):
+    for host in host_list:
+        cmd = "sudo su %s -c 'ssh %s \" rm -rf %s\"; scp -r %s %s@%s:%s'" % (user, host, dest_path, source_path,
+                                                                             user, host, dest_path)
+        logger.info("init_standalone_env cmd:%s" % cmd)
+        run_command(cmd)
+
+
+def get_host_list(slave_file):
+    hostlist = []
+    with open(slave_file) as file:
+        data = file.readline()
+        if not (data == "" or data.startswith("#")):
+            hostlist.append(data)
+    return hostlist
+
+
+def package(flink_home):
+    cmd = "cd %s; mvn clean install -B -U -DskipTests -Drat.skip=true -Dcheckstyle.skip=true " % flink_home
+    status, output = run_command(cmd)
+    if status and output.find("BUILD SUCCESS") > 0:
+        return True
+    else:
+        return False
+
+
+def get_target(flink_home):
+    cmd = "ls -lt %s/flink-dist/target/flink-*-bin/ |grep -v tar.gz"
+    status, output = run_command(cmd)
+    if status:
+        target_file = output.split("\n")[0]
+        return target_file, "%s/flink-dist/target/%s-bin/%s" % (flink_home, target_file, target_file)
+    else:
+        return "", ""
+
+
+def update_conf_slaves(dest_path, slave_file):
+    cmd = "cp %s %s/conf/" % (slave_file, dest_path)
+    run_command(cmd)
+
+
+def init_env():
+    flink_home = os.getcwd()
+    package_result = package(flink_home)
+    if not package_result:
+        logger.error("package error")
+        return False
+    slave_file = "%s/tool/jenkins/slaves" % flink_home

Review comment: It seems the script would fail as there is no `tool/jenkins/slaves` file in the current flink home folder?
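For context on this comment, the guard being asked for could look like the sketch below. The helper name is hypothetical, and it assumes the slaves file is checked in under `tools/jenkins/` relative to flink_home (note the script builds the path with `tool/` rather than `tools/`):

```python
import os


def get_slave_file(flink_home):
    # The script builds "%s/tool/jenkins/slaves", but the file lives under
    # tools/jenkins/, so the open() in get_host_list would raise. Resolve the
    # path explicitly and fail fast with a clear message if it is missing.
    slave_file = os.path.join(flink_home, "tools", "jenkins", "slaves")
    if not os.path.isfile(slave_file):
        raise FileNotFoundError("slaves file not found: %s" % slave_file)
    return slave_file
```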
[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355157525

## File path: tools/jenkins/restapi_common.py ##

@@ -0,0 +1,85 @@
+#
+# This file contains the functions for getting qps from the flink REST API
+#
+# Note by AiHua Li:
+
+import urllib
+import urllib.request
+import json
+from logger import logger
+
+
+def execute_get(url):
+    """
+    Send a get request
+    :param url:
+    :return: content from the request
+    """
+    try:
+        result = urllib.request.urlopen(url, timeout=300).read()
+        if result != "":
+            return json.loads(result)
+        else:
+            return {}
+    except Exception as e:
+        logger.error(e)
+        return {}
+
+
+def get_source_node(plan):
+    """
+    get the source nodes from the job's json plan
+    :param plan:
+    :return: list of source nodes
+    """
+    source_nodes = []
+    nodes = plan.get("nodes", [])
+    for node in nodes:
+        inputs = node.get("inputs", [])
+        if len(inputs) == 0:
+            source_nodes.append(node["id"])
+    return source_nodes
+
+
+def get_avg_qps_by_restful_interface(am_seserver_dddress, job_id):
+    url = "http://%s/jobs/%s" % (am_seserver_dddress, job_id)
+    result = execute_get(url)
+    vertices = result.get("vertices", "")
+    plan = result.get("plan", "")
+    source_nodes = get_source_node(plan)
+    totaltps = 0
+    for vertice in vertices:
+        id = vertice.get("id", "")
+        if id in source_nodes:
+            url = "http://%s/jobs/%s/vertices/%s/subtasks/metrics?agg=avg" % (am_seserver_dddress, job_id, id)
+            keyresult = execute_get(url)
+            for key in keyresult:
+                metrics_name = key.get("id", "")
+                if metrics_name.endswith("numBuffersOutPerSecond"):

Review comment: Not sure whether we should use numBuffersOutPerSecond here, as the buffer size could change. Should we use numRecordsOutPerSecond instead?
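To illustrate the suggestion, a minimal sketch of picking the record-based metric out of the aggregated subtask metrics response. The response shape is assumed from the code above; numRecordsOutPerSecond counts records directly and does not depend on how full each network buffer is:

```python
def pick_records_out_avg(metrics):
    # metrics: the JSON list returned by
    # /jobs/<job>/vertices/<vertex>/subtasks/metrics?agg=avg, e.g.
    # [{"id": "numRecordsOutPerSecond", "avg": 1250.0}, ...]
    for entry in metrics:
        if entry.get("id", "").endswith("numRecordsOutPerSecond"):
            return entry.get("avg", 0.0)
    return 0.0
```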
[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355156159

## File path: tools/jenkins/init_env.py ##

+import os
+from logger import logger
+from utils import run_command
+from restapi_common import execute_get

Review comment: Unused import
[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355157613

## File path: tools/jenkins/run_case.py ##

+def run_cases(scenario_file_name, flink_home, am_seserver_dddress, inter_nums=10, wait_minute=10):
+    status, scenarios, scenario_names = get_scenarios(scenario_file_name)
+    for scenario_index in range(0, len(scenarios)):
+        scenario = scenarios.get(scenario_index)
+        scenario_name = scenario_names[scenario_index]
+        total_qps = []
+        status = start_server(flink_home)
+        if not status:
+            logger.info("start server failed")
+            return 1
+        for inter_index in range(0, inter_nums):
+            cmd = "bash %s/bin/flink run %s" % (flink_home, scenario)
+            status, output = run_command(cmd)
+            if status:
[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355156233

## File path: tools/jenkins/init_env.py ##

+def init_env():
+    flink_home = os.getcwd()
+    package_result = package(flink_home)
+    if not package_result:
+        logger.error("package error")
+        return False
+    slave_file = "%s/tool/jenkins/slaves" % flink_home
+    host_list = get_host_list(slave_file)
+    flink_path, source_path = get_target(flink_home)
+    dest_path = "/home/admin/%s" % flink_path

Review comment: Maybe we should not hard-code the path. All paths should be relative to flink_home, so that anyone can run this script anywhere.
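A sketch of what a flink_home-relative destination could look like; the `E2E_DEPLOY_DIR` override and the `e2e-deploy` directory name are hypothetical, not part of the PR:

```python
import os


def build_dest_path(flink_home, flink_path):
    # Derive the deploy directory from flink_home instead of hard-coding
    # /home/admin, with an optional environment-variable override.
    base = os.environ.get("E2E_DEPLOY_DIR", os.path.join(flink_home, "e2e-deploy"))
    return os.path.join(base, flink_path)
```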
[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355156382

## File path: tools/jenkins/init_env.py ##

+def init_env():
+    flink_home = os.getcwd()
+    package_result = package(flink_home)
+    if not package_result:
+        logger.error("package error")
+        return False
+    slave_file = "%s/tool/jenkins/slaves" % flink_home
+    host_list = get_host_list(slave_file)
+    flink_path, source_path = get_target(flink_home)
+    dest_path = "/home/admin/%s" % flink_path
+    if source_path != "":
+        update_conf_slaves(source_path, slave_file)
+        init_standalone_env(host_list, source_path, dest_path)

Review comment: This does not work: init_standalone_env takes 4 parameters, but only 3 are passed here.
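The fix is to pass the deploy user explicitly so that all four parameters of init_standalone_env are bound. A sketch with a stub standing in for the real function; the user value here is hypothetical and should come from configuration:

```python
def init_standalone_env(host_list, user, source_path, dest_path):
    # Stub with the same signature as the real function, recording its arguments.
    return [(host, user, source_path, dest_path) for host in host_list]


# The original call site, init_standalone_env(host_list, source_path, dest_path),
# raises TypeError: missing 1 required positional argument: 'dest_path'.
deploy_user = "admin"  # hypothetical; should be a configurable parameter
calls = init_standalone_env(["host-1", "host-2"], deploy_user, "/src/flink", "/dest/flink")
```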
[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355158259

## File path: tools/jenkins/run_case.py ##

+def get_scenarios(scenario_file_name, test_jar):
+    """
+    parser file which contains serval scenarios,it's content likes this:
+    classPath scenarioName jobparam1 jobparams2
+    org.apache.Test testScenario1 aaa bbb
+    ……
+    :param scenario_file_name: scenario's file
+    :param test_jar:
+    :return: list of scenarios
+    """
+    params_name = []
+    scenarios = []
+    scenario_names = []
+    linenum = 0
+    with open(scenario_file_name) as file:
+        data = file.readline()
+        if not (data.startswith("#") or data == ""):
+            linenum = linenum + 1
+            cmd = ""
+            scenario_name = ""
+            if linenum == 1:
+                params_name = data.split(" ")
+                for index in range(0, len(params_name)):
+                    params_name[index] = params_name[index]
+                if not "testClassPath" in params_name:
+                    return 1, []
+            else:
+                params_value = data.split(" ")
+                for index in range(0, len(params_name)):
+                    param = params_name[index]
+                    if param == "testClassPath":
+                        cmd = "-c %s %s %s" % (params_value[index], test_jar, cmd)
+                    else:
+                        if param == "":
+                            cmd = "--%s %s" % (param, params_value[index])
+                        else:
+                            cmd = "%s --%s %s" % (cmd, param, params_value[index])
+                        scenario_name = "%s_%s" % (scenario_name, param)
+                scenario_names.append(scenario_name[1:])
+                scenarios.append(cmd)
+    return 0, scenarios, scenario_names
+
+
+def get_avg(values):
+    if len(values) == 0:
+        return 0.0
+    else:
+        return sum(values) * 1.0 / len(values)
+
+
+def run_cases(scenario_file_name, flink_home, am_seserver_dddress, inter_nums=10, wait_minute=10):
+    status, scenarios, scenario_names = get_scenarios(scenario_file_name)
+    for scenario_index in range(0, len(scenarios)):
+        scenario = scenarios.get(scenario_index)
+        scenario_name = scenario_names[scenario_index]
+        total_qps = []
+        status = start_server(flink_home)

Review comment: Why start a cluster for each scenario? Can we start the cluster and reuse it?
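The restructuring being suggested — start the standalone cluster once, run every scenario against it, then tear it down — could be sketched as below. `start`, `stop` and `run_one` stand in for start_server, end_server and the flink-run step of the real script:

```python
def run_all_scenarios(scenarios, start, stop, run_one):
    # Bring the cluster up once instead of once per scenario.
    if not start():
        return 1
    try:
        for scenario in scenarios:
            run_one(scenario)
        return 0
    finally:
        # Always shut the cluster down, even if a scenario fails.
        stop()
```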
[GitHub] [flink] hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests
hequn8128 commented on a change in pull request #10436: [FLINK-14920] [flink-end-to-end-perf-tests] Set up environment to run performance e2e tests URL: https://github.com/apache/flink/pull/10436#discussion_r355158354

## File path: tools/jenkins/run_case.py ##

+def end_server(flink_home):
+    cmd = "bash %s/bin/stop_yarn.sh" % flink_home
+    status, output = run_command(cmd)
+    if status and output.find("Exception") < 0:
+        return True
+    else:
+        return False

Review comment: This method is never called. We should shut down the cluster correctly.
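Two details worth noting for the fix: end_server should actually be invoked (e.g. from a finally block after the scenario loop), and since start_server launches bin/start-cluster.sh, the matching shutdown script for a standalone cluster in a standard Flink distribution is bin/stop-cluster.sh rather than bin/stop_yarn.sh. A sketch of the corrected command construction, assuming that standard layout:

```python
def end_server_cmd(flink_home):
    # start_server uses bin/start-cluster.sh, so the standalone cluster
    # should be stopped with the matching bin/stop-cluster.sh script.
    return "bash %s/bin/stop-cluster.sh" % flink_home
```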
[GitHub] [flink] HuangZhenQiu closed pull request #10476: [FLINK-14911] [table] register and drop temp catalog functions from D…
HuangZhenQiu closed pull request #10476: [FLINK-14911] [table] register and drop temp catalog functions from D… URL: https://github.com/apache/flink/pull/10476