syhily commented on a change in pull request #16900:
URL: https://github.com/apache/flink/pull/16900#discussion_r694060240



##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
##########
@@ -234,25 +239,87 @@ public static void checkConfigurations(Configuration 
configuration) {
             PulsarClient client, Schema<T> schema, 
ConsumerConfigurationData<T> config)
             throws PulsarClientException {
         // ConsumerBuilder don't support using the given 
ConsumerConfigurationData directly.
-        ConsumerBuilder<T> consumerBuilder = 
client.newConsumer(schema).loadConf(configMap(config));
+        ConsumerBuilder<T> builder = new 
ConsumerBuilderImpl<>((PulsarClientImpl) client, schema);
 
-        // Set some non-serializable fields.
-        if (config.getMessageListener() != null) {
-            consumerBuilder.messageListener(config.getMessageListener());
-        }
-        if (config.getConsumerEventListener() != null) {
-            
consumerBuilder.consumerEventListener(config.getConsumerEventListener());
-        }
-        if (config.getCryptoKeyReader() != null) {
-            consumerBuilder.cryptoKeyReader(config.getCryptoKeyReader());
-        }
-        if (config.getMessageCrypto() != null) {
-            consumerBuilder.messageCrypto(config.getMessageCrypto());
-        }
-        if (config.getBatchReceivePolicy() != null) {
-            consumerBuilder.batchReceivePolicy(config.getBatchReceivePolicy());
+        // Since pulsar don't expose the config constructor.
+        // We have to set the builder fields one by one.
+        setConfig(config.getTopicNames(), topics -> builder.topics(new 
ArrayList<>(topics)));
+        setConfig(config.getTopicsPattern(), builder::topicsPattern);
+        setConfig(config.getSubscriptionName(), builder::subscriptionName);
+        setConfig(
+                config.getAckTimeoutMillis(),
+                millis -> builder.ackTimeout(millis, TimeUnit.MILLISECONDS));
+        setConfig(config.isAckReceiptEnabled(), builder::isAckReceiptEnabled);
+        setConfig(
+                config.getTickDurationMillis(),
+                millis -> builder.ackTimeoutTickTime(millis, 
TimeUnit.MILLISECONDS));
+        setConfig(
+                config.getNegativeAckRedeliveryDelayMicros(),
+                micros -> builder.negativeAckRedeliveryDelay(micros, 
TimeUnit.MICROSECONDS));
+        setConfig(config.getSubscriptionType(), builder::subscriptionType);
+        setConfig(config.getSubscriptionMode(), builder::subscriptionMode);
+        setConfig(config.getMessageListener(), builder::messageListener);
+        setConfig(config.getConsumerEventListener(), 
builder::consumerEventListener);
+        setConfig(config.getCryptoKeyReader(), builder::cryptoKeyReader);
+        setConfig(config.getMessageCrypto(), builder::messageCrypto);
+        setConfig(config.getCryptoFailureAction(), 
builder::cryptoFailureAction);
+        setConfig(config.getReceiverQueueSize(), builder::receiverQueueSize);
+        setConfig(
+                config.getAcknowledgementsGroupTimeMicros(),
+                micros -> builder.acknowledgmentGroupTime(micros, 
TimeUnit.MICROSECONDS));
+        setConfig(config.getConsumerName(), builder::consumerName);
+        setConfig(config.getPriorityLevel(), builder::priorityLevel);
+        setConfig(config.getMaxPendingChunkedMessage(), 
builder::maxPendingChunkedMessage);
+        setConfig(
+                config.isAutoAckOldestChunkedMessageOnQueueFull(),
+                builder::autoAckOldestChunkedMessageOnQueueFull);
+        setConfig(config.getProperties(), builder::properties);
+        setConfig(
+                config.getMaxTotalReceiverQueueSizeAcrossPartitions(),
+                builder::maxTotalReceiverQueueSizeAcrossPartitions);
+        setConfig(config.isReadCompacted(), builder::readCompacted);
+        setConfig(config.getPatternAutoDiscoveryPeriod(), 
builder::patternAutoDiscoveryPeriod);
+        setConfig(config.getPatternAutoDiscoveryPeriod(), 
builder::patternAutoDiscoveryPeriod);
+        setConfig(config.getSubscriptionInitialPosition(), 
builder::subscriptionInitialPosition);
+        setConfig(config.getRegexSubscriptionMode(), 
builder::subscriptionTopicsMode);
+        setConfig(config.isReplicateSubscriptionState(), 
builder::replicateSubscriptionState);
+        setConfig(config.getDeadLetterPolicy(), builder::deadLetterPolicy);
+        setConfig(config.isAutoUpdatePartitions(), 
builder::autoUpdatePartitions);
+        setConfig(
+                config.getAutoUpdatePartitionsIntervalSeconds(),
+                seconds ->
+                        builder.autoUpdatePartitionsInterval(
+                                Math.toIntExact(seconds), TimeUnit.SECONDS));
+        if (config.isResetIncludeHead()) {
+            builder.startMessageIdInclusive();
         }
+        setConfig(config.getBatchReceivePolicy(), builder::batchReceivePolicy);
+        setConfig(config.getKeySharedPolicy(), builder::keySharedPolicy);
+        setConfig(config.isRetryEnable(), builder::enableRetry);
+        setConfig(config.isBatchIndexAckEnabled(), 
builder::enableBatchIndexAcknowledgment);
+        setConfig(
+                config.getExpireTimeOfIncompleteChunkedMessageMillis(),
+                millis ->
+                        builder.expireTimeOfIncompleteChunkedMessage(
+                                millis, TimeUnit.MILLISECONDS));
+        setConfig(config.isPoolMessages(), builder::poolMessages);
 
-        return consumerBuilder.subscribe();
+        return builder.subscribe();
+    }
+
+    private static <T> void setConfig(T value, java.util.function.Consumer<T> 
consumer) {
+        if (value != null) {
+            if (value instanceof Collection) {
+                if (!((Collection<?>) value).isEmpty()) {
+                    consumer.accept(value);
+                }
+            } else if (value instanceof Map) {
+                if (!((Map<?, ?>) value).isEmpty()) {
+                    consumer.accept(value);
+                }
+            } else {
+                consumer.accept(value);
+            }

Review comment:
       > We still have the opportunity to change API until the final release. 
Would it help to get rid of `ClientConfigurationData`? It feels like there is 
quite a bit friction involved. And since you replicated all options to 
`ConfigOption`s, it would even be better to go `Configuration` all the way for 
Flink users.
   
   Drop the `ClientConfigurationData` and `ConsumerConfigurationData` would be 
cool. I think we could just use the  `Configuration` all the way. Do we need a 
separate PR for this feature.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to