[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-08 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355276947
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -155,18 +155,22 @@ private void validateConnectionProperties(DescriptorProperties properties) {
	final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS);
 
	final String[] hosts = hostsStr.split(";");
+	final String validationExceptionMessage = "Properties '" + CONNECTOR_HOSTS + "' format should " +
+		"follow the format 'http://host_name:port', but is '" + hosts + "'.";
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-08 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355276918
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_descriptor.py
 ##
 @@ -115,10 +112,7 @@ def test_start_from_specific_offsets(self):
 
         properties = kafka.to_properties()
         expected = {'connector.startup-mode': 'specific-offsets',
-                    'connector.specific-offsets.0.partition': '1',
-                    'connector.specific-offsets.0.offset': '220',
-                    'connector.specific-offsets.1.partition': '3',
-                    'connector.specific-offsets.1.offset': '400',
+                    'connector.specific-offsets.0.': 'partition:1,offset:220;partition:3,offset:400',
 
 Review comment:
   done




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-08 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355190453
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -87,7 +93,12 @@ private void validateHosts(DescriptorProperties properties) {
	hostsValidators.put(CONNECTOR_HOSTS_HOSTNAME, (key) -> properties.validateString(key, false, 1));
	hostsValidators.put(CONNECTOR_HOSTS_PORT, (key) -> properties.validateInt(key, false, 0, 65535));
	hostsValidators.put(CONNECTOR_HOSTS_PROTOCOL, (key) -> properties.validateString(key, false, 1));
-	properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, false, hostsValidators);
+
+	if (properties.containsKey(CONNECTOR_HOSTS)) {
+		validateAndGetHostsStr(properties);
+	} else {
+		properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, false, hostsValidators);
 
 Review comment:
   done




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-08 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355190442
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
 ##
 @@ -200,15 +201,21 @@ private DescriptorProperties getValidatedProperties(Map properti
	}
 
	private List<Host> getHosts(DescriptorProperties descriptorProperties) {
-		final List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
-			CONNECTOR_HOSTS,
-			Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL));
-		return hosts.stream()
-			.map(host -> new Host(
-				descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)),
-				descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)),
-				descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL))))
-			.collect(Collectors.toList());
+		final List<Host> hostList = new ArrayList<>();
+		if (descriptorProperties.containsKey(CONNECTOR_HOSTS)) {
+			return validateAndGetHostsStr(descriptorProperties);
+		} else {
+			final List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
+				CONNECTOR_HOSTS,
+				Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL));
+			hosts.stream()
+				.forEach(host -> hostList.add(new Host(
+					descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)),
+					descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)),
+					descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL)))));
+		}
+		return hostList;
 
 Review comment:
   done




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-08 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355190188
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) {
	properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1);
	properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true);
	}
+
+	/**
+	 * Parse Hosts String to list.
+	 *
+	 * Hosts String format was given as following:
+	 *
+	 * connector.hosts = http://host_name:9092;http://host_name:9093
+	 *
+	 * @param descriptorProperties
+	 * @return
+	 */
+	public static List<ElasticsearchUpsertTableSinkBase.Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+		final List<ElasticsearchUpsertTableSinkBase.Host> hostList = new ArrayList<>();
+
+		final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS);
+		if (null == hostsStr || hostsStr.length() == 0) {
+			throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr);
+		}
+
+		final String[] hosts = hostsStr.split(";");
 
 Review comment:
  It's really a good point that I missed before, thank you very much!




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-08 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355190196
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) {
	properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1);
	properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true);
	}
+
+	/**
+	 * Parse Hosts String to list.
+	 *
+	 * Hosts String format was given as following:
+	 *
+	 * connector.hosts = http://host_name:9092;http://host_name:9093
+	 *
+	 * @param descriptorProperties
+	 * @return
+	 */
+	public static List<ElasticsearchUpsertTableSinkBase.Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+		final List<ElasticsearchUpsertTableSinkBase.Host> hostList = new ArrayList<>();
+
+		final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS);
+		if (hostsStr.isEmpty()) {
+			throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty.");
+		}
+
+		final String[] hosts = hostsStr.split(";");
+		for (String host : hosts) {
+			try {
+				final URL url = new URL(host);
+				final String protocol = url.getProtocol();
+				final String hostNmae = url.getHost();
+				final int hostPort = url.getPort();
 
 Review comment:
   done




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-08 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355190040
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -146,28 +147,32 @@ private void validateConnectionProperties(DescriptorProperties properties) {
 	 *
 	 * connector.hosts = http://host_name:9092;http://host_name:9093
 	 *
-	 * @param descriptorProperties
-	 * @return
 	 */
-	public static List<ElasticsearchUpsertTableSinkBase.Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
-		final List<ElasticsearchUpsertTableSinkBase.Host> hostList = new ArrayList<>();
+	public static List<Host> validateAndParseHostsString(DescriptorProperties descriptorProperties) {
+		final List<Host> hostList = new ArrayList<>();
 
+		descriptorProperties.validateString(CONNECTOR_HOSTS, false, 1);
 		final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS);
-		if (hostsStr.isEmpty()) {
-			throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty.");
-		}
 
 		final String[] hosts = hostsStr.split(";");
 		for (String host : hosts) {
+			final String validationExceptionMessage = "Properties '" + CONNECTOR_HOSTS + "' format should " +
+				"follow the format 'http://host_name:port', but is '" + host + "'.";
 			try {
 				final URL url = new URL(host);
 				final String protocol = url.getProtocol();
-				final String hostNmae = url.getHost();
+				final String hostName = url.getHost();
 				final int hostPort = url.getPort();
-				hostList.add(new ElasticsearchUpsertTableSinkBase.Host(hostNmae, hostPort, protocol));
+
+				if (null == protocol || protocol.isEmpty()
 
 Review comment:
   ok


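The hosts-string format debated in the messages above ("http://host_name:9092;http://host_name:9093") can be illustrated with a small standalone parser. This is only a minimal sketch of the technique, not the PR's code: the class `HostsParser`, its nested `Host` holder, and the use of `IllegalArgumentException` are invented for illustration (the real implementation lives in `ElasticsearchValidator`, returns `ElasticsearchUpsertTableSinkBase.Host`, and throws Flink's `ValidationException`).

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

public class HostsParser {

    // Hypothetical holder mirroring the shape of ElasticsearchUpsertTableSinkBase.Host.
    public static final class Host {
        public final String hostname;
        public final int port;
        public final String protocol;

        public Host(String hostname, int port, String protocol) {
            this.hostname = hostname;
            this.port = port;
            this.protocol = protocol;
        }
    }

    // Parses "http://host:9092;https://host:9093" into a list of Host objects,
    // rejecting entries that do not carry protocol, host name, and port.
    public static List<Host> parseHosts(String hostsStr) {
        if (hostsStr == null || hostsStr.isEmpty()) {
            throw new IllegalArgumentException("'connector.hosts' can not be empty.");
        }
        final List<Host> hosts = new ArrayList<>();
        for (String host : hostsStr.split(";")) {
            try {
                final URL url = new URL(host);
                // URL.getPort() returns -1 when no port is present in the string.
                if (url.getHost().isEmpty() || url.getPort() < 0) {
                    throw new IllegalArgumentException(
                        "'" + host + "' should follow the format 'http://host_name:port'.");
                }
                hosts.add(new Host(url.getHost(), url.getPort(), url.getProtocol()));
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException(
                    "'" + host + "' should follow the format 'http://host_name:port'.", e);
            }
        }
        return hosts;
    }
}
```

Using `java.net.URL` rather than hand-splitting on `:` and `//` is the same design choice the diff above makes: it gets protocol, host, and port extraction (and rejection of malformed entries) from the JDK.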


[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355162114
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) {
	properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1);
	properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true);
	}
+
+	/**
+	 * Parse Hosts String to list.
+	 *
+	 * Hosts String format was given as following:
+	 *
+	 * connector.hosts = http://host_name:9092;http://host_name:9093
+	 *
+	 * @param descriptorProperties
+	 * @return
+	 */
+	public static List<ElasticsearchUpsertTableSinkBase.Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+		final List<ElasticsearchUpsertTableSinkBase.Host> hostList = new ArrayList<>();
+
+		final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS);
+		if (null == hostsStr || hostsStr.length() == 0) {
+			throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr);
+		}
+
+		final String[] hosts = hostsStr.split(";");
 
 Review comment:
  `descriptorProperties.getString` can ensure `hostsStr` is not empty, and `hosts.length > 0` is always true here. Should we validate it again?




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355162022
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
 ##
 @@ -177,13 +157,112 @@ public void testTableSource() {
 
		assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTableSourceWithLegacyProperties() {
+		// prepare parameters for Kafka table source
+		final TableSchema schema = TableSchema.builder()
+			.field(FRUIT_NAME, DataTypes.STRING())
+			.field(COUNT, DataTypes.DECIMAL(10, 3))
+			.field(EVENT_TIME, DataTypes.TIMESTAMP(3))
+			.field(PROC_TIME, DataTypes.TIMESTAMP(3))
+			.build();
+
+		final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList(
+			new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps()));
+
+		final Map<String, String> fieldMapping = new HashMap<>();
+		fieldMapping.put(FRUIT_NAME, NAME);
+		fieldMapping.put(NAME, NAME);
+		fieldMapping.put(COUNT, COUNT);
+		fieldMapping.put(TIME, TIME);
+
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+		final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
+			TableSchema.builder()
+				.field(NAME, DataTypes.STRING())
+				.field(COUNT, DataTypes.DECIMAL(10, 3))
+				.field(TIME, DataTypes.TIMESTAMP(3))
+				.build()
+				.toRowType()
+		);
+
+		final KafkaTableSourceBase expected = getExpectedKafkaTableSource(
+			schema,
+			Optional.of(PROC_TIME),
+			rowtimeAttributeDescriptors,
+			fieldMapping,
+			TOPIC,
+			KAFKA_PROPERTIES,
+			deserializationSchema,
+			StartupMode.SPECIFIC_OFFSETS,
+			specificOffsets);
+
+		TableSourceValidation.validateTableSource(expected);
+
+		// construct table source using descriptors and table source factory
+		final Map<String, String> legacyPropertiesMap = new HashMap<>();
+		legacyPropertiesMap.putAll(createKafkaSourceProperties());
+
+		// use legacy properties
+		legacyPropertiesMap.remove("connector.specific-offsets");
+		legacyPropertiesMap.remove("connector.properties.zookeeper.connect");
+		legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
+		legacyPropertiesMap.remove("connector.properties.group.id");
+
+		legacyPropertiesMap.put("connector.specific-offsets.0.partition", "0");
+		legacyPropertiesMap.put("connector.specific-offsets.0.offset", "100");
+		legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
+		legacyPropertiesMap.put("connector.specific-offsets.1.offset", "123");
+		legacyPropertiesMap.put("connector.properties.0.key", "zookeeper.connect");
+		legacyPropertiesMap.put("connector.properties.0.value", "dummy");
+		legacyPropertiesMap.put("connector.properties.1.key", "bootstrap.servers");
+		legacyPropertiesMap.put("connector.properties.1.value", "dummy");
+		legacyPropertiesMap.put("connector.properties.2.key", "group.id");
+		legacyPropertiesMap.put("connector.properties.2.value", "dummy");
+
+		final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap)
+			.createStreamTableSource(legacyPropertiesMap);
+
+		assertEquals(expected, actualSource);
+
+		// test Kafka consumer
+		final KafkaTableSourceBase actualKafkaSource = (KafkaTableSourceBase) actualSource;
+		final StreamExecutionEnvironmentMock mock

[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161955
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode startupMode) {
	}
	throw new IllegalArgumentException("Invalid startup mode.");
	}
+
+	/**
+	 * Parse SpecificOffsets String to Map.
+	 *
+	 * SpecificOffsets String format was given as following:
+	 *
+	 * connector.specific-offsets = partition:0,offset:42;partition:1,offset:300
+	 *
+	 * @param descriptorProperties
+	 * @return SpecificOffsets with map format, key is partition, and value is offset.
+	 */
+	public static Map<Integer, Long> validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) {
+		final Map<Integer, Long> offsetMap = new HashMap<>();
+
+		final String parseSpecificOffsetsStr = descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS);
+		if (parseSpecificOffsetsStr.isEmpty()) {
+			throw new ValidationException("Properties '" + CONNECTOR_SPECIFIC_OFFSETS + "' can not be empty.");
+		}
 
 Review comment:
   ok


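The flattened specific-offsets format discussed above ("partition:0,offset:42;partition:1,offset:300") can likewise be sketched as a standalone parser. This is a minimal illustration under assumptions, not the PR's code: the class `SpecificOffsetsParser` is invented, the literal prefixes stand in for the `CONNECTOR_SPECIFIC_OFFSETS_PARTITION`/`CONNECTOR_SPECIFIC_OFFSETS_OFFSET` constants, and `IllegalArgumentException` stands in for Flink's `ValidationException`.

```java
import java.util.HashMap;
import java.util.Map;

public class SpecificOffsetsParser {

    // Parses "partition:0,offset:42;partition:1,offset:300" into a
    // partition -> offset map, as the javadoc in the diff above describes.
    public static Map<Integer, Long> parseSpecificOffsets(String offsetsStr) {
        if (offsetsStr == null || offsetsStr.isEmpty()) {
            throw new IllegalArgumentException("'connector.specific-offsets' can not be empty.");
        }
        final Map<Integer, Long> offsetMap = new HashMap<>();
        for (String pair : offsetsStr.split(";")) {
            // Each semicolon-separated entry looks like "partition:0,offset:42".
            final String[] kvs = pair.split(",");
            if (kvs.length != 2 || !kvs[0].startsWith("partition:") || !kvs[1].startsWith("offset:")) {
                throw new IllegalArgumentException(
                    "'" + pair + "' should follow the format 'partition:0,offset:42'.");
            }
            final int partition = Integer.parseInt(kvs[0].substring("partition:".length()));
            final long offset = Long.parseLong(kvs[1].substring("offset:".length()));
            offsetMap.put(partition, offset);
        }
        return offsetMap;
    }
}
```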


[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161941
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
 ##
 @@ -265,24 +259,22 @@ public Kafka sinkPartitionerCustom(Class partit
	}
 
	if (specificOffsets != null) {
-		final List<List<String>> values = new ArrayList<>();
+		final StringBuilder stringBuilder = new StringBuilder();
+		int i = 0;
		for (Map.Entry<Integer, Long> specificOffset : specificOffsets.entrySet()) {
-			values.add(Arrays.asList(specificOffset.getKey().toString(), specificOffset.getValue().toString()));
+			if (i != 0) {
+				stringBuilder.append(';');
+			}
+			stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(',');
+			stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue());
+			i++;
		}
-		properties.putIndexedFixedProperties(
-			CONNECTOR_SPECIFIC_OFFSETS,
-			Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET),
-			values);
+		properties.putString(CONNECTOR_SPECIFIC_OFFSETS, stringBuilder.toString());
	}
 
	if (kafkaProperties != null) {
-		properties.putIndexedFixedProperties(
-			CONNECTOR_PROPERTIES,
-			Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE),
-			this.kafkaProperties.entrySet().stream()
-				.map(e -> Arrays.asList(e.getKey(), e.getValue()))
-				.collect(Collectors.toList())
-		);
+		this.kafkaProperties.entrySet().forEach(entry ->
+			properties.putString(CONNECTOR_PROPERTIES + '.' + entry.getKey(), entry.getValue()));
 
 Review comment:
   ok


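The serialization direction shown in the `Kafka.java` diff above (building the single flattened string from a map with a `StringBuilder`) can be sketched as follows. This is a hypothetical standalone version: the class `SpecificOffsetsWriter` is invented, and the literal "partition"/"offset" prefixes stand in for the `CONNECTOR_SPECIFIC_OFFSETS_PARTITION`/`CONNECTOR_SPECIFIC_OFFSETS_OFFSET` constants used in the real descriptor.

```java
import java.util.Map;
import java.util.TreeMap;

public class SpecificOffsetsWriter {

    // Serializes {0: 42, 1: 300} into "partition:0,offset:42;partition:1,offset:300",
    // mirroring the StringBuilder loop in the diff above.
    public static String toOffsetsString(Map<Integer, Long> specificOffsets) {
        final StringBuilder sb = new StringBuilder();
        int i = 0;
        // A TreeMap copy makes the output order deterministic; the diff above
        // iterates entrySet() of the caller-supplied map directly.
        for (Map.Entry<Integer, Long> e : new TreeMap<>(specificOffsets).entrySet()) {
            if (i != 0) {
                sb.append(';');
            }
            sb.append("partition:").append(e.getKey())
              .append(",offset:").append(e.getValue());
            i++;
        }
        return sb.toString();
    }
}
```

Writing one delimited string under a single key is the point of the PR: the pre-flattening code emitted one indexed key pair per offset (`connector.specific-offsets.N.partition`/`.offset`), which is awkward to spell out in DDL.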


[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161944
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode startupMode) {
	}
	throw new IllegalArgumentException("Invalid startup mode.");
	}
+
+	/**
+	 * Parse SpecificOffsets String to Map.
+	 *
+	 * SpecificOffsets String format was given as following:
+	 *
+	 * connector.specific-offsets = partition:0,offset:42;partition:1,offset:300
+	 *
+	 * @param descriptorProperties
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161898
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) {
	properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1);
	properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true);
	}
+
+	/**
+	 * Parse Hosts String to list.
+	 *
+	 * Hosts String format was given as following:
+	 *
+	 * connector.hosts = http://host_name:9092;http://host_name:9093
+	 *
+	 * @param descriptorProperties
+	 * @return
+	 */
+	public static List<ElasticsearchUpsertTableSinkBase.Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+		final List<ElasticsearchUpsertTableSinkBase.Host> hostList = new ArrayList<>();
+
+		final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS);
+		if (hostsStr.isEmpty()) {
+			throw new ValidationException("Properties '" + CONNECTOR_HOSTS + "' can not be empty.");
+		}
+	}
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161876
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) {
	properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1);
	properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true);
	}
+
+	/**
+	 * Parse Hosts String to list.
+	 *
+	 * Hosts String format was given as following:
+	 *
+	 * connector.hosts = http://host_name:9092;http://host_name:9093
+	 *
+	 * @param descriptorProperties
+	 * @return
+	 */
+	public static List<ElasticsearchUpsertTableSinkBase.Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
 
 Review comment:
  ok, I think `validateAndParseHostsString` will be better.





[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161864
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void validateConnectionProperties(DescriptorProperties properties) {
	properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1);
	properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true);
	}
+
+	/**
+	 * Parse Hosts String to list.
+	 *
+	 * Hosts String format was given as following:
+	 *
+	 * connector.hosts = http://host_name:9092;http://host_name:9093
+	 *
+	 * @param descriptorProperties
+	 * @return
+	 */
+	public static List<ElasticsearchUpsertTableSinkBase.Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
 
 Review comment:
   ok, I think validateAndGetHosts will be better.




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161869
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,52 @@ public static String normalizeStartupMode(StartupMode startupMode) {
	}
	throw new IllegalArgumentException("Invalid startup mode.");
	}
+
+	/**
+	 * Parse SpecificOffsets String to Map.
+	 *
+	 * SpecificOffsets String format was given as following:
+	 *
+	 * connector.specific-offsets = partition:0,offset:42;partition:1,offset:300
+	 *
+	 * @param descriptorProperties
+	 * @return SpecificOffsets with map format, key is partition, and value is offset.
+	 */
+	public static Map<Integer, Long> validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) {
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161864
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* 
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static List<Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
 
 Review comment:
   ok, I think validateAndGetHosts will be better.
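
   As a rough sketch of what such a `validateAndGetHosts` helper could do, assuming a simple `Host` value class and the `protocol://hostname:port` layout discussed in this thread (this is an illustration, not Flink's actual `Host` class or the PR's final code):

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical sketch: parse the flattened 'connector.hosts' string
   // (e.g. "http://host1:9200;http://host2:9200") into host descriptors.
   public class HostsParser {

       // Assumed stand-in for Flink's Host class.
       public static final class Host {
           public final String protocol;
           public final String hostname;
           public final int port;

           Host(String protocol, String hostname, int port) {
               this.protocol = protocol;
               this.hostname = hostname;
               this.port = port;
           }
       }

       public static List<Host> parse(String hostsStr) {
           final List<Host> hosts = new ArrayList<>();
           for (String host : hostsStr.split(";")) {
               // Expected shape: protocol://hostname:port
               final String[] protocolAndRest = host.split("://");
               if (protocolAndRest.length != 2) {
                   throw new IllegalArgumentException(
                       "Host '" + host + "' should follow 'http://host_name:port'.");
               }
               final int colon = protocolAndRest[1].lastIndexOf(':');
               if (colon < 0) {
                   throw new IllegalArgumentException(
                       "Host '" + host + "' should follow 'http://host_name:port'.");
               }
               hosts.add(new Host(
                   protocolAndRest[0],
                   protocolAndRest[1].substring(0, colon),
                   Integer.parseInt(protocolAndRest[1].substring(colon + 1))));
           }
           return hosts;
       }

       public static void main(String[] args) {
           List<Host> hosts = parse("http://es1:9200;https://es2:9201");
           System.out.println(hosts.size());          // 2
           System.out.println(hosts.get(1).protocol); // https
           System.out.println(hosts.get(1).port);     // 9201
       }
   }
   ```

   Parsing and validating in one pass, as the reviewers agree on below, means a malformed host fails fast with a message naming the offending entry.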




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161822
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* 
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* 
+* @param descriptorProperties
+* @return
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161802
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java
 ##
 @@ -308,13 +304,7 @@ public Elasticsearch connectionPathPrefix(String 
pathPrefix) {
final DescriptorProperties properties = new 
DescriptorProperties();
properties.putProperties(internalProperties);
 
-   final List<List<String>> hostValues = hosts.stream()
-   .map(host -> Arrays.asList(host.hostname, 
String.valueOf(host.port), host.protocol))
-   .collect(Collectors.toList());
-   properties.putIndexedFixedProperties(
-   CONNECTOR_HOSTS,
-   Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, 
CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL),
-   hostValues);
+   properties.putString(CONNECTOR_HOSTS, 
hosts.stream().map(Host::toString).collect(Collectors.joining(";")));
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161756
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -247,10 +247,8 @@ tables:
   topic: test-input
   startup-mode: earliest-offset
   properties:
-- key: zookeeper.connect
-  value: localhost:2181
-- key: bootstrap.servers
-  value: localhost:9092
+zookeeper.connect: localhost:2181,localhost:2182
 
 Review comment:
   but in FLIP-86, we defined an example as follows:
   `'connector.properties.zookeeper.connect'='localhost:2181,localhost:2182',
   'connector.properties.bootstrap.servers'='localhost:9092,localhost:9093',
   'connector.properties.group.id'='testGroup'
   `
   so I think either keeping this or using 'hostname1:2181,host_name2:2181' makes sense. @JingsongLi 




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161684
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -87,7 +93,12 @@ private void validateHosts(DescriptorProperties properties) 
{
hostsValidators.put(CONNECTOR_HOSTS_HOSTNAME, (key) -> 
properties.validateString(key, false, 1));
hostsValidators.put(CONNECTOR_HOSTS_PORT, (key) -> 
properties.validateInt(key, false, 0, 65535));
hostsValidators.put(CONNECTOR_HOSTS_PROTOCOL, (key) -> 
properties.validateString(key, false, 1));
-   properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, 
false, hostsValidators);
+
+   if (properties.containsKey(CONNECTOR_HOSTS)) {
+   validateAndGetHostsStr(properties);
 
 Review comment:
   we'd better parse and validate at the same time because `DescriptorProperties` does not support a `Consumer` that can validate a `String` value like `partition:0,offset:42;partition:1,offset:300`
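
   A minimal sketch of that parse-and-validate-in-one-pass approach, with the property key names inlined as plain strings (an illustration under those assumptions, not the PR's final code):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical sketch: parse the flattened 'connector.specific-offsets'
   // value "partition:0,offset:42;partition:1,offset:300" into a map,
   // validating the format in the same pass.
   public class SpecificOffsetsParser {

       public static Map<Integer, Long> parse(String specificOffsetsStr) {
           final Map<Integer, Long> offsets = new HashMap<>();
           for (String pair : specificOffsetsStr.split(";")) {
               final String[] kv = pair.split(",");
               if (kv.length != 2
                       || !kv[0].startsWith("partition:")
                       || !kv[1].startsWith("offset:")) {
                   throw new IllegalArgumentException(
                       "'" + specificOffsetsStr + "' should follow "
                       + "'partition:0,offset:42;partition:1,offset:300'.");
               }
               // NumberFormatException from valueOf doubles as numeric validation.
               offsets.put(
                   Integer.valueOf(kv[0].substring("partition:".length())),
                   Long.valueOf(kv[1].substring("offset:".length())));
           }
           return offsets;
       }

       public static void main(String[] args) {
           Map<Integer, Long> m = parse("partition:0,offset:42;partition:1,offset:300");
           System.out.println(m.get(0) + "," + m.get(1)); // 42,300
       }
   }
   ```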




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161099
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
 ##
 @@ -265,24 +259,24 @@ public Kafka sinkPartitionerCustom(Class partit
}
 
if (specificOffsets != null) {
-   final List<List<String>> values = new ArrayList<>();
+   final StringBuilder stringBuilder = new StringBuilder();
+   int i = 0;
for (Map.Entry<Integer, Long> specificOffset : specificOffsets.entrySet()) {
-   
values.add(Arrays.asList(specificOffset.getKey().toString(), 
specificOffset.getValue().toString()));
+   if (i != 0) {
+   stringBuilder.append(';');
+   }
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(',');
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue());
 
 Review comment:
   ok
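
   The loop-and-StringBuilder serialization quoted above can also be sketched as a stream join (illustrative only, with the key names inlined as plain strings):

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.stream.Collectors;

   // Sketch: serialize a partition->offset map into the flattened
   // "partition:P,offset:O;partition:P,offset:O" property value.
   public class SpecificOffsetsWriter {

       public static String write(Map<Integer, Long> specificOffsets) {
           return specificOffsets.entrySet().stream()
               .map(e -> "partition:" + e.getKey() + ",offset:" + e.getValue())
               .collect(Collectors.joining(";"));
       }

       public static void main(String[] args) {
           // LinkedHashMap keeps insertion order, so the output is deterministic.
           Map<Integer, Long> offsets = new LinkedHashMap<>();
           offsets.put(0, 42L);
           offsets.put(1, 300L);
           System.out.println(write(offsets));
           // partition:0,offset:42;partition:1,offset:300
       }
   }
   ```

   This mirrors the `Collectors.joining(";")` pattern the PR already uses for the Elasticsearch hosts.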




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355161074
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -247,10 +247,8 @@ tables:
   topic: test-input
   startup-mode: earliest-offset
   properties:
-- key: zookeeper.connect
-  value: localhost:2181
-- key: bootstrap.servers
-  value: localhost:9092
+zookeeper.connect: localhost:2181,localhost:2182
 
 Review comment:
   Just to specify the multi-host format in a string here; maybe 'hostname1:2181,host_name2:2181' would be better, what do you think?




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355155261
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,55 @@ public static String normalizeStartupMode(StartupMode 
startupMode) {
}
throw new IllegalArgumentException("Invalid startup mode.");
}
+
+   /**
+* Parse SpecificOffsets String to Map.
+*
+* SpecificOffsets String format was given as following:
+*
+* 
+* connector.specific-offsets = 
partition:0,offset:42;partition:1,offset:300
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static Map<Integer, Long> validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) {
+   final Map<Integer, Long> offsetMap = new HashMap<>();
+
+   final String parseSpecificOffsetsStr = 
descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS);
+   if (null == parseSpecificOffsetsStr || 
parseSpecificOffsetsStr.length() == 0) {
+   throw new ValidationException("Properties 
connector.specific-offsets can not be empty, but is:" + 
parseSpecificOffsetsStr);
+   }
+
+   final String[] pairs = parseSpecificOffsetsStr.split(";");
+   for (String pair: pairs) {
+   if (null == pair || pair.length() == 0 || 
!pair.contains(",")) {
+   throw new ValidationException("Invalid 
properties '" + CONNECTOR_SPECIFIC_OFFSETS + "'" +
+   "should in the format 
'partition:0,offset:42;partition:1,offset:300', " +
+   "but is '" + 
parseSpecificOffsetsStr + "'.");   }
+
+   final String[] kv = pair.split(",");
+   if (kv.length != 2 ||
+   
!kv[0].startsWith(CONNECTOR_SPECIFIC_OFFSETS_PARTITION + ':') ||
+   
!kv[1].startsWith(CONNECTOR_SPECIFIC_OFFSETS_OFFSET + ':')) {
+   throw new ValidationException("Invalid 
properties '" + CONNECTOR_SPECIFIC_OFFSETS + "'" +
+   "should in the format 
'partition:0,offset:42;partition:1,offset:300', " +
+   "but is '" + 
parseSpecificOffsetsStr + "'.");
+   }
+
+   String partitionValue = 
kv[0].substring(kv[0].indexOf(":") + 1);
+   String offsetValue = kv[1].substring(kv[1].indexOf(":") 
+ 1);
+   try {
+   final Integer partition = Integer.valueOf(partitionValue);
+   final Long offset = Long.valueOf(offsetValue);
+   offsetMap.put(partition, offset);
+   }
+   catch (NumberFormatException e) {
 
 Review comment:
   yes, I got it




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355155224
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* 
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static List<Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+   final List<Host> hostList = new ArrayList<>();
+
+   final String hostsStr = 
descriptorProperties.getString(CONNECTOR_HOSTS);
+   if (null == hostsStr || hostsStr.length() == 0) {
+   throw new ValidationException("Properties '" + 
CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr);
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355155227
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* 
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static List<Host> validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+   final List<Host> hostList = new ArrayList<>();
+
+   final String hostsStr = 
descriptorProperties.getString(CONNECTOR_HOSTS);
+   if (null == hostsStr || hostsStr.length() == 0) {
+   throw new ValidationException("Properties '" + 
CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr);
+   }
+
+   final String[] hosts = hostsStr.split(";");
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355128298
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -247,10 +247,8 @@ tables:
   topic: test-input
   startup-mode: earliest-offset
   properties:
-- key: zookeeper.connect
-  value: localhost:2181
-- key: bootstrap.servers
-  value: localhost:9092
+zookeeper.connect: localhost:2181,localhost:2182
 
 Review comment:
   @JingsongLi thanks very much for your review.
   I addressed your remaining comments, could you take another look?




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355127382
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
 ##
 @@ -322,6 +333,26 @@ private StartupOptions getStartupOptions(
return options;
}
 
+   private void buildSpecificOffsets(DescriptorProperties descriptorProperties, String topic, Map<KafkaTopicPartition, Long> specificOffsets) {
+   if 
(descriptorProperties.containsKey(CONNECTOR_SPECIFIC_OFFSETS)) {
+   final Map<Integer, Long> offsetMap = KafkaValidator.validateAndGetSpecificOffsetsStr(descriptorProperties);
+   offsetMap.forEach((partition, offset) -> {
+   final KafkaTopicPartition topicPartition = new 
KafkaTopicPartition(topic, partition);
+   specificOffsets.put(topicPartition, offset);
+   });
+   } else {
+   final List<Map<String, String>> offsetList = descriptorProperties.getFixedIndexedProperties(
+   CONNECTOR_SPECIFIC_OFFSETS,
+   
Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, 
CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+   offsetList.forEach(kv -> {
+   final int partition = 
descriptorProperties.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
 
 Review comment:
   this piece comes from the original code and I think we'd better keep the final modifier




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355126336
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
 ##
 @@ -227,6 +296,74 @@ public void testTableSink() {

assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
}
 
+   @Test
+   public void testTableSinkWithLegacyProperties() {
+   // prepare parameters for Kafka table sink
+
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355126322
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,55 @@ public static String normalizeStartupMode(StartupMode 
startupMode) {
}
throw new IllegalArgumentException("Invalid startup mode.");
}
+
+   /**
+* Parse SpecificOffsets String to Map.
+*
+* SpecificOffsets String format was given as following:
+*
+* 
+* connector.specific-offsets = 
partition:0,offset:42;partition:1,offset:300
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static Map<Integer, Long> validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) {
+   final Map<Integer, Long> offsetMap = new HashMap<>();
+
+   final String parseSpecificOffsetsStr = 
descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS);
+   if (null == parseSpecificOffsetsStr || 
parseSpecificOffsetsStr.length() == 0) {
+   throw new ValidationException("Properties 
connector.specific-offsets can not be empty, but is:" + 
parseSpecificOffsetsStr);
+   }
+
+   final String[] pairs = parseSpecificOffsetsStr.split(";");
+   for (String pair: pairs) {
+   if (null == pair || pair.length() == 0 || 
!pair.contains(",")) {
+   throw new ValidationException("Invalid 
properties '" + CONNECTOR_SPECIFIC_OFFSETS + "'" +
+   "should in the format 
'partition:0,offset:42;partition:1,offset:300', " +
+   "but is '" + 
parseSpecificOffsetsStr + "'.");   }
+
+   final String[] kv = pair.split(",");
+   if (kv.length != 2 ||
+   
!kv[0].startsWith(CONNECTOR_SPECIFIC_OFFSETS_PARTITION + ':') ||
+   
!kv[1].startsWith(CONNECTOR_SPECIFIC_OFFSETS_OFFSET + ':')) {
+   throw new ValidationException("Invalid 
properties '" + CONNECTOR_SPECIFIC_OFFSETS + "'" +
+   "should in the format 
'partition:0,offset:42;partition:1,offset:300', " +
+   "but is '" + 
parseSpecificOffsetsStr + "'.");
+   }
+
+   String partitionValue = 
kv[0].substring(kv[0].indexOf(":") + 1);
+   String offsetValue = kv[1].substring(kv[1].indexOf(":") 
+ 1);
+   try {
+   final Integer partition = Integer.valueOf(partitionValue);
+   final Long offset = Long.valueOf(offsetValue);
+   offsetMap.put(partition, offset);
+   }
+   catch (NumberFormatException e) {
 
 Review comment:
   `Long.valueOf()` and `Integer.valueOf()` may throw a `NumberFormatException`




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355126281
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,55 @@ public static String normalizeStartupMode(StartupMode 
startupMode) {
}
throw new IllegalArgumentException("Invalid startup mode.");
}
+
+   /**
+* Parse SpecificOffsets String to Map.
+*
+* SpecificOffsets String format was given as following:
+*
+* 
+* connector.specific-offsets = 
partition:0,offset:42;partition:1,offset:300
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static Map<Integer, Long> validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) {
+   final Map<Integer, Long> offsetMap = new HashMap<>();
+
+   final String parseSpecificOffsetsStr = 
descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS);
+   if (null == parseSpecificOffsetsStr || 
parseSpecificOffsetsStr.length() == 0) {
+   throw new ValidationException("Properties 
connector.specific-offsets can not be empty, but is:" + 
parseSpecificOffsetsStr);
+   }
+
+   final String[] pairs = parseSpecificOffsetsStr.split(";");
+   for (String pair: pairs) {
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355126249
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,55 @@ public static String normalizeStartupMode(StartupMode 
startupMode) {
}
throw new IllegalArgumentException("Invalid startup mode.");
}
+
+   /**
+* Parse SpecificOffsets String to Map.
+*
+* SpecificOffsets String format was given as following:
+*
+* 
+* connector.specific-offsets = 
partition:0,offset:42;partition:1,offset:300
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static Map<Integer, Long> validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) {
+   final Map<Integer, Long> offsetMap = new HashMap<>();
+
+   final String parseSpecificOffsetsStr = 
descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS);
+   if (null == parseSpecificOffsetsStr || 
parseSpecificOffsetsStr.length() == 0) {
 
 Review comment:
   Nice, `descriptorProperties.getString` has already dealt with the null value




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355126175
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,55 @@ public static String normalizeStartupMode(StartupMode 
startupMode) {
}
throw new IllegalArgumentException("Invalid startup mode.");
}
+
+   /**
+* Parse SpecificOffsets String to Map.
+*
+* SpecificOffsets String format was given as following:
+*
+* 
+* connector.specific-offsets = 
partition:0,offset:42;partition:1,offset:300
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static Map<Integer, Long> validateAndGetSpecificOffsetsStr(DescriptorProperties descriptorProperties) {
+   final Map<Integer, Long> offsetMap = new HashMap<>();
+
+   final String parseSpecificOffsetsStr = 
descriptorProperties.getString(CONNECTOR_SPECIFIC_OFFSETS);
+   if (null == parseSpecificOffsetsStr || 
parseSpecificOffsetsStr.length() == 0) {
+   throw new ValidationException("Properties 
connector.specific-offsets can not be empty, but is:" + 
parseSpecificOffsetsStr);
+   }
+
+   final String[] pairs = parseSpecificOffsetsStr.split(";");
+   for (String pair: pairs) {
+   if (null == pair || pair.length() == 0 || 
!pair.contains(",")) {
+   throw new ValidationException("Invalid 
properties '" + CONNECTOR_SPECIFIC_OFFSETS + "'" +
 
 Review comment:
   ok
   




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355126132
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -134,4 +153,55 @@ public static String normalizeStartupMode(StartupMode 
startupMode) {
}
throw new IllegalArgumentException("Invalid startup mode.");
}
+
+   /**
+* Parse SpecificOffsets String to Map.
+*
+* SpecificOffsets String format was given as following:
+*
+* 
+* connector.specific-offsets = 
partition:0,offset:42;partition:1,offset:300
+* 
+* @param descriptorProperties
+* @return
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355126129
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##
 @@ -92,21 +96,36 @@ private void validateStartupMode(DescriptorProperties 
properties) {

startupModeValidation.put(CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS, 
noValidation());

startupModeValidation.put(CONNECTOR_STARTUP_MODE_VALUE_EARLIEST, 
noValidation());
startupModeValidation.put(CONNECTOR_STARTUP_MODE_VALUE_LATEST, 
noValidation());
-   startupModeValidation.put(
-   CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
-   key -> 
properties.validateFixedIndexedProperties(CONNECTOR_SPECIFIC_OFFSETS, false, 
specificOffsetValidators));
+
+   if (properties.containsKey(CONNECTOR_SPECIFIC_OFFSETS)) {
+   validateAndGetSpecificOffsetsStr(properties);
 
 Review comment:
   the same reason as for the hostsStr validation above




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355126077
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
 ##
 @@ -265,24 +259,24 @@ public Kafka sinkPartitionerCustom(Class partit
}
 
if (specificOffsets != null) {
-   final List> values = new ArrayList<>();
+   final StringBuilder stringBuilder = new StringBuilder();
+   int i = 0;
for (Map.Entry specificOffset : 
specificOffsets.entrySet()) {
-   
values.add(Arrays.asList(specificOffset.getKey().toString(), 
specificOffset.getValue().toString()));
+   if (i != 0) {
+   stringBuilder.append(';');
+   }
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(',');
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue());
+   i++;
}
-   properties.putIndexedFixedProperties(
-   CONNECTOR_SPECIFIC_OFFSETS,
-   
Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, 
CONNECTOR_SPECIFIC_OFFSETS_OFFSET),
-   values);
+   properties.putString(CONNECTOR_SPECIFIC_OFFSETS, 
stringBuilder.toString());
}
 
if (kafkaProperties != null) {
-   properties.putIndexedFixedProperties(
-   CONNECTOR_PROPERTIES,
-   Arrays.asList(CONNECTOR_PROPERTIES_KEY, 
CONNECTOR_PROPERTIES_VALUE),
-   this.kafkaProperties.entrySet().stream()
-   .map(e -> Arrays.asList(e.getKey(), 
e.getValue()))
-   .collect(Collectors.toList())
-   );
+   this.kafkaProperties.entrySet()
+   .forEach(
 
 Review comment:
   do you mean like this: `this.kafkaProperties.entrySet().forEach(entry -> ...)` ?





[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355125908
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
 ##
 @@ -265,24 +259,24 @@ public Kafka sinkPartitionerCustom(Class partit
}
 
if (specificOffsets != null) {
-   final List> values = new ArrayList<>();
+   final StringBuilder stringBuilder = new StringBuilder();
+   int i = 0;
for (Map.Entry specificOffset : 
specificOffsets.entrySet()) {
-   
values.add(Arrays.asList(specificOffset.getKey().toString(), 
specificOffset.getValue().toString()));
+   if (i != 0) {
+   stringBuilder.append(';');
+   }
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_PARTITION).append(':').append(specificOffset.getKey()).append(',');
+   
stringBuilder.append(CONNECTOR_SPECIFIC_OFFSETS_OFFSET).append(':').append(specificOffset.getValue());
 
 Review comment:
   do you mean we should not use a StringBuilder here?
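
One alternative to the manual StringBuilder and index counter in the diff above is joining the entries with `Collectors.joining`. A minimal sketch under the assumption that the two key fragments match `CONNECTOR_SPECIFIC_OFFSETS_PARTITION`/`CONNECTOR_SPECIFIC_OFFSETS_OFFSET` from the diff; this is illustrative, not the PR's code.

```java
import java.util.Map;
import java.util.stream.Collectors;

public class OffsetsFormatter {
    // Assumed to mirror CONNECTOR_SPECIFIC_OFFSETS_PARTITION / _OFFSET in the diff.
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offset";

    /**
     * Builds "partition:0,offset:42;partition:1,offset:300" from a map,
     * replacing the manual StringBuilder loop with Collectors.joining(";").
     */
    public static String formatSpecificOffsets(Map<Integer, Long> specificOffsets) {
        return specificOffsets.entrySet().stream()
                .map(e -> PARTITION + ':' + e.getKey() + ',' + OFFSET + ':' + e.getValue())
                .collect(Collectors.joining(";"));
    }
}
```

The stream version avoids the `i != 0` separator bookkeeping entirely, since `Collectors.joining` only inserts the delimiter between elements.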




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355125769
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
 ##
 @@ -274,13 +279,27 @@ private DescriptorProperties 
getValidatedProperties(Map properti
 
private Properties getKafkaProperties(DescriptorProperties 
descriptorProperties) {
final Properties kafkaProperties = new Properties();
-   final List> propsList = 
descriptorProperties.getFixedIndexedProperties(
-   CONNECTOR_PROPERTIES,
-   Arrays.asList(CONNECTOR_PROPERTIES_KEY, 
CONNECTOR_PROPERTIES_VALUE));
-   propsList.forEach(kv -> kafkaProperties.put(
-   
descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
-   
descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
-   ));
+
+   if 
(descriptorProperties.containsKey(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT)
+   || 
descriptorProperties.containsKey(CONNECTOR_PROPERTIES_BOOTSTRAP_SERVER)
+   || 
descriptorProperties.containsKey(CONNECTOR_PROPERTIES_GROUP_ID)) {
+   descriptorProperties.asMap().keySet()
+   .stream()
+   .filter(key -> 
key.startsWith(CONNECTOR_PROPERTIES))
+   .forEach(key -> {
+   final String value = 
descriptorProperties.getString(key);
+   final String subKey = 
key.replaceFirst(CONNECTOR_PROPERTIES + '.', "");
 
 Review comment:
   ok
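
The prefix-stripping discussed in the diff above can be sketched as follows. Note the diff uses `key.replaceFirst(CONNECTOR_PROPERTIES + '.', "")`, where the first argument is a regex (so `.` matches any character); a plain `substring` avoids that pitfall. This is a self-contained sketch, not the PR's code, and the class name is hypothetical.

```java
import java.util.Map;
import java.util.Properties;

public class KafkaPropertiesMapper {
    // Mirrors the CONNECTOR_PROPERTIES constant in the diff above.
    private static final String CONNECTOR_PROPERTIES = "connector.properties";

    /**
     * Copies every key under "connector.properties." into a Properties object
     * with the prefix removed, e.g. "connector.properties.group.id" -> "group.id".
     */
    public static Properties toKafkaProperties(Map<String, String> descriptorProperties) {
        final Properties kafkaProperties = new Properties();
        descriptorProperties.forEach((key, value) -> {
            if (key.startsWith(CONNECTOR_PROPERTIES + '.')) {
                // substring, not replaceFirst: the prefix contains '.' which is
                // a regex wildcard in replaceFirst's first argument
                final String subKey = key.substring(CONNECTOR_PROPERTIES.length() + 1);
                kafkaProperties.put(subKey, value);
            }
        });
        return kafkaProperties;
    }
}
```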




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355125709
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
 ##
 @@ -274,13 +279,27 @@ private DescriptorProperties 
getValidatedProperties(Map properti
 
private Properties getKafkaProperties(DescriptorProperties 
descriptorProperties) {
final Properties kafkaProperties = new Properties();
-   final List> propsList = 
descriptorProperties.getFixedIndexedProperties(
-   CONNECTOR_PROPERTIES,
-   Arrays.asList(CONNECTOR_PROPERTIES_KEY, 
CONNECTOR_PROPERTIES_VALUE));
-   propsList.forEach(kv -> kafkaProperties.put(
-   
descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
-   
descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
-   ));
+
+   if 
(descriptorProperties.containsKey(CONNECTOR_PROPERTIES_ZOOKEEPER_CONNECT)
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355125665
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -87,7 +93,12 @@ private void validateHosts(DescriptorProperties properties) 
{
hostsValidators.put(CONNECTOR_HOSTS_HOSTNAME, (key) -> 
properties.validateString(key, false, 1));
hostsValidators.put(CONNECTOR_HOSTS_PORT, (key) -> 
properties.validateInt(key, false, 0, 65535));
hostsValidators.put(CONNECTOR_HOSTS_PROTOCOL, (key) -> 
properties.validateString(key, false, 1));
-   properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, 
false, hostsValidators);
+
+   if (properties.containsKey(CONNECTOR_HOSTS)) {
+   validateAndGetHostsStr(properties);
 
 Review comment:
   we need to parse hostsStr before validation when using the new flattened properties, e.g.:
   `'connector.specific-offsets'='partition:0,offset:42;partition:1,offset:300'`
   and since the parsing logic is only used in Elasticsearch, I added this function here
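
The host parsing this thread discusses (a flattened string like `http://host1:9092;http://host2:9093`) can be sketched with `java.net.URL` doing the per-entry validation. The `Host` holder class below is hypothetical; the real code uses Elasticsearch's own host descriptor.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

public class HostsParser {
    // Hypothetical holder; stands in for the connector's actual host descriptor.
    public static class Host {
        public final String hostname;
        public final int port;
        public final String protocol;

        Host(String hostname, int port, String protocol) {
            this.hostname = hostname;
            this.port = port;
            this.protocol = protocol;
        }
    }

    /**
     * Splits "http://host1:9092;http://host2:9093" on ';' and validates each
     * entry as a URL of the form 'protocol://host_name:port'.
     */
    public static List<Host> parseHosts(String hostsStr) {
        if (hostsStr == null || hostsStr.isEmpty()) {
            throw new IllegalArgumentException("'connector.hosts' must not be empty.");
        }
        final List<Host> hosts = new ArrayList<>();
        for (String host : hostsStr.split(";")) {
            try {
                final URL url = new URL(host); // rejects entries that are not well-formed URLs
                hosts.add(new Host(url.getHost(), url.getPort(), url.getProtocol()));
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException(
                        "'connector.hosts' entries must look like 'http://host_name:port', but found: " + host, e);
            }
        }
        return hosts;
    }
}
```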




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355125677
 
 

 ##
 File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
 ##
 @@ -126,4 +137,39 @@ private void 
validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, 
true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, 
true);
}
+
+   /**
+* Parse Hosts String to list.
+*
+* Hosts String format was given as following:
+*
+* 
+* connector.hosts = http://host_name:9092;http://host_name:9093
+* 
+* @param descriptorProperties
+* @return
+*/
+   public static List 
validateAndGetHostsStr(DescriptorProperties descriptorProperties) {
+   final List hostList = 
new ArrayList<>();
+
+   final String hostsStr = 
descriptorProperties.getString(CONNECTOR_HOSTS);
+   if (null == hostsStr || hostsStr.length() == 0) {
+   throw new ValidationException("Properties '" + 
CONNECTOR_HOSTS + "' can not be empty, but is:" + hostsStr);
+   }
+
+   final String[] hosts = hostsStr.split(";");
+   for (String host: hosts) {
 
 Review comment:
   ok




[GitHub] [flink] leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL

2019-12-07 Thread GitBox
leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355125460
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -247,10 +247,8 @@ tables:
   topic: test-input
   startup-mode: earliest-offset
   properties:
-- key: zookeeper.connect
-  value: localhost:2181
-- key: bootstrap.servers
-  value: localhost:9092
+zookeeper.connect: localhost:2181,localhost:2182
 
 Review comment:
   the multi-host format is shown on purpose here; it is not a repeat, the hosts have different ports

