[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355276947 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -155,18 +155,22 @@ private void validateConnectionProperties(DescriptorProperties properties) { final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); final String[] hosts = hostsStr.split(";"); + final String validationExceptionMessage = "Properties '" + CONNECTOR_HOSTS + "' format should " + + "follow the format 'http://host_name:port', but is '" + hosts + "'."; Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355276918 ## File path: flink-python/pyflink/table/tests/test_descriptor.py ## @@ -115,10 +112,7 @@ def test_start_from_specific_offsets(self): properties = kafka.to_properties() expected = {'connector.startup-mode': 'specific-offsets', -'connector.specific-offsets.0.partition': '1', -'connector.specific-offsets.0.offset': '220', -'connector.specific-offsets.1.partition': '3', -'connector.specific-offsets.1.offset': '400', +'connector.specific-offsets.0.': 'partition:1,offset:220;partition:3,offset:400', Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355190453 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -87,7 +93,12 @@ private void validateHosts(DescriptorProperties properties) { hostsValidators.put(CONNECTOR_HOSTS_HOSTNAME, (key) -> properties.validateString(key, false, 1)); hostsValidators.put(CONNECTOR_HOSTS_PORT, (key) -> properties.validateInt(key, false, 0, 65535)); hostsValidators.put(CONNECTOR_HOSTS_PROTOCOL, (key) -> properties.validateString(key, false, 1)); - properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, false, hostsValidators); + + if (properties.containsKey(CONNECTOR_HOSTS)) { + validateAndGetHostsStr(properties); + } else { + properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, false, hostsValidators); Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355190442 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java ## @@ -200,15 +201,21 @@ private DescriptorProperties getValidatedProperties(Map properti } private List getHosts(DescriptorProperties descriptorProperties) { - final List> hosts = descriptorProperties.getFixedIndexedProperties( - CONNECTOR_HOSTS, - Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL)); - return hosts.stream() - .map(host -> new Host( - descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)), - descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)), - descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL - .collect(Collectors.toList()); + final List hostList = new ArrayList<>(); + if (descriptorProperties.containsKey(CONNECTOR_HOSTS)) { + return validateAndGetHostsStr(descriptorProperties); + } else { + final List> hosts = descriptorProperties.getFixedIndexedProperties( + CONNECTOR_HOSTS, + Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL)); + hosts.stream() + .forEach(host -> hostList.add(new Host( + descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)), + descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)), + descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL))) + )); + } + return hostList; Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355190188 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { + final List hostList = new ArrayList<>(); + + final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); + if (null == hostsStr || hostsStr.length() == 0) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr); + } + + final String[] hosts = hostsStr.split(";"); Review comment: It's really a good point that I missed before, thank you very much ! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355190196 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { + final List hostList = new ArrayList<>(); + + final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); + if (hostsStr.isEmpty()) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty."); + } + + final String[] hosts = hostsStr.split(";"); + for (String host : hosts) { + try { + final URL url = new URL(host); + final String protocol = url.getProtocol(); + final String hostNmae = url.getHost(); + final int hostPort = url.getPort(); Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355190040 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -146,28 +147,32 @@ private void validateConnectionProperties(DescriptorProperties properties) { * * connector.hosts = http://host_name:9092;http://host_name:9093 * -* @param descriptorProperties -* @return */ - public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { - final List hostList = new ArrayList<>(); + public static List validateAndParseHostsString(DescriptorProperties descriptorProperties) { + final List hostList = new ArrayList<>(); + descriptorProperties.validateString(CONNECTOR_HOSTS, false, 1); final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); - if (hostsStr.isEmpty()) { - throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty."); - } final String[] hosts = hostsStr.split(";"); for (String host : hosts) { + final String validationExceptionMessage = "Properties '" + CONNECTOR_HOSTS + "' format should " + + "follow the format 'http://host_name:port', but is '" + host + "'."; try { final URL url = new URL(host); final String protocol = url.getProtocol(); - final String hostNmae = url.getHost(); + final String hostName = url.getHost(); final int hostPort = url.getPort(); - hostList.add(new ElasticsearchUpsertTableSinkBase.Host(hostNmae, hostPort, protocol)); + + if (null == protocol || protocol.isEmpty() Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355162114 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { + final List hostList = new ArrayList<>(); + + final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); + if (null == hostsStr || hostsStr.length() == 0) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr); + } + + final String[] hosts = hostsStr.split(";"); Review comment: `descriptorProperties.getString` can ensure hostsStr not empty, `hosts.size > 0` is always true here, Should we validate it again? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355162022 ## File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java ## @@ -177,13 +157,112 @@ public void testTableSource() { assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass())); } + @Test + @SuppressWarnings("unchecked") + public void testTableSourceWithLegacyProperties() { + // prepare parameters for Kafka table source + final TableSchema schema = TableSchema.builder() + .field(FRUIT_NAME, DataTypes.STRING()) + .field(COUNT, DataTypes.DECIMAL(10, 3)) + .field(EVENT_TIME, DataTypes.TIMESTAMP(3)) + .field(PROC_TIME, DataTypes.TIMESTAMP(3)) + .build(); + + final List rowtimeAttributeDescriptors = Collections.singletonList( + new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps())); + + final Map fieldMapping = new HashMap<>(); + fieldMapping.put(FRUIT_NAME, NAME); + fieldMapping.put(NAME, NAME); + fieldMapping.put(COUNT, COUNT); + fieldMapping.put(TIME, TIME); + + final Map specificOffsets = new HashMap<>(); + specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0); + specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1); + + final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema( + TableSchema.builder() + .field(NAME, DataTypes.STRING()) + .field(COUNT, DataTypes.DECIMAL(10, 3)) + .field(TIME, DataTypes.TIMESTAMP(3)) + .build() + .toRowType() + ); + + final KafkaTableSourceBase expected = getExpectedKafkaTableSource( + schema, + Optional.of(PROC_TIME), + rowtimeAttributeDescriptors, + fieldMapping, + TOPIC, + KAFKA_PROPERTIES, + deserializationSchema, + StartupMode.SPECIFIC_OFFSETS, + specificOffsets); + + TableSourceValidation.validateTableSource(expected); + + // construct table source using descriptors and table source factory + final Map legacyPropertiesMap = new HashMap<>(); + legacyPropertiesMap.putAll(createKafkaSourceProperties()); + + // use legacy properties + legacyPropertiesMap.remove("connector.specific-offsets"); + legacyPropertiesMap.remove("connector.properties.zookeeper.connect"); + legacyPropertiesMap.remove("connector.properties.bootstrap.servers"); + legacyPropertiesMap.remove("connector.properties.group.id"); + + legacyPropertiesMap.put("connector.specific-offsets.0.partition", "0"); + legacyPropertiesMap.put("connector.specific-offsets.0.offset", "100"); + legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1"); + legacyPropertiesMap.put("connector.specific-offsets.1.offset", "123"); + legacyPropertiesMap.put("connector.properties.0.key", "zookeeper.connect"); + legacyPropertiesMap.put("connector.properties.0.value", "dummy"); + legacyPropertiesMap.put("connector.properties.1.key", "bootstrap.servers"); + legacyPropertiesMap.put("connector.properties.1.value", "dummy"); + legacyPropertiesMap.put("connector.properties.2.key", "group.id"); + legacyPropertiesMap.put("connector.properties.2.value", "dummy"); + + final TableSource actualSource = TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap) + .createStreamTableSource(legacyPropertiesMap); + + assertEquals(expected, actualSource); + + // test Kafka consumer + final KafkaTableSourceBase actualKafkaSource = (KafkaTableSourceBase) actualSource; + final StreamExecutionEnvironmentMock mock
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161955 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties +* @return SpecificOffsets with map format, key is partition, and value is offset. +*/ + public static Map validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) { + final Map offsetMap = new HashMap<>(); + + final String parseSpecificOffsetsStr = descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS); + if (parseSpecificOffsetsStr.isEmpty()) { + throw new ValidationException("Properties '" + CONNECTOR_SPECIFIC_OFFSETS + "' can not be empty."); + } Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161941 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java ## @@ -265,24 +259,22 @@ public Kafka sinkPartitionerCustom(Class partit } if (specificOffsets != null) { - final List> values = new ArrayList<>(); + final StringBuilder stringBuilder = new StringBuilder(); + int i = 0; for (Map.Entry specificOffset : specificOffsets.entrySet()) { - values.add(Arrays.asList(specificOffset.getKey().toString(), specificOffset.getValue().toString())); + if (i != 0) { + stringBuilder.append(';'); + } + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(','); + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue()); + i++; } - properties.putIndexedFixedProperties( - CONNECTOR_SPECIFIC_OFFSETS, - Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET), - values); + properties.putString(CONNECTOR_SPECIFIC_OFFSETS, stringBuilder.toString()); } if (kafkaProperties != null) { - properties.putIndexedFixedProperties( - CONNECTOR_PROPERTIES, - Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE), - this.kafkaProperties.entrySet().stream() - .map(e -> Arrays.asList(e.getKey(), e.getValue())) - .collect(Collectors.toList()) - ); + this.kafkaProperties.entrySet().forEach(entry -> + properties.putString(CONNECTOR_PROPERTIES + '.' + entry.getKey(), entry.getValue())); Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161944 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161898 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { + final List hostList = new ArrayList<>(); + + final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); + if (hostsStr.isEmpty()) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty."); + } Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161876 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { Review comment: ok ,I thnink validateAndParseHostsString will be better This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161876 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { Review comment: ok ,I think validateAndParseHostsString will be better. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161864 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { Review comment: ok, I think validateAndGetHosts will be better. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161869 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties +* @return SpecificOffsets with map format, key is partition, and value is offset. +*/ + public static Map validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) { Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161864 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { Review comment: ok, I think validateAndGetHosts will be better. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161822 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161802 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java ## @@ -308,13 +304,7 @@ public Elasticsearch connectionPathPrefix(String pathPrefix) { final DescriptorProperties properties = new DescriptorProperties(); properties.putProperties(internalProperties); - final List> hostValues = hosts.stream() - .map(host -> Arrays.asList(host.hostname, String.valueOf(host.port), host.protocol)) - .collect(Collectors.toList()); - properties.putIndexedFixedProperties( - CONNECTOR_HOSTS, - Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL), - hostValues); + properties.putString(CONNECTOR_HOSTS, hosts.stream().map(Host::toString).collect(Collectors.joining(";"))); Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161756 ## File path: docs/dev/table/connect.md ## @@ -247,10 +247,8 @@ tables: topic: test-input startup-mode: earliest-offset properties: -- key: zookeeper.connect - value: localhost:2181 -- key: bootstrap.servers - value: localhost:9092 +zookeeper.connect: localhost:2181,localhost:2182 Review comment: but in FLIP 86, we defined an example as following: `'connector.properties.zookeeper.connect'='localhost:2181,localhost:2182', 'connector.properties.bootstrap.servers'='localhost:9092,localhost:9093', 'connector.properties.group.id'='testGroup' ` so I think keep this or use 'hostname1:2181,host_name2:2181' can make sense both. @JingsongLi This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161684 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -87,7 +93,12 @@ private void validateHosts(DescriptorProperties properties) { hostsValidators.put(CONNECTOR_HOSTS_HOSTNAME, (key) -> properties.validateString(key, false, 1)); hostsValidators.put(CONNECTOR_HOSTS_PORT, (key) -> properties.validateInt(key, false, 0, 65535)); hostsValidators.put(CONNECTOR_HOSTS_PROTOCOL, (key) -> properties.validateString(key, false, 1)); - properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, false, hostsValidators); + + if (properties.containsKey(CONNECTOR_HOSTS)) { + validateAndGetHostsStr(properties); Review comment: we'd better parse and validate at the same time because `DescriptorProperties` do not support `Consumer` that can validate `String` value like `partition:0,offset:42;partition:1,offset:300` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161099 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java ## @@ -265,24 +259,24 @@ public Kafka sinkPartitionerCustom(Class partit } if (specificOffsets != null) { - final List> values = new ArrayList<>(); + final StringBuilder stringBuilder = new StringBuilder(); + int i = 0; for (Map.Entry specificOffset : specificOffsets.entrySet()) { - values.add(Arrays.asList(specificOffset.getKey().toString(), specificOffset.getValue().toString())); + if (i != 0) { + stringBuilder.append(';'); + } + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(','); + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue()); Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355161074 ## File path: docs/dev/table/connect.md ## @@ -247,10 +247,8 @@ tables: topic: test-input startup-mode: earliest-offset properties: -- key: zookeeper.connect - value: localhost:2181 -- key: bootstrap.servers - value: localhost:9092 +zookeeper.connect: localhost:2181,localhost:2182 Review comment: Just to specify multi hosts format in a string here, maybe 'hostname1:2181,host_name2:2181' will better, how do you think of ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355155261 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,55 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties +* @return +*/ + public static Map validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) { + final Map offsetMap = new HashMap<>(); + + final String parseSpecificOffsetsStr = descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS); + if (null == parseSpecificOffsetsStr || parseSpecificOffsetsStr.length() == 0) { + throw new ValidationException("Properties connector.specific-offsets can not be empty, but is:" + parseSpecificOffsetsStr); + } + + final String[] pairs = parseSpecificOffsetsStr.split(";"); + for (String pair: pairs) { + if (null == pair || pair.length() == 0 || !pair.contains(",")) { + throw new ValidationException("Invalid properties '" + CONNECTOR_SPECIFIC_OFFSETS + "'" + + "should in the format 'partition:0,offset:42;partition:1,offset:300', " + + "but is '" + parseSpecificOffsetsStr + "'."); } + + final String[] kv = pair.split(","); + if (kv.length != 2 || + !kv[0].startsWith(CONNECTOR_SPECIFIC_OFFSETS_PARTITION + ':') || + !kv[1].startsWith(CONNECTOR_SPECIFIC_OFFSETS_OFFSET + ':')) { + throw new ValidationException("Invalid properties '" + CONNECTOR_SPECIFIC_OFFSETS + "'" + + "should in the format 'partition:0,offset:42;partition:1,offset:300', " + + "but is '" + parseSpecificOffsetsStr + "'."); + } + + String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1); + String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1); + try { + final Integer parttion = Integer.valueOf(partitionValue); + final Long offset = Long.valueOf(offsetValue); + offsetMap.put(parttion, offset); + } + catch (NumberFormatException e) { Review comment: yes, i got it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355155224 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { + final List hostList = new ArrayList<>(); + + final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); + if (null == hostsStr || hostsStr.length() == 0) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr); Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355155227 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { + final List hostList = new ArrayList<>(); + + final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); + if (null == hostsStr || hostsStr.length() == 0) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr); + } + + final String[] hosts = hostsStr.split(";"); Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355128298 ## File path: docs/dev/table/connect.md ## @@ -247,10 +247,8 @@ tables: topic: test-input startup-mode: earliest-offset properties: -- key: zookeeper.connect - value: localhost:2181 -- key: bootstrap.servers - value: localhost:9092 +zookeeper.connect: localhost:2181,localhost:2182 Review comment: @JingsongLi thanks for your review very much. I addressed your left comments,could you have a more look? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355127382 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ## @@ -322,6 +333,26 @@ private StartupOptions getStartupOptions( return options; } + private void buildSpecificOffsets(DescriptorProperties descriptorProperties, String topic, Map specificOffsets) { + if (descriptorProperties.containsKey(CONNECTOR_SPECIFIC_OFFSETS)) { + final Map offsetMap = KafkaValidator.validateAndGetSpecificOffsetsStr(descriptorProperties); + offsetMap.forEach((partition, offset) -> { + final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition); + specificOffsets.put(topicPartition, offset); + }); + } else { + final List> offsetList = descriptorProperties.getFixedIndexedProperties( + CONNECTOR_SPECIFIC_OFFSETS, + Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET)); + offsetList.forEach(kv -> { + final int partition = descriptorProperties.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION)); Review comment: this pice come from original code and I think we'd better to keep final This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355126336 ## File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java ## @@ -227,6 +296,74 @@ public void testTableSink() { assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass())); } + @Test + public void testTableSinkWithLegacyProperties() { + // prepare parameters for Kafka table sink + Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355126322 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,55 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties +* @return +*/ + public static Map validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) { + final Map offsetMap = new HashMap<>(); + + final String parseSpecificOffsetsStr = descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS); + if (null == parseSpecificOffsetsStr || parseSpecificOffsetsStr.length() == 0) { + throw new ValidationException("Properties connector.specific-offsets can not be empty, but is:" + parseSpecificOffsetsStr); + } + + final String[] pairs = parseSpecificOffsetsStr.split(";"); + for (String pair: pairs) { + if (null == pair || pair.length() == 0 || !pair.contains(",")) { + throw new ValidationException("Invalid properties '" + CONNECTOR_SPECIFIC_OFFSETS + "'" + + "should in the format 'partition:0,offset:42;partition:1,offset:300', " + + "but is '" + parseSpecificOffsetsStr + "'."); } + + final String[] kv = pair.split(","); + if (kv.length != 2 || + !kv[0].startsWith(CONNECTOR_SPECIFIC_OFFSETS_PARTITION + ':') || + !kv[1].startsWith(CONNECTOR_SPECIFIC_OFFSETS_OFFSET + ':')) { + throw new ValidationException("Invalid properties '" + CONNECTOR_SPECIFIC_OFFSETS + "'" + + "should in the format 'partition:0,offset:42;partition:1,offset:300', " + + "but is '" + parseSpecificOffsetsStr + "'."); + } + + String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1); + String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1); + try { + final Integer parttion = Integer.valueOf(partitionValue); + final Long offset = Long.valueOf(offsetValue); + offsetMap.put(parttion, offset); + } + catch (NumberFormatException e) { Review comment: `Long.valueOf()` and `Integer.valueOf` may throw a NumberFormatException This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355126281 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,55 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties +* @return +*/ + public static Map validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) { + final Map offsetMap = new HashMap<>(); + + final String parseSpecificOffsetsStr = descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS); + if (null == parseSpecificOffsetsStr || parseSpecificOffsetsStr.length() == 0) { + throw new ValidationException("Properties connector.specific-offsets can not be empty, but is:" + parseSpecificOffsetsStr); + } + + final String[] pairs = parseSpecificOffsetsStr.split(";"); + for (String pair: pairs) { Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355126249 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,55 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties +* @return +*/ + public static Map validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) { + final Map offsetMap = new HashMap<>(); + + final String parseSpecificOffsetsStr = descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS); + if (null == parseSpecificOffsetsStr || parseSpecificOffsetsStr.length() == 0) { Review comment: Nice, `descriptorProperties.getString` has dealed null value This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355126175 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,55 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties +* @return +*/ + public static Map validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) { + final Map offsetMap = new HashMap<>(); + + final String parseSpecificOffsetsStr = descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS); + if (null == parseSpecificOffsetsStr || parseSpecificOffsetsStr.length() == 0) { + throw new ValidationException("Properties connector.specific-offsets can not be empty, but is:" + parseSpecificOffsetsStr); + } + + final String[] pairs = parseSpecificOffsetsStr.split(";"); + for (String pair: pairs) { + if (null == pair || pair.length() == 0 || !pair.contains(",")) { + throw new ValidationException("Invalid properties '" + CONNECTOR_SPECIFIC_OFFSETS + "'" + Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355126132 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -134,4 +153,55 @@ public static String normalizeStartupMode(StartupMode startupMode) { } throw new IllegalArgumentException("Invalid startup mode."); } + + /** +* Parse SpecificOffsets String to Map. +* +* SpecificOffsets String format was given as following: +* +* +* connector.specific-offsets = partition:0,offset:42;partition:1,offset:300 +* +* @param descriptorProperties +* @return Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355126129 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -92,21 +96,36 @@ private void validateStartupMode(DescriptorProperties properties) { startupModeValidation.put(CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS, noValidation()); startupModeValidation.put(CONNECTOR_STARTUP_MODE_VALUE_EARLIEST, noValidation()); startupModeValidation.put(CONNECTOR_STARTUP_MODE_VALUE_LATEST, noValidation()); - startupModeValidation.put( - CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS, - key -> properties.validateFixedIndexedProperties(CONNECTOR_SPECIFIC_OFFSETS, false, specificOffsetValidators)); + + if (properties.containsKey(CONNECTOR_SPECIFIC_OFFSETS)) { + validateAndGetSpecificOffsetsStr(properties); Review comment: the same reason above hostsStr validation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355126077 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java ## @@ -265,24 +259,24 @@ public Kafka sinkPartitionerCustom(Class partit } if (specificOffsets != null) { - final List> values = new ArrayList<>(); + final StringBuilder stringBuilder = new StringBuilder(); + int i = 0; for (Map.Entry specificOffset : specificOffsets.entrySet()) { - values.add(Arrays.asList(specificOffset.getKey().toString(), specificOffset.getValue().toString())); + if (i != 0) { + stringBuilder.append(';'); + } + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(','); + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue()); + i++; } - properties.putIndexedFixedProperties( - CONNECTOR_SPECIFIC_OFFSETS, - Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET), - values); + properties.putString(CONNECTOR_SPECIFIC_OFFSETS, stringBuilder.toString()); } if (kafkaProperties != null) { - properties.putIndexedFixedProperties( - CONNECTOR_PROPERTIES, - Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE), - this.kafkaProperties.entrySet().stream() - .map(e -> Arrays.asList(e.getKey(), e.getValue())) - .collect(Collectors.toList()) - ); + this.kafkaProperties.entrySet() + .forEach( Review comment: like this `this.kafkaProperties.entrySet().forEach(entry -> ` ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355125908 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java ## @@ -265,24 +259,24 @@ public Kafka sinkPartitionerCustom(Class partit } if (specificOffsets != null) { - final List> values = new ArrayList<>(); + final StringBuilder stringBuilder = new StringBuilder(); + int i = 0; for (Map.Entry specificOffset : specificOffsets.entrySet()) { - values.add(Arrays.asList(specificOffset.getKey().toString(), specificOffset.getValue().toString())); + if (i != 0) { + stringBuilder.append(';'); + } + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(','); + stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue()); Review comment: you mean do not use stringBuilder here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355125769 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ## @@ -274,13 +279,27 @@ private DescriptorProperties getValidatedProperties(Map properti private Properties getKafkaProperties(DescriptorProperties descriptorProperties) { final Properties kafkaProperties = new Properties(); - final List> propsList = descriptorProperties.getFixedIndexedProperties( - CONNECTOR_PROPERTIES, - Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE)); - propsList.forEach(kv -> kafkaProperties.put( - descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_KEY)), - descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_VALUE)) - )); + + if (descriptorProperties.containsKey(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT) + || descriptorProperties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER) + || descriptorProperties.containsKey(CONNECTOR_PROPERTIES_GROUP_ID)) { + descriptorProperties.asMap().keySet() + .stream() + .filter(key -> key.startsWith(CONNECTOR_PROPERTIES)) + .forEach(key -> { + final String value = descriptorProperties.getString(key); + final String subKey = key.replaceFirst(CONNECTOR_PROPERTIES + '.', ""); Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355125709 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ## @@ -274,13 +279,27 @@ private DescriptorProperties getValidatedProperties(Map properti private Properties getKafkaProperties(DescriptorProperties descriptorProperties) { final Properties kafkaProperties = new Properties(); - final List> propsList = descriptorProperties.getFixedIndexedProperties( - CONNECTOR_PROPERTIES, - Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE)); - propsList.forEach(kv -> kafkaProperties.put( - descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_KEY)), - descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_VALUE)) - )); + + if (descriptorProperties.containsKey(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT) Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355125665 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -87,7 +93,12 @@ private void validateHosts(DescriptorProperties properties) { hostsValidators.put(CONNECTOR_HOSTS_HOSTNAME, (key) -> properties.validateString(key, false, 1)); hostsValidators.put(CONNECTOR_HOSTS_PORT, (key) -> properties.validateInt(key, false, 0, 65535)); hostsValidators.put(CONNECTOR_HOSTS_PROTOCOL, (key) -> properties.validateString(key, false, 1)); - properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, false, hostsValidators); + + if (properties.containsKey(CONNECTOR_HOSTS)) { + validateAndGetHostsStr(properties); Review comment: we need to parse hostsStr before validation when use new properties like : `'connector.specific-offsets'='partition:0,offset:42;partition:1,offset:300'` and the parse logic only used in ElasticSearch, so I add this function here This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355125677 ## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java ## @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); } + + /** +* Parse Hosts String to list. +* +* Hosts String format was given as following: +* +* +* connector.hosts = http://host_name:9092;http://host_name:9093 +* +* @param descriptorProperties +* @return +*/ + public static List validateAndGetHostsStr(DescriptorProperties descriptorProperties) { + final List hostList = new ArrayList<>(); + + final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS); + if (null == hostsStr || hostsStr.length() == 0) { + throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr); + } + + final String[] hosts = hostsStr.split(";"); + for (String host: hosts) { Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL URL: https://github.com/apache/flink/pull/10468#discussion_r355125460 ## File path: docs/dev/table/connect.md ## @@ -247,10 +247,8 @@ tables: topic: test-input startup-mode: earliest-offset properties: -- key: zookeeper.connect - value: localhost:2181 -- key: bootstrap.servers - value: localhost:9092 +zookeeper.connect: localhost:2181,localhost:2182 Review comment: specific multi hosts format here, no repeat,they have different port This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services