Demogorgon314 commented on code in PR #18195:
URL: https://github.com/apache/pulsar/pull/18195#discussion_r1012597347


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -51,13 +52,15 @@
 
     private final List<BiConsumer<String, T>> listeners;
     private final ReentrantLock listenersMutex;
+    private TopicCompactionStrategy<T> compactionStrategy;
 
     TableViewImpl(PulsarClientImpl client, Schema<T> schema, 
TableViewConfigurationData conf) {
         this.conf = conf;
         this.data = new ConcurrentHashMap<>();
         this.immutableData = Collections.unmodifiableMap(data);
         this.listeners = new ArrayList<>();
         this.listenersMutex = new ReentrantLock();
+        this.compactionStrategy = 
TopicCompactionStrategy.load(conf.getTopicCompactionStrategy());

Review Comment:
   Do we have a unit test to cover this new feature?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java:
##########
@@ -164,8 +164,8 @@ public Message<T> readNext() throws PulsarClientException {
 
     @Override
     public Message<T> readNext(int timeout, TimeUnit unit) throws 
PulsarClientException {
-        Message<T> msg = consumer.receive(timeout, unit);
 
+        Message<T> msg = consumer.receive(timeout, unit);

Review Comment:
   Please revert this whitespace-only change; it only moves the statement below a blank line.



##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.CompactionReaderImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction will go through the topic in two passes. The first pass
+ * selects valid message(defined in the TopicCompactionStrategy.isValid())
+ * for each key in the topic. Then, the second pass writes these values
+ * to a ledger.
+ *
+ * <p>As the first pass caches the entire message(not just offset) for each 
key into a map,
+ * this compaction could be memory intensive if the message payload is large.
+ */
+public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
+    private static final Logger log = 
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
+    private static final int MAX_OUTSTANDING = 500;
+    private final Duration phaseOneLoopReadTimeout;
+    private final RawBatchMessageContainerImpl batchMessageContainer;
+
+    public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+                                      PulsarClient pulsar,
+                                      BookKeeper bk,
+                                      ScheduledExecutorService scheduler,
+                                      int maxNumMessagesInBatch) {
+        super(conf, pulsar, bk, scheduler);
+        batchMessageContainer = new 
RawBatchMessageContainerImpl(maxNumMessagesInBatch);
+        phaseOneLoopReadTimeout = 
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+    }
+
+    public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+                                      PulsarClient pulsar,
+                                      BookKeeper bk,
+                                      ScheduledExecutorService scheduler) {
+        this(conf, pulsar, bk, scheduler, -1);
+    }
+
+    public CompletableFuture<Long> compact(String topic) {
+        throw new UnsupportedOperationException();
+    }
+
+
+    public <T> CompletableFuture<Long> compact(String topic,
+                                               TopicCompactionStrategy<T> 
strategy) {
+        return compact(topic, strategy, null);
+    }
+
+    public <T> CompletableFuture<Long> compact(String topic,
+                                               TopicCompactionStrategy<T> 
strategy,
+                                               CryptoKeyReader 
cryptoKeyReader) {
+        CompletableFuture<Consumer<T>> consumerFuture = new 
CompletableFuture<>();
+        if (cryptoKeyReader != null) {
+            batchMessageContainer.setCryptoKeyReader(cryptoKeyReader);
+        }
+        CompactionReaderImpl reader = CompactionReaderImpl.create(
+                (PulsarClientImpl) pulsar, strategy.getSchema(), topic, 
consumerFuture, cryptoKeyReader);
+
+        return consumerFuture.thenComposeAsync(__ -> 
compactAndCloseReader(reader, strategy), scheduler);
+    }
+
+    <T> CompletableFuture<Long> doCompaction(Reader<T> reader, 
TopicCompactionStrategy strategy) {
+
+        if (!(reader instanceof CompactionReaderImpl<T>)) {
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("reader has to be 
DelayedAckReaderImpl"));
+        }
+        return reader.hasMessageAvailableAsync()
+                .thenCompose(available -> {
+                    if (available) {
+                        return phaseOne(reader, strategy)
+                                .thenCompose((result) -> phaseTwo(result, 
reader, bk));
+                    } else {
+                        log.info("Skip compaction of the empty topic {}", 
reader.getTopic());
+                        return CompletableFuture.completedFuture(-1L);
+                    }
+                });
+    }
+
+    <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader, 
TopicCompactionStrategy strategy) {
+        CompletableFuture<Long> promise = new CompletableFuture<>();
+        mxBean.addCompactionStartOp(reader.getTopic());
+        doCompaction(reader, strategy).whenComplete(
+                (ledgerId, exception) -> {
+                    log.info("Completed doCompaction ledgerId:{}", ledgerId);
+                    reader.closeAsync().whenComplete((v, exception2) -> {
+                        if (exception2 != null) {
+                            log.warn("Error closing reader handle {}, 
ignoring", reader, exception2);
+                        }
+                        if (exception != null) {
+                            // complete with original exception
+                            mxBean.addCompactionEndOp(reader.getTopic(), 
false);
+                            promise.completeExceptionally(exception);
+                        } else {
+                            mxBean.addCompactionEndOp(reader.getTopic(), true);
+                            promise.complete(ledgerId);
+                        }
+                    });
+                });
+        return promise;
+    }
+
+    private <T> boolean doCompactMessage(Message<T> msg, PhaseOneResult<T> 
result, TopicCompactionStrategy strategy) {
+        Map<String, Message<T>> cache = result.cache;
+        String key = msg.getKey();
+
+        if (key == null) {
+            msg.release();
+            return true;
+        }
+        T val = msg.getValue();
+        Message<T> prev = cache.get(key);
+        T prevVal = prev == null ? null : prev.getValue();
+
+        if (strategy.isValid(prevVal, val)) {
+            if (val != null && msg.size() > 0) {
+                cache.remove(key); // to reorder
+                cache.put(key, msg);
+            } else {
+                cache.remove(key);
+                msg.release();
+            }
+
+            if (prev != null) {
+                prev.release();
+            }
+
+            result.validCompactionCount.incrementAndGet();
+            return true;
+        } else {
+            msg.release();
+            result.invalidCompactionCount.incrementAndGet();
+            return false;
+        }
+
+    }
+
+    private static class PhaseOneResult<T> {
+        MessageId firstId;
+        //MessageId to; // last undeleted messageId
+        MessageId lastId; // last read messageId
+        Map<String, Message<T>> cache;
+
+        AtomicInteger invalidCompactionCount;
+
+        AtomicInteger validCompactionCount;
+
+        AtomicInteger numReadMessages;
+
+        String topic;
+
+        PhaseOneResult(String topic) {
+            this.topic = topic;
+            cache = new LinkedHashMap<>();
+            invalidCompactionCount = new AtomicInteger();
+            validCompactionCount = new AtomicInteger();
+            numReadMessages = new AtomicInteger();
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "{Topic:%s, firstId:%s, lastId:%s, cache.size:%d, "
+                            + "invalidCompactionCount:%d, 
validCompactionCount:%d, numReadMessages:%d}",
+                    topic,
+                    firstId != null ? firstId.toString() : "",
+                    lastId != null ? lastId.toString() : "",
+                    cache.size(),
+                    invalidCompactionCount.get(),
+                    validCompactionCount.get(),
+                    numReadMessages.get());
+        }
+    }
+
+
+    private <T> CompletableFuture<PhaseOneResult> phaseOne(Reader<T> reader, 
TopicCompactionStrategy strategy) {
+        CompletableFuture<PhaseOneResult> promise = new CompletableFuture<>();
+        PhaseOneResult<T> result = new PhaseOneResult(reader.getTopic());
+
+        ((CompactionReaderImpl<T>) reader).getLastMessageIdAsync()
+                .thenAccept(lastMessageId -> {
+                    log.info("Commencing phase one of compaction for {}, 
reading to {}",
+                            reader.getTopic(), lastMessageId);
+                    result.lastId = copyMessageId(lastMessageId);
+                    phaseOneLoop(reader, promise, result, strategy);
+                }).exceptionally(ex -> {
+                    promise.completeExceptionally(ex);
+                    return null;
+                });
+
+        return promise;
+
+    }
+
+    private static MessageId copyMessageId(MessageId msgId) {
+        if (msgId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl tempId = (BatchMessageIdImpl) msgId;
+            return new BatchMessageIdImpl(tempId);
+        } else if (msgId instanceof MessageIdImpl) {
+            MessageIdImpl tempId = (MessageIdImpl) msgId;
+            return new MessageIdImpl(tempId.getLedgerId(), tempId.getEntryId(),
+                    tempId.getPartitionIndex());
+        } else {
+            throw new IllegalStateException("Unknown lastMessageId type");
+        }
+    }
+
+    private <T> void phaseOneLoop(Reader<T> reader, 
CompletableFuture<PhaseOneResult> promise,
+                                  PhaseOneResult<T> result, 
TopicCompactionStrategy strategy) {

Review Comment:
   ```suggestion
       private <T> void phaseOneLoop(Reader<T> reader, 
CompletableFuture<PhaseOneResult> promise,
                                     PhaseOneResult<T> result, 
TopicCompactionStrategy<T> strategy) {
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.CompactionReaderImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction will go through the topic in two passes. The first pass
+ * selects valid message(defined in the TopicCompactionStrategy.isValid())
+ * for each key in the topic. Then, the second pass writes these values
+ * to a ledger.
+ *
+ * <p>As the first pass caches the entire message(not just offset) for each 
key into a map,
+ * this compaction could be memory intensive if the message payload is large.
+ */
+public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
+    private static final Logger log = 
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
+    private static final int MAX_OUTSTANDING = 500;
+    private final Duration phaseOneLoopReadTimeout;
+    private final RawBatchMessageContainerImpl batchMessageContainer;
+
+    public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+                                      PulsarClient pulsar,
+                                      BookKeeper bk,
+                                      ScheduledExecutorService scheduler,
+                                      int maxNumMessagesInBatch) {
+        super(conf, pulsar, bk, scheduler);
+        batchMessageContainer = new 
RawBatchMessageContainerImpl(maxNumMessagesInBatch);
+        phaseOneLoopReadTimeout = 
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+    }
+
+    public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+                                      PulsarClient pulsar,
+                                      BookKeeper bk,
+                                      ScheduledExecutorService scheduler) {
+        this(conf, pulsar, bk, scheduler, -1);
+    }
+
+    public CompletableFuture<Long> compact(String topic) {
+        throw new UnsupportedOperationException();
+    }
+
+
+    public <T> CompletableFuture<Long> compact(String topic,
+                                               TopicCompactionStrategy<T> 
strategy) {
+        return compact(topic, strategy, null);
+    }
+
+    public <T> CompletableFuture<Long> compact(String topic,
+                                               TopicCompactionStrategy<T> 
strategy,
+                                               CryptoKeyReader 
cryptoKeyReader) {
+        CompletableFuture<Consumer<T>> consumerFuture = new 
CompletableFuture<>();
+        if (cryptoKeyReader != null) {
+            batchMessageContainer.setCryptoKeyReader(cryptoKeyReader);
+        }
+        CompactionReaderImpl reader = CompactionReaderImpl.create(
+                (PulsarClientImpl) pulsar, strategy.getSchema(), topic, 
consumerFuture, cryptoKeyReader);
+
+        return consumerFuture.thenComposeAsync(__ -> 
compactAndCloseReader(reader, strategy), scheduler);
+    }
+
+    <T> CompletableFuture<Long> doCompaction(Reader<T> reader, 
TopicCompactionStrategy strategy) {
+
+        if (!(reader instanceof CompactionReaderImpl<T>)) {
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("reader has to be 
DelayedAckReaderImpl"));
+        }
+        return reader.hasMessageAvailableAsync()
+                .thenCompose(available -> {
+                    if (available) {
+                        return phaseOne(reader, strategy)
+                                .thenCompose((result) -> phaseTwo(result, 
reader, bk));
+                    } else {
+                        log.info("Skip compaction of the empty topic {}", 
reader.getTopic());
+                        return CompletableFuture.completedFuture(-1L);
+                    }
+                });
+    }
+
+    <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader, 
TopicCompactionStrategy strategy) {
+        CompletableFuture<Long> promise = new CompletableFuture<>();
+        mxBean.addCompactionStartOp(reader.getTopic());
+        doCompaction(reader, strategy).whenComplete(
+                (ledgerId, exception) -> {
+                    log.info("Completed doCompaction ledgerId:{}", ledgerId);
+                    reader.closeAsync().whenComplete((v, exception2) -> {
+                        if (exception2 != null) {
+                            log.warn("Error closing reader handle {}, 
ignoring", reader, exception2);
+                        }
+                        if (exception != null) {
+                            // complete with original exception
+                            mxBean.addCompactionEndOp(reader.getTopic(), 
false);
+                            promise.completeExceptionally(exception);
+                        } else {
+                            mxBean.addCompactionEndOp(reader.getTopic(), true);
+                            promise.complete(ledgerId);
+                        }
+                    });
+                });
+        return promise;
+    }
+
+    private <T> boolean doCompactMessage(Message<T> msg, PhaseOneResult<T> 
result, TopicCompactionStrategy strategy) {

Review Comment:
   ```suggestion
       private <T> boolean doCompactMessage(Message<T> msg, PhaseOneResult<T> 
result, TopicCompactionStrategy<T> strategy) {
   ```



##########
pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * Defines a strategy to compact messages in a topic.
+ * This strategy can be passed to Topic Compactor and Table View to select 
messages in a specific way.
+ *
+ * Examples:
+ *
+ * TopicCompactionStrategy strategy = new MyTopicCompactionStrategy();
+ *
+ * // Run topic compaction by the compaction strategy.
+ * // While compacting messages for each key,
+ * //   it will choose messages only if TopicCompactionStrategy.valid(prev, 
cur) returns true.
+ * StrategicTwoPhaseCompactor compactor = new StrategicTwoPhaseCompactor(...);
+ * compactor.compact(topic, strategy);
+ *
+ * // Run table view by the compaction strategy.
+ * // While updating messages in the table view <key,value> map,
+ * //   it will choose messages only if TopicCompactionStrategy.valid(prev, 
cur) returns true.
+ * TableView tableView = 
pulsar.getClient().newTableViewBuilder(strategy.getSchema())
+ *                 .topic(topic)
+ *                 .loadConf(Map.of(
+ *                         "topicCompactionStrategy", 
strategy.getClass().getCanonicalName()))
+ *                 .create();
+ */
+public interface TopicCompactionStrategy<T> {
+
+    /**
+     * Returns the schema object for this strategy.
+     * @return
+     */
+    Schema<T> getSchema();
+    /**
+     * Tests if the current message is valid compared to the previous message 
for the same key.
+     *
+     * @param prev previous message
+     * @param cur current message
+     * @return True if the prev to the cur message transition is valid. 
Otherwise, false.
+     */
+    boolean isValid(T prev, T cur);
+
+
+    static TopicCompactionStrategy load(String topicCompactionStrategy) {

Review Comment:
   ```suggestion
       static <T> TopicCompactionStrategy<T> load(String 
topicCompactionStrategy) {
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.CommandAck;
+
+/**
+ *  An extended ReaderImpl used for StrategicTwoPhaseCompactor.
+ *  The compaction consumer subscription is durable and consumes compacted 
messages from the earliest position.
+ *  It does not acknowledge the message after each read. (needs to call 
acknowledgeCumulativeAsync to ack messages.)
+ */
+@Slf4j
+public class CompactionReaderImpl<T> extends ReaderImpl<T> {
+
+    ConsumerBase<T> consumer;
+
+    ReaderConfigurationData<T> readerConfiguration;
+    private CompactionReaderImpl(PulsarClientImpl client, 
ReaderConfigurationData<T> readerConfiguration,
+                                 ExecutorProvider executorProvider, 
CompletableFuture<Consumer<T>> consumerFuture,
+                                 Schema<T> schema) {
+        super(client, readerConfiguration, executorProvider, consumerFuture, 
schema);
+        this.readerConfiguration = readerConfiguration;
+        this.consumer = getConsumer();
+    }
+
+    public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, 
Schema<T> schema, String topic,
+                                                     
CompletableFuture<Consumer<T>> consumerFuture,
+                                                     CryptoKeyReader 
cryptoKeyReader) {
+        ReaderConfigurationData conf = new ReaderConfigurationData<>();

Review Comment:
   Add the generic type argument here instead of declaring a raw type.
   ```suggestion
           ReaderConfigurationData<T> conf = new ReaderConfigurationData<>();
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.CommandAck;
+
+/**
+ *  An extended ReaderImpl used for StrategicTwoPhaseCompactor.
+ *  The compaction consumer subscription is durable and consumes compacted 
messages from the earliest position.
+ *  It does not acknowledge the message after each read. (needs to call 
acknowledgeCumulativeAsync to ack messages.)
+ */
+@Slf4j
+public class CompactionReaderImpl<T> extends ReaderImpl<T> {
+
+    ConsumerBase<T> consumer;
+
+    ReaderConfigurationData<T> readerConfiguration;
+    private CompactionReaderImpl(PulsarClientImpl client, 
ReaderConfigurationData<T> readerConfiguration,
+                                 ExecutorProvider executorProvider, 
CompletableFuture<Consumer<T>> consumerFuture,
+                                 Schema<T> schema) {
+        super(client, readerConfiguration, executorProvider, consumerFuture, 
schema);
+        this.readerConfiguration = readerConfiguration;
+        this.consumer = getConsumer();
+    }
+
+    public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, 
Schema<T> schema, String topic,
+                                                     
CompletableFuture<Consumer<T>> consumerFuture,
+                                                     CryptoKeyReader 
cryptoKeyReader) {
+        ReaderConfigurationData conf = new ReaderConfigurationData<>();
+        conf.setTopicName(topic);
+        conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
+        conf.setStartMessageId(MessageId.earliest);
+        conf.setStartMessageFromRollbackDurationInSec(0);
+        conf.setReadCompacted(true);
+        conf.setSubscriptionMode(SubscriptionMode.Durable);
+        
conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return new CompactionReaderImpl(client, conf, 
client.externalExecutorProvider(), consumerFuture, schema);
+    }
+
+
+    @Override
+    public Message<T> readNext() throws PulsarClientException {
+        return consumer.receive();
+    }
+
+    @Override
+    public Message<T> readNext(int timeout, TimeUnit unit) throws 
PulsarClientException {
+        return consumer.receive(timeout, unit);
+    }
+
+    @Override
+    public CompletableFuture<Message<T>> readNextAsync() {
+        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
+        return receiveFuture;

Review Comment:
   ```suggestion
           return consumer.receiveAsync();
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.CommandAck;
+
+/**
+ *  An extended ReaderImpl used for StrategicTwoPhaseCompactor.
+ *  The compaction consumer subscription is durable and consumes compacted 
messages from the earliest position.
+ *  It does not acknowledge the message after each read. (needs to call 
acknowledgeCumulativeAsync to ack messages.)
+ */
+@Slf4j
+public class CompactionReaderImpl<T> extends ReaderImpl<T> {
+
+    ConsumerBase<T> consumer;
+
+    ReaderConfigurationData<T> readerConfiguration;
+    private CompactionReaderImpl(PulsarClientImpl client, 
ReaderConfigurationData<T> readerConfiguration,
+                                 ExecutorProvider executorProvider, 
CompletableFuture<Consumer<T>> consumerFuture,
+                                 Schema<T> schema) {
+        super(client, readerConfiguration, executorProvider, consumerFuture, 
schema);
+        this.readerConfiguration = readerConfiguration;
+        this.consumer = getConsumer();
+    }
+
+    public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, 
Schema<T> schema, String topic,
+                                                     
CompletableFuture<Consumer<T>> consumerFuture,
+                                                     CryptoKeyReader 
cryptoKeyReader) {
+        ReaderConfigurationData conf = new ReaderConfigurationData<>();
+        conf.setTopicName(topic);
+        conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
+        conf.setStartMessageId(MessageId.earliest);
+        conf.setStartMessageFromRollbackDurationInSec(0);
+        conf.setReadCompacted(true);
+        conf.setSubscriptionMode(SubscriptionMode.Durable);
+        
conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return new CompactionReaderImpl(client, conf, 
client.externalExecutorProvider(), consumerFuture, schema);
+    }
+
+
+    @Override
+    public Message<T> readNext() throws PulsarClientException {
+        return consumer.receive();
+    }
+
+    @Override
+    public Message<T> readNext(int timeout, TimeUnit unit) throws 
PulsarClientException {
+        return consumer.receive(timeout, unit);
+    }
+
+    @Override
+    public CompletableFuture<Message<T>> readNextAsync() {
+        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
+        return receiveFuture;
+    }
+
+    public CompletableFuture<MessageId> getLastMessageIdAsync() {
+        return consumer.getLastMessageIdAsync();
+    }
+
+    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId 
messageId, Map<String, Long> properties) {
+        return consumer.doAcknowledgeWithTxn(messageId, 
CommandAck.AckType.Cumulative, properties, null);

Review Comment:
   Why do we use `doAcknowledgeWithTxn`? Can we use `doAcknowledge` instead?



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class CompactionReaderImplTest extends MockedPulsarServiceBaseTest {
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+        admin.clusters().createCluster("test",
+                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("my-property",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", 
Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void test() throws Exception {
+
+        String topic = "persistent://my-property/my-ns/my-compact-topic";
+        int numKeys = 5;
+        @Cleanup
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        for (int i = 0; i < numKeys; i++) {
+            producer.newMessage().key("key:" + i).value("value" + i).send();
+        }
+
+        @Cleanup
+        CompactionReaderImpl<String> reader = CompactionReaderImpl
+                .create((PulsarClientImpl) pulsarClient, Schema.STRING, topic, 
new CompletableFuture(), null);
+
+        ConsumerBase consumerBase = spy(reader.getConsumer());
+        org.apache.commons.lang3.reflect.FieldUtils.writeDeclaredField(

Review Comment:
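   nit: Use an import for `FieldUtils` instead of the fully qualified class name. Note that this file already imports `org.apache.commons.lang.reflect.FieldUtils`, so that import would probably need to be switched to the `lang3` class.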



##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.CompactionReaderImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction will go through the topic in two passes. The first pass
+ * selects valid message(defined in the TopicCompactionStrategy.isValid())
+ * for each key in the topic. Then, the second pass writes these values
+ * to a ledger.
+ *
+ * <p>As the first pass caches the entire message(not just offset) for each 
key into a map,
+ * this compaction could be memory intensive if the message payload is large.
+ */
+public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
+    private static final Logger log = 
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
+    private static final int MAX_OUTSTANDING = 500;
+    private final Duration phaseOneLoopReadTimeout;
+    private final RawBatchMessageContainerImpl batchMessageContainer;
+
+    public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+                                      PulsarClient pulsar,
+                                      BookKeeper bk,
+                                      ScheduledExecutorService scheduler,
+                                      int maxNumMessagesInBatch) {
+        super(conf, pulsar, bk, scheduler);
+        batchMessageContainer = new 
RawBatchMessageContainerImpl(maxNumMessagesInBatch);
+        phaseOneLoopReadTimeout = 
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+    }
+
+    public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+                                      PulsarClient pulsar,
+                                      BookKeeper bk,
+                                      ScheduledExecutorService scheduler) {
+        this(conf, pulsar, bk, scheduler, -1);
+    }
+
+    public CompletableFuture<Long> compact(String topic) {
+        throw new UnsupportedOperationException();
+    }
+
+
+    public <T> CompletableFuture<Long> compact(String topic,
+                                               TopicCompactionStrategy<T> 
strategy) {
+        return compact(topic, strategy, null);
+    }
+
+    public <T> CompletableFuture<Long> compact(String topic,
+                                               TopicCompactionStrategy<T> 
strategy,
+                                               CryptoKeyReader 
cryptoKeyReader) {
+        CompletableFuture<Consumer<T>> consumerFuture = new 
CompletableFuture<>();
+        if (cryptoKeyReader != null) {
+            batchMessageContainer.setCryptoKeyReader(cryptoKeyReader);
+        }
+        CompactionReaderImpl reader = CompactionReaderImpl.create(
+                (PulsarClientImpl) pulsar, strategy.getSchema(), topic, 
consumerFuture, cryptoKeyReader);
+
+        return consumerFuture.thenComposeAsync(__ -> 
compactAndCloseReader(reader, strategy), scheduler);
+    }
+
+    <T> CompletableFuture<Long> doCompaction(Reader<T> reader, 
TopicCompactionStrategy strategy) {
+
+        if (!(reader instanceof CompactionReaderImpl<T>)) {
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("reader has to be 
DelayedAckReaderImpl"));
+        }
+        return reader.hasMessageAvailableAsync()
+                .thenCompose(available -> {
+                    if (available) {
+                        return phaseOne(reader, strategy)
+                                .thenCompose((result) -> phaseTwo(result, 
reader, bk));
+                    } else {
+                        log.info("Skip compaction of the empty topic {}", 
reader.getTopic());
+                        return CompletableFuture.completedFuture(-1L);
+                    }
+                });
+    }
+
+    <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader, 
TopicCompactionStrategy strategy) {
+        CompletableFuture<Long> promise = new CompletableFuture<>();
+        mxBean.addCompactionStartOp(reader.getTopic());
+        doCompaction(reader, strategy).whenComplete(
+                (ledgerId, exception) -> {
+                    log.info("Completed doCompaction ledgerId:{}", ledgerId);
+                    reader.closeAsync().whenComplete((v, exception2) -> {
+                        if (exception2 != null) {
+                            log.warn("Error closing reader handle {}, 
ignoring", reader, exception2);
+                        }
+                        if (exception != null) {
+                            // complete with original exception
+                            mxBean.addCompactionEndOp(reader.getTopic(), 
false);
+                            promise.completeExceptionally(exception);
+                        } else {
+                            mxBean.addCompactionEndOp(reader.getTopic(), true);
+                            promise.complete(ledgerId);
+                        }
+                    });
+                });
+        return promise;
+    }
+
+    private <T> boolean doCompactMessage(Message<T> msg, PhaseOneResult<T> 
result, TopicCompactionStrategy strategy) {
+        Map<String, Message<T>> cache = result.cache;
+        String key = msg.getKey();
+
+        if (key == null) {
+            msg.release();
+            return true;
+        }
+        T val = msg.getValue();
+        Message<T> prev = cache.get(key);
+        T prevVal = prev == null ? null : prev.getValue();
+
+        if (strategy.isValid(prevVal, val)) {
+            if (val != null && msg.size() > 0) {
+                cache.remove(key); // to reorder
+                cache.put(key, msg);
+            } else {
+                cache.remove(key);
+                msg.release();
+            }
+
+            if (prev != null) {
+                prev.release();
+            }
+
+            result.validCompactionCount.incrementAndGet();
+            return true;
+        } else {
+            msg.release();
+            result.invalidCompactionCount.incrementAndGet();
+            return false;
+        }
+
+    }
+
+    private static class PhaseOneResult<T> {
+        MessageId firstId;
+        //MessageId to; // last undeleted messageId

Review Comment:
   This commented-out `to` field does not appear to be used. Can we remove it?



##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.CommandAck;
+
+/**
+ *  An extended ReaderImpl used for StrategicTwoPhaseCompactor.
+ *  The compaction consumer subscription is durable and consumes compacted 
messages from the earliest position.
+ *  It does not acknowledge the message after each read. (needs to call 
acknowledgeCumulativeAsync to ack messages.)
+ */
+@Slf4j
+public class CompactionReaderImpl<T> extends ReaderImpl<T> {
+
+    ConsumerBase<T> consumer;
+
+    ReaderConfigurationData<T> readerConfiguration;
+    private CompactionReaderImpl(PulsarClientImpl client, 
ReaderConfigurationData<T> readerConfiguration,
+                                 ExecutorProvider executorProvider, 
CompletableFuture<Consumer<T>> consumerFuture,
+                                 Schema<T> schema) {
+        super(client, readerConfiguration, executorProvider, consumerFuture, 
schema);
+        this.readerConfiguration = readerConfiguration;
+        this.consumer = getConsumer();
+    }
+
+    public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, 
Schema<T> schema, String topic,
+                                                     
CompletableFuture<Consumer<T>> consumerFuture,
+                                                     CryptoKeyReader 
cryptoKeyReader) {
+        ReaderConfigurationData conf = new ReaderConfigurationData<>();
+        conf.setTopicName(topic);
+        conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
+        conf.setStartMessageId(MessageId.earliest);
+        conf.setStartMessageFromRollbackDurationInSec(0);
+        conf.setReadCompacted(true);
+        conf.setSubscriptionMode(SubscriptionMode.Durable);
+        
conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return new CompactionReaderImpl(client, conf, 
client.externalExecutorProvider(), consumerFuture, schema);

Review Comment:
   ```suggestion
           return new CompactionReaderImpl<>(client, conf, 
client.externalExecutorProvider(), consumerFuture, schema);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to