Demogorgon314 commented on code in PR #18195:
URL: https://github.com/apache/pulsar/pull/18195#discussion_r1012597347
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -51,13 +52,15 @@
private final List<BiConsumer<String, T>> listeners;
private final ReentrantLock listenersMutex;
+ private TopicCompactionStrategy<T> compactionStrategy;
TableViewImpl(PulsarClientImpl client, Schema<T> schema,
TableViewConfigurationData conf) {
this.conf = conf;
this.data = new ConcurrentHashMap<>();
this.immutableData = Collections.unmodifiableMap(data);
this.listeners = new ArrayList<>();
this.listenersMutex = new ReentrantLock();
+ this.compactionStrategy =
TopicCompactionStrategy.load(conf.getTopicCompactionStrategy());
Review Comment:
Do we have a unit test to cover this new feature?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java:
##########
@@ -164,8 +164,8 @@ public Message<T> readNext() throws PulsarClientException {
@Override
public Message<T> readNext(int timeout, TimeUnit unit) throws
PulsarClientException {
- Message<T> msg = consumer.receive(timeout, unit);
+ Message<T> msg = consumer.receive(timeout, unit);
Review Comment:
Please remove the useless change.
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.CompactionReaderImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction will go through the topic in two passes. The first pass
+ * selects valid message(defined in the TopicCompactionStrategy.isValid())
+ * for each key in the topic. Then, the second pass writes these values
+ * to a ledger.
+ *
+ * <p>As the first pass caches the entire message(not just offset) for each
key into a map,
+ * this compaction could be memory intensive if the message payload is large.
+ */
+public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
+ private static final Logger log =
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
+ private static final int MAX_OUTSTANDING = 500;
+ private final Duration phaseOneLoopReadTimeout;
+ private final RawBatchMessageContainerImpl batchMessageContainer;
+
+ public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler,
+ int maxNumMessagesInBatch) {
+ super(conf, pulsar, bk, scheduler);
+ batchMessageContainer = new
RawBatchMessageContainerImpl(maxNumMessagesInBatch);
+ phaseOneLoopReadTimeout =
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+ }
+
+ public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler) {
+ this(conf, pulsar, bk, scheduler, -1);
+ }
+
+ public CompletableFuture<Long> compact(String topic) {
+ throw new UnsupportedOperationException();
+ }
+
+
+ public <T> CompletableFuture<Long> compact(String topic,
+ TopicCompactionStrategy<T>
strategy) {
+ return compact(topic, strategy, null);
+ }
+
+ public <T> CompletableFuture<Long> compact(String topic,
+ TopicCompactionStrategy<T>
strategy,
+ CryptoKeyReader
cryptoKeyReader) {
+ CompletableFuture<Consumer<T>> consumerFuture = new
CompletableFuture<>();
+ if (cryptoKeyReader != null) {
+ batchMessageContainer.setCryptoKeyReader(cryptoKeyReader);
+ }
+ CompactionReaderImpl reader = CompactionReaderImpl.create(
+ (PulsarClientImpl) pulsar, strategy.getSchema(), topic,
consumerFuture, cryptoKeyReader);
+
+ return consumerFuture.thenComposeAsync(__ ->
compactAndCloseReader(reader, strategy), scheduler);
+ }
+
+ <T> CompletableFuture<Long> doCompaction(Reader<T> reader,
TopicCompactionStrategy strategy) {
+
+ if (!(reader instanceof CompactionReaderImpl<T>)) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("reader has to be
DelayedAckReaderImpl"));
+ }
+ return reader.hasMessageAvailableAsync()
+ .thenCompose(available -> {
+ if (available) {
+ return phaseOne(reader, strategy)
+ .thenCompose((result) -> phaseTwo(result,
reader, bk));
+ } else {
+ log.info("Skip compaction of the empty topic {}",
reader.getTopic());
+ return CompletableFuture.completedFuture(-1L);
+ }
+ });
+ }
+
+ <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader,
TopicCompactionStrategy strategy) {
+ CompletableFuture<Long> promise = new CompletableFuture<>();
+ mxBean.addCompactionStartOp(reader.getTopic());
+ doCompaction(reader, strategy).whenComplete(
+ (ledgerId, exception) -> {
+ log.info("Completed doCompaction ledgerId:{}", ledgerId);
+ reader.closeAsync().whenComplete((v, exception2) -> {
+ if (exception2 != null) {
+ log.warn("Error closing reader handle {},
ignoring", reader, exception2);
+ }
+ if (exception != null) {
+ // complete with original exception
+ mxBean.addCompactionEndOp(reader.getTopic(),
false);
+ promise.completeExceptionally(exception);
+ } else {
+ mxBean.addCompactionEndOp(reader.getTopic(), true);
+ promise.complete(ledgerId);
+ }
+ });
+ });
+ return promise;
+ }
+
+ private <T> boolean doCompactMessage(Message<T> msg, PhaseOneResult<T>
result, TopicCompactionStrategy strategy) {
+ Map<String, Message<T>> cache = result.cache;
+ String key = msg.getKey();
+
+ if (key == null) {
+ msg.release();
+ return true;
+ }
+ T val = msg.getValue();
+ Message<T> prev = cache.get(key);
+ T prevVal = prev == null ? null : prev.getValue();
+
+ if (strategy.isValid(prevVal, val)) {
+ if (val != null && msg.size() > 0) {
+ cache.remove(key); // to reorder
+ cache.put(key, msg);
+ } else {
+ cache.remove(key);
+ msg.release();
+ }
+
+ if (prev != null) {
+ prev.release();
+ }
+
+ result.validCompactionCount.incrementAndGet();
+ return true;
+ } else {
+ msg.release();
+ result.invalidCompactionCount.incrementAndGet();
+ return false;
+ }
+
+ }
+
+ private static class PhaseOneResult<T> {
+ MessageId firstId;
+ //MessageId to; // last undeleted messageId
+ MessageId lastId; // last read messageId
+ Map<String, Message<T>> cache;
+
+ AtomicInteger invalidCompactionCount;
+
+ AtomicInteger validCompactionCount;
+
+ AtomicInteger numReadMessages;
+
+ String topic;
+
+ PhaseOneResult(String topic) {
+ this.topic = topic;
+ cache = new LinkedHashMap<>();
+ invalidCompactionCount = new AtomicInteger();
+ validCompactionCount = new AtomicInteger();
+ numReadMessages = new AtomicInteger();
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "{Topic:%s, firstId:%s, lastId:%s, cache.size:%d, "
+ + "invalidCompactionCount:%d,
validCompactionCount:%d, numReadMessages:%d}",
+ topic,
+ firstId != null ? firstId.toString() : "",
+ lastId != null ? lastId.toString() : "",
+ cache.size(),
+ invalidCompactionCount.get(),
+ validCompactionCount.get(),
+ numReadMessages.get());
+ }
+ }
+
+
+ private <T> CompletableFuture<PhaseOneResult> phaseOne(Reader<T> reader,
TopicCompactionStrategy strategy) {
+ CompletableFuture<PhaseOneResult> promise = new CompletableFuture<>();
+ PhaseOneResult<T> result = new PhaseOneResult(reader.getTopic());
+
+ ((CompactionReaderImpl<T>) reader).getLastMessageIdAsync()
+ .thenAccept(lastMessageId -> {
+ log.info("Commencing phase one of compaction for {},
reading to {}",
+ reader.getTopic(), lastMessageId);
+ result.lastId = copyMessageId(lastMessageId);
+ phaseOneLoop(reader, promise, result, strategy);
+ }).exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
+ });
+
+ return promise;
+
+ }
+
+ private static MessageId copyMessageId(MessageId msgId) {
+ if (msgId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl tempId = (BatchMessageIdImpl) msgId;
+ return new BatchMessageIdImpl(tempId);
+ } else if (msgId instanceof MessageIdImpl) {
+ MessageIdImpl tempId = (MessageIdImpl) msgId;
+ return new MessageIdImpl(tempId.getLedgerId(), tempId.getEntryId(),
+ tempId.getPartitionIndex());
+ } else {
+ throw new IllegalStateException("Unknown lastMessageId type");
+ }
+ }
+
+ private <T> void phaseOneLoop(Reader<T> reader,
CompletableFuture<PhaseOneResult> promise,
+ PhaseOneResult<T> result,
TopicCompactionStrategy strategy) {
Review Comment:
```suggestion
private <T> void phaseOneLoop(Reader<T> reader,
CompletableFuture<PhaseOneResult> promise,
PhaseOneResult<T> result,
TopicCompactionStrategy<T> strategy) {
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.CompactionReaderImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction will go through the topic in two passes. The first pass
+ * selects valid message(defined in the TopicCompactionStrategy.isValid())
+ * for each key in the topic. Then, the second pass writes these values
+ * to a ledger.
+ *
+ * <p>As the first pass caches the entire message(not just offset) for each
key into a map,
+ * this compaction could be memory intensive if the message payload is large.
+ */
+public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
+ private static final Logger log =
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
+ private static final int MAX_OUTSTANDING = 500;
+ private final Duration phaseOneLoopReadTimeout;
+ private final RawBatchMessageContainerImpl batchMessageContainer;
+
+ public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler,
+ int maxNumMessagesInBatch) {
+ super(conf, pulsar, bk, scheduler);
+ batchMessageContainer = new
RawBatchMessageContainerImpl(maxNumMessagesInBatch);
+ phaseOneLoopReadTimeout =
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+ }
+
+ public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler) {
+ this(conf, pulsar, bk, scheduler, -1);
+ }
+
+ public CompletableFuture<Long> compact(String topic) {
+ throw new UnsupportedOperationException();
+ }
+
+
+ public <T> CompletableFuture<Long> compact(String topic,
+ TopicCompactionStrategy<T>
strategy) {
+ return compact(topic, strategy, null);
+ }
+
+ public <T> CompletableFuture<Long> compact(String topic,
+ TopicCompactionStrategy<T>
strategy,
+ CryptoKeyReader
cryptoKeyReader) {
+ CompletableFuture<Consumer<T>> consumerFuture = new
CompletableFuture<>();
+ if (cryptoKeyReader != null) {
+ batchMessageContainer.setCryptoKeyReader(cryptoKeyReader);
+ }
+ CompactionReaderImpl reader = CompactionReaderImpl.create(
+ (PulsarClientImpl) pulsar, strategy.getSchema(), topic,
consumerFuture, cryptoKeyReader);
+
+ return consumerFuture.thenComposeAsync(__ ->
compactAndCloseReader(reader, strategy), scheduler);
+ }
+
+ <T> CompletableFuture<Long> doCompaction(Reader<T> reader,
TopicCompactionStrategy strategy) {
+
+ if (!(reader instanceof CompactionReaderImpl<T>)) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("reader has to be
DelayedAckReaderImpl"));
+ }
+ return reader.hasMessageAvailableAsync()
+ .thenCompose(available -> {
+ if (available) {
+ return phaseOne(reader, strategy)
+ .thenCompose((result) -> phaseTwo(result,
reader, bk));
+ } else {
+ log.info("Skip compaction of the empty topic {}",
reader.getTopic());
+ return CompletableFuture.completedFuture(-1L);
+ }
+ });
+ }
+
+ <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader,
TopicCompactionStrategy strategy) {
+ CompletableFuture<Long> promise = new CompletableFuture<>();
+ mxBean.addCompactionStartOp(reader.getTopic());
+ doCompaction(reader, strategy).whenComplete(
+ (ledgerId, exception) -> {
+ log.info("Completed doCompaction ledgerId:{}", ledgerId);
+ reader.closeAsync().whenComplete((v, exception2) -> {
+ if (exception2 != null) {
+ log.warn("Error closing reader handle {},
ignoring", reader, exception2);
+ }
+ if (exception != null) {
+ // complete with original exception
+ mxBean.addCompactionEndOp(reader.getTopic(),
false);
+ promise.completeExceptionally(exception);
+ } else {
+ mxBean.addCompactionEndOp(reader.getTopic(), true);
+ promise.complete(ledgerId);
+ }
+ });
+ });
+ return promise;
+ }
+
+ private <T> boolean doCompactMessage(Message<T> msg, PhaseOneResult<T>
result, TopicCompactionStrategy strategy) {
Review Comment:
```suggestion
private <T> boolean doCompactMessage(Message<T> msg, PhaseOneResult<T>
result, TopicCompactionStrategy<T> strategy) {
```
##########
pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * Defines a strategy to compact messages in a topic.
+ * This strategy can be passed to Topic Compactor and Table View to select
messages in a specific way.
+ *
+ * Examples:
+ *
+ * TopicCompactionStrategy strategy = new MyTopicCompactionStrategy();
+ *
+ * // Run topic compaction by the compaction strategy.
+ * // While compacting messages for each key,
+ * // it will choose messages only if TopicCompactionStrategy.valid(prev,
cur) returns true.
+ * StrategicTwoPhaseCompactor compactor = new StrategicTwoPhaseCompactor(...);
+ * compactor.compact(topic, strategy);
+ *
+ * // Run table view by the compaction strategy.
+ * // While updating messages in the table view <key,value> map,
+ * // it will choose messages only if TopicCompactionStrategy.valid(prev,
cur) returns true.
+ * TableView tableView =
pulsar.getClient().newTableViewBuilder(strategy.getSchema())
+ * .topic(topic)
+ * .loadConf(Map.of(
+ * "topicCompactionStrategy",
strategy.getClass().getCanonicalName()))
+ * .create();
+ */
+public interface TopicCompactionStrategy<T> {
+
+ /**
+ * Returns the schema object for this strategy.
+ * @return
+ */
+ Schema<T> getSchema();
+ /**
+ * Tests if the current message is valid compared to the previous message
for the same key.
+ *
+ * @param prev previous message
+ * @param cur current message
+ * @return True if the prev to the cur message transition is valid.
Otherwise, false.
+ */
+ boolean isValid(T prev, T cur);
+
+
+ static TopicCompactionStrategy load(String topicCompactionStrategy) {
Review Comment:
```suggestion
static <T> TopicCompactionStrategy<T> load(String
topicCompactionStrategy) {
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.CommandAck;
+
+/**
+ * An extended ReaderImpl used for StrategicTwoPhaseCompactor.
+ * The compaction consumer subscription is durable and consumes compacted
messages from the earliest position.
+ * It does not acknowledge the message after each read. (needs to call
acknowledgeCumulativeAsync to ack messages.)
+ */
+@Slf4j
+public class CompactionReaderImpl<T> extends ReaderImpl<T> {
+
+ ConsumerBase<T> consumer;
+
+ ReaderConfigurationData<T> readerConfiguration;
+ private CompactionReaderImpl(PulsarClientImpl client,
ReaderConfigurationData<T> readerConfiguration,
+ ExecutorProvider executorProvider,
CompletableFuture<Consumer<T>> consumerFuture,
+ Schema<T> schema) {
+ super(client, readerConfiguration, executorProvider, consumerFuture,
schema);
+ this.readerConfiguration = readerConfiguration;
+ this.consumer = getConsumer();
+ }
+
+ public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client,
Schema<T> schema, String topic,
+
CompletableFuture<Consumer<T>> consumerFuture,
+ CryptoKeyReader
cryptoKeyReader) {
+ ReaderConfigurationData conf = new ReaderConfigurationData<>();
Review Comment:
Add generics.
```suggestion
ReaderConfigurationData<T> conf = new ReaderConfigurationData<>();
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.CommandAck;
+
+/**
+ * An extended ReaderImpl used for StrategicTwoPhaseCompactor.
+ * The compaction consumer subscription is durable and consumes compacted
messages from the earliest position.
+ * It does not acknowledge the message after each read. (needs to call
acknowledgeCumulativeAsync to ack messages.)
+ */
+@Slf4j
+public class CompactionReaderImpl<T> extends ReaderImpl<T> {
+
+ ConsumerBase<T> consumer;
+
+ ReaderConfigurationData<T> readerConfiguration;
+ private CompactionReaderImpl(PulsarClientImpl client,
ReaderConfigurationData<T> readerConfiguration,
+ ExecutorProvider executorProvider,
CompletableFuture<Consumer<T>> consumerFuture,
+ Schema<T> schema) {
+ super(client, readerConfiguration, executorProvider, consumerFuture,
schema);
+ this.readerConfiguration = readerConfiguration;
+ this.consumer = getConsumer();
+ }
+
+ public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client,
Schema<T> schema, String topic,
+
CompletableFuture<Consumer<T>> consumerFuture,
+ CryptoKeyReader
cryptoKeyReader) {
+ ReaderConfigurationData conf = new ReaderConfigurationData<>();
+ conf.setTopicName(topic);
+ conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
+ conf.setStartMessageId(MessageId.earliest);
+ conf.setStartMessageFromRollbackDurationInSec(0);
+ conf.setReadCompacted(true);
+ conf.setSubscriptionMode(SubscriptionMode.Durable);
+
conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+ conf.setCryptoKeyReader(cryptoKeyReader);
+ return new CompactionReaderImpl(client, conf,
client.externalExecutorProvider(), consumerFuture, schema);
+ }
+
+
+ @Override
+ public Message<T> readNext() throws PulsarClientException {
+ return consumer.receive();
+ }
+
+ @Override
+ public Message<T> readNext(int timeout, TimeUnit unit) throws
PulsarClientException {
+ return consumer.receive(timeout, unit);
+ }
+
+ @Override
+ public CompletableFuture<Message<T>> readNextAsync() {
+ CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
+ return receiveFuture;
Review Comment:
```suggestion
return consumer.receiveAsync();
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.CommandAck;
+
+/**
+ * An extended ReaderImpl used for StrategicTwoPhaseCompactor.
+ * The compaction consumer subscription is durable and consumes compacted
messages from the earliest position.
+ * It does not acknowledge the message after each read. (needs to call
acknowledgeCumulativeAsync to ack messages.)
+ */
+@Slf4j
+public class CompactionReaderImpl<T> extends ReaderImpl<T> {
+
+ ConsumerBase<T> consumer;
+
+ ReaderConfigurationData<T> readerConfiguration;
+ private CompactionReaderImpl(PulsarClientImpl client,
ReaderConfigurationData<T> readerConfiguration,
+ ExecutorProvider executorProvider,
CompletableFuture<Consumer<T>> consumerFuture,
+ Schema<T> schema) {
+ super(client, readerConfiguration, executorProvider, consumerFuture,
schema);
+ this.readerConfiguration = readerConfiguration;
+ this.consumer = getConsumer();
+ }
+
+ public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client,
Schema<T> schema, String topic,
+
CompletableFuture<Consumer<T>> consumerFuture,
+ CryptoKeyReader
cryptoKeyReader) {
+ ReaderConfigurationData conf = new ReaderConfigurationData<>();
+ conf.setTopicName(topic);
+ conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
+ conf.setStartMessageId(MessageId.earliest);
+ conf.setStartMessageFromRollbackDurationInSec(0);
+ conf.setReadCompacted(true);
+ conf.setSubscriptionMode(SubscriptionMode.Durable);
+
conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+ conf.setCryptoKeyReader(cryptoKeyReader);
+ return new CompactionReaderImpl(client, conf,
client.externalExecutorProvider(), consumerFuture, schema);
+ }
+
+
+ @Override
+ public Message<T> readNext() throws PulsarClientException {
+ return consumer.receive();
+ }
+
+ @Override
+ public Message<T> readNext(int timeout, TimeUnit unit) throws
PulsarClientException {
+ return consumer.receive(timeout, unit);
+ }
+
+ @Override
+ public CompletableFuture<Message<T>> readNextAsync() {
+ CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
+ return receiveFuture;
+ }
+
+ public CompletableFuture<MessageId> getLastMessageIdAsync() {
+ return consumer.getLastMessageIdAsync();
+ }
+
+ public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId
messageId, Map<String, Long> properties) {
+ return consumer.doAcknowledgeWithTxn(messageId,
CommandAck.AckType.Cumulative, properties, null);
Review Comment:
Why do we use `doAcknowledgeWithTxn`? Can we use `doAcknowledge` instead?
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class CompactionReaderImplTest extends MockedPulsarServiceBaseTest {
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.internalSetup();
+ admin.clusters().createCluster("test",
+
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ admin.tenants().createTenant("my-property",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("my-property/my-ns",
Sets.newHashSet("test"));
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void test() throws Exception {
+
+ String topic = "persistent://my-property/my-ns/my-compact-topic";
+ int numKeys = 5;
+ @Cleanup
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+ for (int i = 0; i < numKeys; i++) {
+ producer.newMessage().key("key:" + i).value("value" + i).send();
+ }
+
+ @Cleanup
+ CompactionReaderImpl<String> reader = CompactionReaderImpl
+ .create((PulsarClientImpl) pulsarClient, Schema.STRING, topic,
new CompletableFuture(), null);
+
+ ConsumerBase consumerBase = spy(reader.getConsumer());
+ org.apache.commons.lang3.reflect.FieldUtils.writeDeclaredField(
Review Comment:
nit: Remove the full package name.
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.CompactionReaderImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction will go through the topic in two passes. The first pass
+ * selects valid message(defined in the TopicCompactionStrategy.isValid())
+ * for each key in the topic. Then, the second pass writes these values
+ * to a ledger.
+ *
+ * <p>As the first pass caches the entire message(not just offset) for each
key into a map,
+ * this compaction could be memory intensive if the message payload is large.
+ */
+public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
+ private static final Logger log =
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
+ private static final int MAX_OUTSTANDING = 500;
+ private final Duration phaseOneLoopReadTimeout;
+ private final RawBatchMessageContainerImpl batchMessageContainer;
+
+ public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler,
+ int maxNumMessagesInBatch) {
+ super(conf, pulsar, bk, scheduler);
+ batchMessageContainer = new
RawBatchMessageContainerImpl(maxNumMessagesInBatch);
+ phaseOneLoopReadTimeout =
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+ }
+
+ public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler) {
+ this(conf, pulsar, bk, scheduler, -1);
+ }
+
+ public CompletableFuture<Long> compact(String topic) {
+ throw new UnsupportedOperationException();
+ }
+
+
+ public <T> CompletableFuture<Long> compact(String topic,
+ TopicCompactionStrategy<T>
strategy) {
+ return compact(topic, strategy, null);
+ }
+
+ public <T> CompletableFuture<Long> compact(String topic,
+ TopicCompactionStrategy<T>
strategy,
+ CryptoKeyReader
cryptoKeyReader) {
+ CompletableFuture<Consumer<T>> consumerFuture = new
CompletableFuture<>();
+ if (cryptoKeyReader != null) {
+ batchMessageContainer.setCryptoKeyReader(cryptoKeyReader);
+ }
+ CompactionReaderImpl reader = CompactionReaderImpl.create(
+ (PulsarClientImpl) pulsar, strategy.getSchema(), topic,
consumerFuture, cryptoKeyReader);
+
+ return consumerFuture.thenComposeAsync(__ ->
compactAndCloseReader(reader, strategy), scheduler);
+ }
+
+ <T> CompletableFuture<Long> doCompaction(Reader<T> reader,
TopicCompactionStrategy strategy) {
+
+ if (!(reader instanceof CompactionReaderImpl<T>)) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("reader has to be
DelayedAckReaderImpl"));
+ }
+ return reader.hasMessageAvailableAsync()
+ .thenCompose(available -> {
+ if (available) {
+ return phaseOne(reader, strategy)
+ .thenCompose((result) -> phaseTwo(result,
reader, bk));
+ } else {
+ log.info("Skip compaction of the empty topic {}",
reader.getTopic());
+ return CompletableFuture.completedFuture(-1L);
+ }
+ });
+ }
+
+ <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader,
TopicCompactionStrategy strategy) {
+ CompletableFuture<Long> promise = new CompletableFuture<>();
+ mxBean.addCompactionStartOp(reader.getTopic());
+ doCompaction(reader, strategy).whenComplete(
+ (ledgerId, exception) -> {
+ log.info("Completed doCompaction ledgerId:{}", ledgerId);
+ reader.closeAsync().whenComplete((v, exception2) -> {
+ if (exception2 != null) {
+ log.warn("Error closing reader handle {},
ignoring", reader, exception2);
+ }
+ if (exception != null) {
+ // complete with original exception
+ mxBean.addCompactionEndOp(reader.getTopic(),
false);
+ promise.completeExceptionally(exception);
+ } else {
+ mxBean.addCompactionEndOp(reader.getTopic(), true);
+ promise.complete(ledgerId);
+ }
+ });
+ });
+ return promise;
+ }
+
+ private <T> boolean doCompactMessage(Message<T> msg, PhaseOneResult<T>
result, TopicCompactionStrategy strategy) {
+ Map<String, Message<T>> cache = result.cache;
+ String key = msg.getKey();
+
+ if (key == null) {
+ msg.release();
+ return true;
+ }
+ T val = msg.getValue();
+ Message<T> prev = cache.get(key);
+ T prevVal = prev == null ? null : prev.getValue();
+
+ if (strategy.isValid(prevVal, val)) {
+ if (val != null && msg.size() > 0) {
+ cache.remove(key); // to reorder
+ cache.put(key, msg);
+ } else {
+ cache.remove(key);
+ msg.release();
+ }
+
+ if (prev != null) {
+ prev.release();
+ }
+
+ result.validCompactionCount.incrementAndGet();
+ return true;
+ } else {
+ msg.release();
+ result.invalidCompactionCount.incrementAndGet();
+ return false;
+ }
+
+ }
+
+ private static class PhaseOneResult<T> {
+ MessageId firstId;
+ //MessageId to; // last undeleted messageId
Review Comment:
This field is not used?
##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.CommandAck;
+
+/**
+ * An extended ReaderImpl used for StrategicTwoPhaseCompactor.
+ * The compaction consumer subscription is durable and consumes compacted
messages from the earliest position.
+ * It does not acknowledge the message after each read. (needs to call
acknowledgeCumulativeAsync to ack messages.)
+ */
+@Slf4j
+public class CompactionReaderImpl<T> extends ReaderImpl<T> {
+
+ ConsumerBase<T> consumer;
+
+ ReaderConfigurationData<T> readerConfiguration;
+ private CompactionReaderImpl(PulsarClientImpl client,
ReaderConfigurationData<T> readerConfiguration,
+ ExecutorProvider executorProvider,
CompletableFuture<Consumer<T>> consumerFuture,
+ Schema<T> schema) {
+ super(client, readerConfiguration, executorProvider, consumerFuture,
schema);
+ this.readerConfiguration = readerConfiguration;
+ this.consumer = getConsumer();
+ }
+
+ public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client,
Schema<T> schema, String topic,
+
CompletableFuture<Consumer<T>> consumerFuture,
+ CryptoKeyReader
cryptoKeyReader) {
+ ReaderConfigurationData conf = new ReaderConfigurationData<>();
+ conf.setTopicName(topic);
+ conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
+ conf.setStartMessageId(MessageId.earliest);
+ conf.setStartMessageFromRollbackDurationInSec(0);
+ conf.setReadCompacted(true);
+ conf.setSubscriptionMode(SubscriptionMode.Durable);
+
conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+ conf.setCryptoKeyReader(cryptoKeyReader);
+ return new CompactionReaderImpl(client, conf,
client.externalExecutorProvider(), consumerFuture, schema);
Review Comment:
```suggestion
return new CompactionReaderImpl<>(client, conf,
client.externalExecutorProvider(), consumerFuture, schema);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]