wuchong commented on a change in pull request #12150:
URL: https://github.com/apache/flink/pull/12150#discussion_r426210338



##########
File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/** Option utils for Kafka table source sink. */
+public class KafkaOptions {
+       private KafkaOptions() {}
+
+       // 
--------------------------------------------------------------------------------------------
+       // Kafka specific options
+       // 
--------------------------------------------------------------------------------------------
+
+       public static final ConfigOption<String> TOPIC = ConfigOptions
+                       .key("topic")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Required topic name from which the 
table is read");
+
+       public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = 
ConfigOptions
+                       .key("properties.bootstrap.servers")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Required Kafka server connection 
string");
+
+       public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions
+                       .key("properties.group.id")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Required consumer group in Kafka 
consumer, no need for Kafka producer");
+
+       public static final ConfigOption<String> PROPS_ZK_CONNECT = 
ConfigOptions
+                       .key("properties.zookeeper.connect")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Optional ZooKeeper connection 
string");
+
+       // 
--------------------------------------------------------------------------------------------
+       // Scan specific options
+       // 
--------------------------------------------------------------------------------------------
+
+       public static final ConfigOption<String> SCAN_STARTUP_MODE = 
ConfigOptions
+                       .key("scan.startup-mode")
+                       .stringType()
+                       .defaultValue("group-offsets")
+                       .withDescription("Optional startup mode for Kafka 
consumer, valid enumerations are "
+                                       + "\"earliest-offset\", 
\"latest-offset\", \"group-offsets\"\n"
+                                       + "or \"specific-offsets\"");
+
+       public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS 
= ConfigOptions
+                       .key("scan.startup.specific-offsets")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Optional offsets used in case of 
\"specific-offsets\" startup mode");
+
+       public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = 
ConfigOptions
+                       .key("scan.startup.timestamp-millis")
+                       .longType()
+                       .noDefaultValue();
+
+       // 
--------------------------------------------------------------------------------------------
+       // Sink specific options
+       // 
--------------------------------------------------------------------------------------------
+
+       public static final ConfigOption<String> SINK_PARTITIONER = 
ConfigOptions
+                       .key("sink.partitioner")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Optional output partitioning from 
Flink's partitions\n"
+                                       + "into Kafka's partitions valid 
enumerations are\n"
+                                       + "\"fixed\": (each Flink partition 
ends up in at most one Kafka partition),\n"
+                                       + "\"round-robin\": (a Flink partition 
is distributed to Kafka partitions round-robin)\n"
+                                       + "\"custom class name\": (use a custom 
FlinkKafkaPartitioner subclass)");
+
+       // 
--------------------------------------------------------------------------------------------
+       // Option enumerations
+       // 
--------------------------------------------------------------------------------------------
+
+       // Start up offset.
+       public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = 
"earliest-offset";
+       public static final String SCAN_STARTUP_MODE_VALUE_LATEST = 
"latest-offset";
+       public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = 
"group-offsets";
+       public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = 
"specific-offsets";
+       public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = 
"timestamp";
+
+       private static final Set<String> SCAN_STARTUP_MODE_ENUMS = new 
HashSet<>(Arrays.asList(
+                       SCAN_STARTUP_MODE_VALUE_EARLIEST,
+                       SCAN_STARTUP_MODE_VALUE_LATEST,
+                       SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS,
+                       SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
+                       SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+
+       // Sink partitioner.
+       public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
+       public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = 
"round-robin";
+
+       private static final Set<String> SINK_PARTITIONER_ENUMS = new 
HashSet<>(Arrays.asList(
+                       SINK_PARTITIONER_VALUE_FIXED,
+                       SINK_PARTITIONER_VALUE_ROUND_ROBIN));
+
+       // Prefix for Kafka specific properties.
+       public static final String PROPERTIES = "properties";
+
+       // Other keywords.
+       private static final String PARTITION = "partition";
+       private static final String OFFSET = "offset";
+
+       // 
--------------------------------------------------------------------------------------------
+       // Validation
+       // 
--------------------------------------------------------------------------------------------
+
+       public static void validateTableOptions(ReadableConfig tableOptions) {
+               validateScanStartupMode(tableOptions);
+               validateSinkPartitioner(tableOptions);
+       }
+
+       private static void validateScanStartupMode(ReadableConfig 
tableOptions) {
+               tableOptions.getOptional(SCAN_STARTUP_MODE)
+                               .map(String::toLowerCase)
+                               .ifPresent(mode -> {
+                                       if 
(!SCAN_STARTUP_MODE_ENUMS.contains(mode)) {
+                                               throw new ValidationException(
+                                                               
String.format("Invalid value for option '%s'. Supported values are %s, but was: 
%s",
+                                                                               
SCAN_STARTUP_MODE.key(),
+                                                                               
"[earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp]",
+                                                                               
mode));
+                                       }
+
+                                       if 
(mode.equals(SCAN_STARTUP_MODE_VALUE_TIMESTAMP)) {
+                                               if 
(!tableOptions.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS).isPresent()) {
+                                                       throw new 
ValidationException(String.format("'%s' is required in '%s' startup mode"
+                                                                               
        + " but missing.",
+                                                                       
SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
+                                                                       
SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+                                               }
+                                       }
+                                       if 
(mode.equals(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)) {
+                                               if 
(!tableOptions.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS).isPresent()) {
+                                                       throw new 
ValidationException(String.format("'%s' is required in '%s' startup mode"
+                                                                               
        + " but missing.",
+                                                                       
SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
+                                                                       
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
+                                               }
+                                               String specificOffsets = 
tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+                                               
parseSpecificOffsets(specificOffsets, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+                       }
+               });
+       }
+
+       private static void validateSinkPartitioner(ReadableConfig 
tableOptions) {
+               tableOptions.getOptional(SINK_PARTITIONER)
+                               .ifPresent(partitioner -> {
+                                       if 
(!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())) {
+                                               if (partitioner.isEmpty()) {
+                                                       throw new 
ValidationException(
+                                                                       
String.format("Option '%s' should be a non-empty string.",
+                                                                               
        SINK_PARTITIONER.key()));
+                                               }
+                                       }
+                               });
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       public static StartupOptions getStartupOptions(
+                       ReadableConfig tableOptions,
+                       String topic) {
+               final Map<KafkaTopicPartition, Long> specificOffsets = new 
HashMap<>();
+               final StartupMode startupMode = 
tableOptions.getOptional(SCAN_STARTUP_MODE)
+                               .map(modeString -> {
+                                       switch (modeString) {
+                                       case SCAN_STARTUP_MODE_VALUE_EARLIEST:
+                                               return StartupMode.EARLIEST;
+
+                                       case SCAN_STARTUP_MODE_VALUE_LATEST:
+                                               return StartupMode.LATEST;
+
+                                       case 
SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS:
+                                               return 
StartupMode.GROUP_OFFSETS;
+
+                                       case 
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+                                               
buildSpecificOffsets(tableOptions, topic, specificOffsets);
+                                               return 
StartupMode.SPECIFIC_OFFSETS;
+
+                                       case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
+                                               return StartupMode.TIMESTAMP;
+
+                                       default:
+                                               throw new 
TableException("Unsupported startup mode. Validator should have checked that.");
+                                       }
+                               }).orElse(StartupMode.GROUP_OFFSETS);
+               final StartupOptions options = new StartupOptions();
+               options.startupMode = startupMode;
+               options.specificOffsets = specificOffsets;
+               if (startupMode == StartupMode.TIMESTAMP) {
+                       options.startupTimestampMillis = 
tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
+               }
+               return options;
+       }
+
+       private static void buildSpecificOffsets(
+                       ReadableConfig tableOptions,
+                       String topic,
+                       Map<KafkaTopicPartition, Long> specificOffsets) {
+               String specificOffsetsStrOpt = 
tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+               final Map<Integer, Long> offsetMap = parseSpecificOffsets(
+                               specificOffsetsStrOpt,
+                               SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+               offsetMap.forEach((partition, offset) -> {
+                       final KafkaTopicPartition topicPartition = new 
KafkaTopicPartition(topic, partition);
+                       specificOffsets.put(topicPartition, offset);
+               });
+       }
+
+       public static Properties getKafkaProperties(Map<String, String> 
tableOptions) {
+               final Properties kafkaProperties = new Properties();
+
+               if (hasKafkaClientProperties(tableOptions)) {
+                       tableOptions.keySet().stream()
+                                       .filter(key -> 
key.startsWith(PROPERTIES + '.'))
+                                       .forEach(key -> {
+                                               final String value = 
tableOptions.get(key);
+                                               final String subKey = 
key.substring((PROPERTIES + '.').length());
+                                               kafkaProperties.put(subKey, 
value);
+                                       });
+               }
+               return kafkaProperties;
+       }
+
+       /**
+        * The partitioner can be either "fixed", "round-robin" or a customized 
partitioner full class name.
+        */
+       @SuppressWarnings("unchecked")
+       public static Optional<FlinkKafkaPartitioner<RowData>> 
getFlinkKafkaPartitioner(ReadableConfig tableOptions) {
+               return tableOptions.getOptional(SINK_PARTITIONER)
+                               .flatMap((String partitioner) -> {
+                                       switch (partitioner) {
+                                       case SINK_PARTITIONER_VALUE_FIXED:
+                                               return Optional.of(new 
FlinkFixedPartitioner<>());
+                                       case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
+                                               return Optional.empty();
+                                       // Default fallback to full class name 
of the partitioner.
+                                       default:
+                                               final Class<? extends 
FlinkKafkaPartitioner> partitionerClass =
+                                                               
getPartitionerClass(partitioner);
+                                               return 
Optional.of((FlinkKafkaPartitioner<RowData>) 
InstantiationUtil.instantiate(partitionerClass));

Review comment:
       Missed to use classloader.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to