AHeise commented on a change in pull request #16900:
URL: https://github.com/apache/flink/pull/16900#discussion_r694016919
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java
##########
@@ -217,26 +217,7 @@ public static PulsarClient createClient(
* except the authentication.
*/
public static PulsarClient createClient(ClientConfigurationData config) {
- // PulsarClientBuilder don't support using the given
ClientConfigurationData directly.
- Map<String, Object> configMap = configMap(config);
-
- // These two auth config would be serialized to a secret information
by default.
- configMap.put("authParamMap", config.getAuthParamMap());
- configMap.put("authParams", config.getAuthParams());
-
- ClientBuilder clientBuilder =
PulsarClient.builder().loadConf(configMap);
-
- // Set some non-serializable fields.
- if (config.getAuthentication() != null) {
- clientBuilder.authentication(config.getAuthentication());
- }
- if (config.getClock() != null) {
- clientBuilder.clock(config.getClock());
- }
- if (config.getServiceUrlProvider() != null) {
- clientBuilder.serviceUrlProvider(config.getServiceUrlProvider());
- }
-
+ ClientBuilder clientBuilder = new ClientBuilderImpl(config);
Review comment:
I'm not a big fan of using internal APIs. Should we try to get some of
your changes into Pulsar upstream?
I could imagine that such a test client would also be beneficial for other
Pulsar users.
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
##########
@@ -234,25 +239,87 @@ public static void checkConfigurations(Configuration
configuration) {
PulsarClient client, Schema<T> schema,
ConsumerConfigurationData<T> config)
throws PulsarClientException {
// ConsumerBuilder don't support using the given
ConsumerConfigurationData directly.
- ConsumerBuilder<T> consumerBuilder =
client.newConsumer(schema).loadConf(configMap(config));
+ ConsumerBuilder<T> builder = new
ConsumerBuilderImpl<>((PulsarClientImpl) client, schema);
- // Set some non-serializable fields.
- if (config.getMessageListener() != null) {
- consumerBuilder.messageListener(config.getMessageListener());
- }
- if (config.getConsumerEventListener() != null) {
-
consumerBuilder.consumerEventListener(config.getConsumerEventListener());
- }
- if (config.getCryptoKeyReader() != null) {
- consumerBuilder.cryptoKeyReader(config.getCryptoKeyReader());
- }
- if (config.getMessageCrypto() != null) {
- consumerBuilder.messageCrypto(config.getMessageCrypto());
- }
- if (config.getBatchReceivePolicy() != null) {
- consumerBuilder.batchReceivePolicy(config.getBatchReceivePolicy());
+ // Since pulsar don't expose the config constructor.
+ // We have to set the builder fields one by one.
+ setConfig(config.getTopicNames(), topics -> builder.topics(new
ArrayList<>(topics)));
+ setConfig(config.getTopicsPattern(), builder::topicsPattern);
+ setConfig(config.getSubscriptionName(), builder::subscriptionName);
+ setConfig(
+ config.getAckTimeoutMillis(),
+ millis -> builder.ackTimeout(millis, TimeUnit.MILLISECONDS));
+ setConfig(config.isAckReceiptEnabled(), builder::isAckReceiptEnabled);
+ setConfig(
+ config.getTickDurationMillis(),
+ millis -> builder.ackTimeoutTickTime(millis,
TimeUnit.MILLISECONDS));
+ setConfig(
+ config.getNegativeAckRedeliveryDelayMicros(),
+ micros -> builder.negativeAckRedeliveryDelay(micros,
TimeUnit.MICROSECONDS));
+ setConfig(config.getSubscriptionType(), builder::subscriptionType);
+ setConfig(config.getSubscriptionMode(), builder::subscriptionMode);
+ setConfig(config.getMessageListener(), builder::messageListener);
+ setConfig(config.getConsumerEventListener(),
builder::consumerEventListener);
+ setConfig(config.getCryptoKeyReader(), builder::cryptoKeyReader);
+ setConfig(config.getMessageCrypto(), builder::messageCrypto);
+ setConfig(config.getCryptoFailureAction(),
builder::cryptoFailureAction);
+ setConfig(config.getReceiverQueueSize(), builder::receiverQueueSize);
+ setConfig(
+ config.getAcknowledgementsGroupTimeMicros(),
+ micros -> builder.acknowledgmentGroupTime(micros,
TimeUnit.MICROSECONDS));
+ setConfig(config.getConsumerName(), builder::consumerName);
+ setConfig(config.getPriorityLevel(), builder::priorityLevel);
+ setConfig(config.getMaxPendingChunkedMessage(),
builder::maxPendingChunkedMessage);
+ setConfig(
+ config.isAutoAckOldestChunkedMessageOnQueueFull(),
+ builder::autoAckOldestChunkedMessageOnQueueFull);
+ setConfig(config.getProperties(), builder::properties);
+ setConfig(
+ config.getMaxTotalReceiverQueueSizeAcrossPartitions(),
+ builder::maxTotalReceiverQueueSizeAcrossPartitions);
+ setConfig(config.isReadCompacted(), builder::readCompacted);
+ setConfig(config.getPatternAutoDiscoveryPeriod(),
builder::patternAutoDiscoveryPeriod);
+ setConfig(config.getPatternAutoDiscoveryPeriod(),
builder::patternAutoDiscoveryPeriod);
+ setConfig(config.getSubscriptionInitialPosition(),
builder::subscriptionInitialPosition);
+ setConfig(config.getRegexSubscriptionMode(),
builder::subscriptionTopicsMode);
+ setConfig(config.isReplicateSubscriptionState(),
builder::replicateSubscriptionState);
+ setConfig(config.getDeadLetterPolicy(), builder::deadLetterPolicy);
+ setConfig(config.isAutoUpdatePartitions(),
builder::autoUpdatePartitions);
+ setConfig(
+ config.getAutoUpdatePartitionsIntervalSeconds(),
+ seconds ->
+ builder.autoUpdatePartitionsInterval(
+ Math.toIntExact(seconds), TimeUnit.SECONDS));
+ if (config.isResetIncludeHead()) {
+ builder.startMessageIdInclusive();
}
+ setConfig(config.getBatchReceivePolicy(), builder::batchReceivePolicy);
+ setConfig(config.getKeySharedPolicy(), builder::keySharedPolicy);
+ setConfig(config.isRetryEnable(), builder::enableRetry);
+ setConfig(config.isBatchIndexAckEnabled(),
builder::enableBatchIndexAcknowledgment);
+ setConfig(
+ config.getExpireTimeOfIncompleteChunkedMessageMillis(),
+ millis ->
+ builder.expireTimeOfIncompleteChunkedMessage(
+ millis, TimeUnit.MILLISECONDS));
+ setConfig(config.isPoolMessages(), builder::poolMessages);
- return consumerBuilder.subscribe();
+ return builder.subscribe();
+ }
+
+ private static <T> void setConfig(T value, java.util.function.Consumer<T>
consumer) {
+ if (value != null) {
+ if (value instanceof Collection) {
+ if (!((Collection<?>) value).isEmpty()) {
+ consumer.accept(value);
+ }
+ } else if (value instanceof Map) {
+ if (!((Map<?, ?>) value).isEmpty()) {
+ consumer.accept(value);
+ }
+ } else {
+ consumer.accept(value);
+ }
Review comment:
I don't understand why this change is needed at all.
Why isn't `ConsumerBuilder<T> consumerBuilder =
client.newConsumer(schema).loadConf(configMap(config));` working anymore?
Again, wouldn't it be easier to remove `ConsumerConfigurationData` and just use
`Configuration`?
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java
##########
@@ -217,26 +217,7 @@ public static PulsarClient createClient(
* except the authentication.
*/
public static PulsarClient createClient(ClientConfigurationData config) {
- // PulsarClientBuilder don't support using the given
ClientConfigurationData directly.
- Map<String, Object> configMap = configMap(config);
-
- // These two auth config would be serialized to a secret information
by default.
- configMap.put("authParamMap", config.getAuthParamMap());
- configMap.put("authParams", config.getAuthParams());
-
- ClientBuilder clientBuilder =
PulsarClient.builder().loadConf(configMap);
-
- // Set some non-serializable fields.
- if (config.getAuthentication() != null) {
- clientBuilder.authentication(config.getAuthentication());
- }
- if (config.getClock() != null) {
- clientBuilder.clock(config.getClock());
- }
- if (config.getServiceUrlProvider() != null) {
- clientBuilder.serviceUrlProvider(config.getServiceUrlProvider());
- }
-
+ ClientBuilder clientBuilder = new ClientBuilderImpl(config);
Review comment:
Is this just a way to get rid of the `configMap` method? If so, wouldn't
it make more sense to remove `ClientConfigurationData` from the user-facing API
and just go with `Configuration` all the way?
##########
File path:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils;
+
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
+import
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+import
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider;
+import org.apache.flink.connectors.test.common.TestResource;
+import
org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.MultipleFailureException;
+import org.junit.runners.model.Statement;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A JUnit 5 {@link Extension} for supporting running a pulsar instance before
executing tests. This
+ * class is also a {@link ExternalSystem} for {@code flink-connector-testing}
tools.
+ *
+ * <p>Some old flink tests are based on JUint 4, this class is also support
it. The follow code
+ * snippet shows how to use this class in JUnit 4.
Review comment:
Did you see a need for JUnit4 anywhere? We actually want to forbid
writing new tests with JUnit4 soonish, so I don't think this hybrid support is
needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]