This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git
The following commit(s) were added to refs/heads/master by this push:
     new feaa450 [pulsar-kafka] Support encryption for pulsar-kafka producer/consumer (#26)
feaa450 is described below

commit feaa4504d567a5310e0a3d54003e721696491993
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Jul 20 13:05:37 2021 -0700

    [pulsar-kafka] Support encryption for pulsar-kafka producer/consumer (#26)

    * [pulsar-kafka] Support encryption for pulsar-kafka producer/consumer

    * add properties param for getEncryptionKey method
---
 .../clients/producer/PulsarKafkaProducer.java | 2 +-
 .../kafka/compat/CryptoKeyReaderFactory.java | 51 ++++++++++
 .../kafka/compat/PulsarConsumerKafkaConfig.java | 12 +++
 .../kafka/compat/PulsarProducerKafkaConfig.java | 17 ++++
 .../producer/PulsarCliebtKafkaConfigTest.java | 111 +++++++++++++++++++++
 .../clients/producer/PulsarKafkaProducerTest.java | 1 -
 6 files changed, 192 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java index f6da38e..fca5da8 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java @@ -65,7 +65,7 @@ import org.slf4j.LoggerFactory; public class PulsarKafkaProducer<K, V> implements Producer<K, V> { private final PulsarClient client; - private final ProducerBuilder<byte[]> pulsarProducerBuilder; + final ProducerBuilder<byte[]> pulsarProducerBuilder; private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>(); diff --git
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/CryptoKeyReaderFactory.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/CryptoKeyReaderFactory.java new file mode 100644 index 0000000..649c297 --- /dev/null +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/CryptoKeyReaderFactory.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.kafka.compat; + +import java.util.Properties; +import java.util.Set; + +import org.apache.pulsar.client.api.CryptoKeyReader; + +/** + * Factory class to create {@link CryptoKeyReader} by using configuration stored in Producer/Consumer properties. + * + */ +public interface CryptoKeyReaderFactory { + + /** + * Create CryptoKeyReader object for Producer/Consumer. + * + * @param properties + * properties provided by user to create CryptoKeyReader based on configuration params + * @return CryptoKeyReader + */ + CryptoKeyReader create(Properties properties); + + /** + * Encryption keys for {@link CryptoKeyReader} while enabling encryption at producer. 
+ * + * @param properties + * properties provided by user to get encryption-keys based on configuration params + * @return Set of encryption keys + */ + default Set<String> getEncryptionKey(Properties properties) { + return null; + } +} diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java index 09a9806..40858b1 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Properties; import java.util.concurrent.TimeUnit; import static org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig.AUTO_UPDATE_PARTITIONS; +import static org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig.CRYPTO_READER_FACTORY_CLASS_NAME; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RegexSubscriptionMode; @@ -70,6 +71,17 @@ public class PulsarConsumerKafkaConfig { if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) { consumerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS))); } + + if (properties.containsKey(CRYPTO_READER_FACTORY_CLASS_NAME)) { + try { + CryptoKeyReaderFactory cryptoReaderFactory = (CryptoKeyReaderFactory) Class + .forName(properties.getProperty(CRYPTO_READER_FACTORY_CLASS_NAME)).newInstance(); + consumerBuilder.cryptoKeyReader(cryptoReaderFactory.create(properties)); + } catch (Exception e) { + throw new IllegalArgumentException("Failed to create crypto reader using factory " + + 
properties.getProperty(CRYPTO_READER_FACTORY_CLASS_NAME), e); + } + } return consumerBuilder; } } diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java index 3315cd2..276d777 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.kafka.compat; import java.util.Properties; +import java.util.Set; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; @@ -35,6 +36,7 @@ public class PulsarProducerKafkaConfig { public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled"; public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages"; public static final String AUTO_UPDATE_PARTITIONS = "pulsar.auto.update.partitions"; + public static final String CRYPTO_READER_FACTORY_CLASS_NAME = "pulsar.crypto.reader.factory.class.name"; /** * send operations will immediately fail with {@link ProducerQueueIsFullError} when there is no space left in * pending queue. 
@@ -70,6 +72,21 @@ public class PulsarProducerKafkaConfig { if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) { producerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS))); } + + if (properties.containsKey(CRYPTO_READER_FACTORY_CLASS_NAME)) { + try { + CryptoKeyReaderFactory cryptoReaderFactory = (CryptoKeyReaderFactory) Class + .forName(properties.getProperty(CRYPTO_READER_FACTORY_CLASS_NAME)).newInstance(); + producerBuilder.cryptoKeyReader(cryptoReaderFactory.create(properties)); + Set<String> keys = cryptoReaderFactory.getEncryptionKey(properties); + if (keys != null) { + keys.forEach(producerBuilder::addEncryptionKey); + } + } catch (Exception e) { + throw new IllegalArgumentException("Failed to create crypto reader using factory " + + properties.getProperty(CRYPTO_READER_FACTORY_CLASS_NAME), e); + } + } return producerBuilder; } } diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarCliebtKafkaConfigTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarCliebtKafkaConfigTest.java new file mode 100644 index 0000000..5ddddf7 --- /dev/null +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarCliebtKafkaConfigTest.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.kafka.clients.producer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.ConsumerBuilderImpl; +import org.apache.pulsar.client.impl.DefaultCryptoKeyReader; +import org.apache.pulsar.client.impl.ProducerBuilderImpl; +import org.apache.pulsar.client.kafka.compat.CryptoKeyReaderFactory; +import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.Sets; + +public class PulsarCliebtKafkaConfigTest { + + + @Test + public void 
testPulsarKafkaProducer() { + ClientBuilder mockClientBuilder = mock(ClientBuilder.class); + ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class); + doAnswer(invocation -> { + Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000."); + return mockProducerBuilder; + }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class)); + doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString()); + doAnswer(invocation -> { + Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000."); + return mockClientBuilder; + }).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class)); + + // validate producer + Properties properties = new Properties(); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650")); + properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000"); + properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000"); + properties.put(PulsarProducerKafkaConfig.CRYPTO_READER_FACTORY_CLASS_NAME, CryptoKeyReaderFactoryImpl.class.getName()); + PulsarKafkaProducer producer = new PulsarKafkaProducer<>(properties); + ProducerBuilderImpl producerBuilder = (ProducerBuilderImpl) producer.pulsarProducerBuilder; + assertEquals(producerBuilder.getConf().getCryptoKeyReader(), CryptoKeyReaderFactoryImpl.reader); + assertEquals(producerBuilder.getConf().getEncryptionKeys(), CryptoKeyReaderFactoryImpl.encryptionKeys); + + // validate consumer + properties = new Properties(); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + 
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650")); + properties.put(PulsarProducerKafkaConfig.CRYPTO_READER_FACTORY_CLASS_NAME, CryptoKeyReaderFactoryImpl.class.getName()); + PulsarClient client = mock(PulsarClient.class); + ConsumerBuilderImpl<byte[]> consumerBuilder = new ConsumerBuilderImpl<>(null, null); + doReturn(consumerBuilder).when(client).newConsumer(); + PulsarConsumerKafkaConfig.getConsumerBuilder(client , properties); + assertEquals(consumerBuilder.getConf().getCryptoKeyReader(), CryptoKeyReaderFactoryImpl.reader); + } + + public static class CryptoKeyReaderFactoryImpl implements CryptoKeyReaderFactory { + static final CryptoKeyReader reader = DefaultCryptoKeyReader.builder().build(); + static final Set<String> encryptionKeys = Sets.newHashSet("test"); + + @Override + public CryptoKeyReader create(Properties properties) { + return reader; + } + + @Override + public Set<String> getEncryptionKey(Properties properties) { + return encryptionKeys; + } + } +} diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java index b1069ad..bf6a6cf 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java @@ -270,5 +270,4 @@ public class PulsarKafkaProducerTest { } } - }