This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 16a554b Allow to configure most client/producer/consumer options in Kafka API wrapper (#1207) 16a554b is described below commit 16a554bfd5903482512d9a567777ccb21dfd5c01 Author: Matteo Merli <mme...@apache.org> AuthorDate: Fri Feb 9 21:32:49 2018 -0800 Allow to configure most client/producer/consumer options in Kafka API wrapper (#1207) --- .../clients/consumer/PulsarKafkaConsumer.java | 11 ++-- .../clients/producer/PulsarKafkaProducer.java | 8 +-- ...fkaConfig.java => PulsarClientKafkaConfig.java} | 44 ++++++++++++++- .../kafka/compat/PulsarConsumerKafkaConfig.java | 50 +++++++++++++++++ .../kafka/compat/PulsarProducerKafkaConfig.java | 64 ++++++++++++++++++++++ site/docs/latest/adaptors/KafkaWrapper.md | 42 +++++++++++--- 6 files changed, 203 insertions(+), 16 deletions(-) diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java index d3dc6e4..97cde46 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java @@ -56,7 +56,8 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.kafka.compat.MessageIdUtils; -import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig; import org.apache.pulsar.client.util.ConsumerName; import org.apache.pulsar.client.util.FutureUtil; import org.apache.pulsar.common.naming.DestinationName; @@ -80,6 +81,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene private volatile boolean closed = false; + private final Properties properties; + private static class QueueItem { final org.apache.pulsar.client.api.Consumer consumer; final Message message; @@ -141,9 +144,9 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene String serviceUrl = config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0); - Properties properties = new Properties(); + this.properties = new Properties(); config.originals().forEach((k, v) -> properties.put(k, v)); - ClientConfiguration clientConf = PulsarKafkaConfig.getClientConfiguration(properties); + ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties); // Since this client instance is going to be used just for the consumers, we can enable Nagle to group // all the acknowledgments sent to broker within a short time frame clientConf.setUseTcpNoDelay(false); @@ -201,7 +204,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene // acknowledgeCumulative() int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get(); - ConsumerConfiguration conf = new ConsumerConfiguration(); + ConsumerConfiguration conf = PulsarConsumerKafkaConfig.getConsumerConfiguration(properties); conf.setSubscriptionType(SubscriptionType.Failover); conf.setMessageListener(this); if (numberOfPartitions > 1) { diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java index 793a641..7b8bf9a 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java @@ -48,7 +48,8 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.kafka.compat.MessageIdUtils; -import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig; public class PulsarKafkaProducer<K, V> implements Producer<K, V> { @@ -106,15 +107,14 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> { } String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0); - ClientConfiguration clientConf = PulsarKafkaConfig.getClientConfiguration(properties); + ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties); try { client = PulsarClient.create(serviceUrl, clientConf); } catch (PulsarClientException e) { throw new RuntimeException(e); } - pulsarProducerConf = new ProducerConfiguration(); - pulsarProducerConf.setBatchingEnabled(true); + pulsarProducerConf = PulsarProducerKafkaConfig.getProducerConfiguration(properties); // To mimic the same batching mode as Kafka, we need to wait a very little amount of // time to batch if the client is trying to send messages fast enough diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java similarity index 53% rename from pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaConfig.java rename to pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java index 2396bac..d9ce75e 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java @@ -19,11 +19,12 @@ package org.apache.pulsar.client.kafka.compat; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.ClientConfiguration; -public class PulsarKafkaConfig { +public class PulsarClientKafkaConfig { /// Config variables public static final String AUTHENTICATION_CLASS = "pulsar.authentication.class"; @@ -31,6 +32,17 @@ public class PulsarKafkaConfig { public static final String TLS_TRUST_CERTS_FILE_PATH = "pulsar.tls.trust.certs.file.path"; public static final String TLS_ALLOW_INSECURE_CONNECTION = "pulsar.tls.allow.insecure.connection"; + public static final String OPERATION_TIMEOUT_MS = "pulsar.operation.timeout.ms"; + public static final String STATS_INTERVAL_SECONDS = "pulsar.stats.interval.seconds"; + public static final String NUM_IO_THREADS = "pulsar.num.io.threads"; + + public static final String CONNECTIONS_PER_BROKER = "pulsar.connections.per.broker"; + + public static final String USE_TCP_NODELAY = "pulsar.use.tcp.nodelay"; + + public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests"; + public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection"; + public static ClientConfiguration getClientConfiguration(Properties properties) { ClientConfiguration conf = new ClientConfiguration(); @@ -52,6 +64,36 @@ public class PulsarKafkaConfig { conf.setTlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH)); } + if (properties.containsKey(OPERATION_TIMEOUT_MS)) { + conf.setOperationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)), + TimeUnit.MILLISECONDS); + } + + if (properties.containsKey(STATS_INTERVAL_SECONDS)) { + conf.setStatsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)), TimeUnit.SECONDS); + } + + if (properties.containsKey(NUM_IO_THREADS)) { + conf.setIoThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS))); + } + + if (properties.containsKey(CONNECTIONS_PER_BROKER)) { + conf.setConnectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER))); + } + + if (properties.containsKey(USE_TCP_NODELAY)) { + conf.setUseTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY))); + } + + if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) { + conf.setConcurrentLookupRequest(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS))); + } + + if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) { + conf.setMaxNumberOfRejectedRequestPerConnection( + Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION))); + } + return conf; } } diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java new file mode 100644 index 0000000..f91c484 --- /dev/null +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.kafka.compat; + +import java.util.Properties; + +import org.apache.pulsar.client.api.ConsumerConfiguration; + +public class PulsarConsumerKafkaConfig { + + /// Config variables + public static final String CONSUMER_NAME = "pulsar.consumer.name"; + public static final String RECEIVER_QUEUE_SIZE = "pulsar.consumer.receiver.queue.size"; + public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions"; + + public static ConsumerConfiguration getConsumerConfiguration(Properties properties) { + ConsumerConfiguration conf = new ConsumerConfiguration(); + + if (properties.containsKey(CONSUMER_NAME)) { + conf.setConsumerName(properties.getProperty(CONSUMER_NAME)); + } + + if (properties.containsKey(RECEIVER_QUEUE_SIZE)) { + conf.setReceiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE))); + } + + if (properties.containsKey(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)) { + conf.setMaxTotalReceiverQueueSizeAcrossPartitions( + Integer.parseInt(properties.getProperty(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS))); + } + + return conf; + } +} diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java new file mode 100644 index 0000000..c2e4886 --- /dev/null +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.kafka.compat; + +import java.util.Properties; + +import org.apache.pulsar.client.api.ProducerConfiguration; + +public class PulsarProducerKafkaConfig { + + /// Config variables + public static final String PRODUCER_NAME = "pulsar.producer.name"; + public static final String INITIAL_SEQUENCE_ID = "pulsar.producer.initial.sequence.id"; + + public static final String MAX_PENDING_MESSAGES = "pulsar.producer.max.pending.messages"; + public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions"; + public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled"; + public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages"; + + public static ProducerConfiguration getProducerConfiguration(Properties properties) { + ProducerConfiguration conf = new ProducerConfiguration(); + + if (properties.containsKey(PRODUCER_NAME)) { + conf.setProducerName(properties.getProperty(PRODUCER_NAME)); + } + + if (properties.containsKey(INITIAL_SEQUENCE_ID)) { + conf.setInitialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID))); + } + + if (properties.containsKey(MAX_PENDING_MESSAGES)) { + conf.setMaxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES))); + } + + if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) { + conf.setMaxPendingMessagesAcrossPartitions( + Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS))); + } + + conf.setBatchingEnabled(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true"))); + + if (properties.containsKey(BATCHING_MAX_MESSAGES)) { + conf.setBatchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES))); + } + + return conf; + } +} diff --git a/site/docs/latest/adaptors/KafkaWrapper.md b/site/docs/latest/adaptors/KafkaWrapper.md index 53bf2e5..444cb0f 100644 --- a/site/docs/latest/adaptors/KafkaWrapper.md +++ b/site/docs/latest/adaptors/KafkaWrapper.md @@ -183,9 +183,9 @@ APIs: | `void commitAsync()` | Yes | | | `void commitAsync(OffsetCommitCallback callback)` | Yes | | | `void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)` | Yes | | -| `void seek(TopicPartition partition, long offset)` | Yes | | -| `void seekToBeginning(Collection<TopicPartition> partitions)` | Yes | | -| `void seekToEnd(Collection<TopicPartition> partitions)` | Yes | | +| `void seek(TopicPartition partition, long offset)` | Yes | | +| `void seekToBeginning(Collection<TopicPartition> partitions)` | Yes | | +| `void seekToEnd(Collection<TopicPartition> partitions)` | Yes | | | `long position(TopicPartition partition)` | Yes | | | `OffsetAndMetadata committed(TopicPartition partition)` | Yes | | | `Map<MetricName, ? extends Metric> metrics()` | No | | @@ -229,11 +229,39 @@ Properties: You can configure Pulsar authentication provider directly from the Kafka properties. -Properties: +### Pulsar client properties: | Config property | Default | Notes | |:---------------------------------------|:--------|:---------------------------------------------------------------------------------------| | `pulsar.authentication.class` | | Configure to auth provider. Eg. `org.apache.pulsar.client.impl.auth.AuthenticationTls` | -| `pulsar.use.tls` | `false` | Enable TLS transport encryption | -| `pulsar.tls.trust.certs.file.path` | | Path for the TLS trust certificate store | -| `pulsar.tls.allow.insecure.connection` | `false` | Accept self-signed certificates from brokers | +| [`pulsar.use.tls`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTls-boolean-) | `false` | Enable TLS transport encryption | +| [`pulsar.tls.trust.certs.file.path`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsTrustCertsFilePath-java.lang.String-) | | Path for the TLS trust certificate store | +| [`pulsar.tls.allow.insecure.connection`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsAllowInsecureConnection-boolean-) | `false` | Accept self-signed certificates from brokers | +| [`pulsar.operation.timeout.ms`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setOperationTimeout-int-java.util.concurrent.TimeUnit-) | `30000` | General operations timeout | +| [`pulsar.stats.interval.seconds`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setStatsInterval-long-java.util.concurrent.TimeUnit-) | `60` | Pulsar client lib stats printing interval | +| [`pulsar.num.io.threads`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setIoThreads-int-) | `1` | Number of Netty IO threads to use | +| [`pulsar.connections.per.broker`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setConnectionsPerBroker-int-) | `1` | Max number of connection to open to each broker | +| [`pulsar.use.tcp.nodelay`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTcpNoDelay-boolean-) | `true` | TCP no-delay | +| [`pulsar.concurrent.lookup.requests`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setConcurrentLookupRequest-int-) | `50000` | Max number of concurrent topic lookups | +| [`pulsar.max.number.rejected.request.per.connection`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setMaxNumberOfRejectedRequestPerConnection-int-) | `50` | Threshold of errors to forcefully close a connection | + + +### Pulsar producer properties + +| Config property | Default | Notes | +|:---------------------------------------|:--------|:---------------------------------------------------------------------------------------| +| [`pulsar.producer.name`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setProducerName-java.lang.String-) | | Specify producer name | +| [`pulsar.producer.initial.sequence.id`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setInitialSequenceId-long-) | | Specify baseline for sequence id for this producer | +| [`pulsar.producer.max.pending.messages`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessages-int-) | `1000` | Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. | +| [`pulsar.producer.max.pending.messages.across.partitions`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessagesAcrossPartitions-int-) | `50000` | Set the number of max pending messages across all the partitions | +| [`pulsar.producer.batching.enabled`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingEnabled-boolean-) | `true` | Control whether automatic batching of messages is enabled for the producer | +| [`pulsar.producer.batching.max.messages`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingMaxMessages-int-) | `1000` | The maximum number of messages permitted in a batch | + + +### Pulsar consumer Properties + +| Config property | Default | Notes | +|:---------------------------------------|:--------|:---------------------------------------------------------------------------------------| +| [`pulsar.consumer.name`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setConsumerName-java.lang.String-) | | Set the consumer name | +| [`pulsar.consumer.receiver.queue.size`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setReceiverQueueSize-int-) | 1000 | Sets the size of the consumer receive queue | +| [`pulsar.consumer.total.receiver.queue.size.across.partitions`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setMaxTotalReceiverQueueSizeAcrossPartitions-int-) | 50000 | Set the max total receiver queue size across partitons | -- To stop receiving notification emails like this one, please contact mme...@apache.org.