heesung-sn commented on code in PR #18195: URL: https://github.com/apache/pulsar/pull/18195#discussion_r1014537948
########## pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionMode; +import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.api.proto.CommandAck; + +/** + * An extended ReaderImpl used for StrategicTwoPhaseCompactor. + * The compaction consumer subscription is durable and consumes compacted messages from the earliest position. + * It does not acknowledge the message after each read. (needs to call acknowledgeCumulativeAsync to ack messages.) + */ +@Slf4j +public class CompactionReaderImpl<T> extends ReaderImpl<T> { + + ConsumerBase<T> consumer; + + ReaderConfigurationData<T> readerConfiguration; + private CompactionReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration, + ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture, + Schema<T> schema) { + super(client, readerConfiguration, executorProvider, consumerFuture, schema); + this.readerConfiguration = readerConfiguration; + this.consumer = getConsumer(); + } + + public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, Schema<T> schema, String topic, + CompletableFuture<Consumer<T>> consumerFuture, + CryptoKeyReader cryptoKeyReader) { + ReaderConfigurationData conf = new ReaderConfigurationData<>(); Review Comment: updated. ########## pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionMode; +import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.api.proto.CommandAck; + +/** + * An extended ReaderImpl used for StrategicTwoPhaseCompactor. + * The compaction consumer subscription is durable and consumes compacted messages from the earliest position. + * It does not acknowledge the message after each read. (needs to call acknowledgeCumulativeAsync to ack messages.) + */ +@Slf4j +public class CompactionReaderImpl<T> extends ReaderImpl<T> { + + ConsumerBase<T> consumer; + + ReaderConfigurationData<T> readerConfiguration; + private CompactionReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration, + ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture, + Schema<T> schema) { + super(client, readerConfiguration, executorProvider, consumerFuture, schema); + this.readerConfiguration = readerConfiguration; + this.consumer = getConsumer(); + } + + public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, Schema<T> schema, String topic, + CompletableFuture<Consumer<T>> consumerFuture, + CryptoKeyReader cryptoKeyReader) { + ReaderConfigurationData conf = new ReaderConfigurationData<>(); + conf.setTopicName(topic); + conf.setSubscriptionName(COMPACTION_SUBSCRIPTION); + conf.setStartMessageId(MessageId.earliest); + conf.setStartMessageFromRollbackDurationInSec(0); + conf.setReadCompacted(true); + conf.setSubscriptionMode(SubscriptionMode.Durable); + conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest); + conf.setCryptoKeyReader(cryptoKeyReader); + return new CompactionReaderImpl(client, conf, client.externalExecutorProvider(), consumerFuture, schema); Review Comment: updated. ########## pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionMode; +import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.api.proto.CommandAck; + +/** + * An extended ReaderImpl used for StrategicTwoPhaseCompactor. + * The compaction consumer subscription is durable and consumes compacted messages from the earliest position. + * It does not acknowledge the message after each read. (needs to call acknowledgeCumulativeAsync to ack messages.) + */ +@Slf4j +public class CompactionReaderImpl<T> extends ReaderImpl<T> { + + ConsumerBase<T> consumer; + + ReaderConfigurationData<T> readerConfiguration; + private CompactionReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration, + ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture, + Schema<T> schema) { + super(client, readerConfiguration, executorProvider, consumerFuture, schema); + this.readerConfiguration = readerConfiguration; + this.consumer = getConsumer(); + } + + public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, Schema<T> schema, String topic, + CompletableFuture<Consumer<T>> consumerFuture, + CryptoKeyReader cryptoKeyReader) { + ReaderConfigurationData conf = new ReaderConfigurationData<>(); + conf.setTopicName(topic); + conf.setSubscriptionName(COMPACTION_SUBSCRIPTION); + conf.setStartMessageId(MessageId.earliest); + conf.setStartMessageFromRollbackDurationInSec(0); + conf.setReadCompacted(true); + conf.setSubscriptionMode(SubscriptionMode.Durable); + conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest); + conf.setCryptoKeyReader(cryptoKeyReader); + return new CompactionReaderImpl(client, conf, client.externalExecutorProvider(), consumerFuture, schema); + } + + + @Override + public Message<T> readNext() throws PulsarClientException { + return consumer.receive(); + } + + @Override + public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException { + return consumer.receive(timeout, unit); + } + + @Override + public CompletableFuture<Message<T>> readNextAsync() { + CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync(); + return receiveFuture; Review Comment: updated. ########## pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java: ########## @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import io.netty.buffer.ByteBuf; +import java.time.Duration; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.CompactionReaderImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl; +import org.apache.pulsar.common.topics.TopicCompactionStrategy; +import org.apache.pulsar.common.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Compaction will go through the topic in two passes. The first pass + * selects valid message(defined in the TopicCompactionStrategy.isValid()) + * for each key in the topic. Then, the second pass writes these values + * to a ledger. + * + * <p>As the first pass caches the entire message(not just offset) for each key into a map, + * this compaction could be memory intensive if the message payload is large. + */ +public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor { + private static final Logger log = LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class); + private static final int MAX_OUTSTANDING = 500; + private final Duration phaseOneLoopReadTimeout; + private final RawBatchMessageContainerImpl batchMessageContainer; + + public StrategicTwoPhaseCompactor(ServiceConfiguration conf, + PulsarClient pulsar, + BookKeeper bk, + ScheduledExecutorService scheduler, + int maxNumMessagesInBatch) { + super(conf, pulsar, bk, scheduler); + batchMessageContainer = new RawBatchMessageContainerImpl(maxNumMessagesInBatch); + phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds()); + } + + public StrategicTwoPhaseCompactor(ServiceConfiguration conf, + PulsarClient pulsar, + BookKeeper bk, + ScheduledExecutorService scheduler) { + this(conf, pulsar, bk, scheduler, -1); + } + + public CompletableFuture<Long> compact(String topic) { + throw new UnsupportedOperationException(); + } + + + public <T> CompletableFuture<Long> compact(String topic, + TopicCompactionStrategy<T> strategy) { + return compact(topic, strategy, null); + } + + public <T> CompletableFuture<Long> compact(String topic, + TopicCompactionStrategy<T> strategy, + CryptoKeyReader cryptoKeyReader) { + CompletableFuture<Consumer<T>> consumerFuture = new CompletableFuture<>(); + if (cryptoKeyReader != null) { + batchMessageContainer.setCryptoKeyReader(cryptoKeyReader); + } + CompactionReaderImpl reader = CompactionReaderImpl.create( + (PulsarClientImpl) pulsar, strategy.getSchema(), topic, consumerFuture, cryptoKeyReader); + + return consumerFuture.thenComposeAsync(__ -> compactAndCloseReader(reader, strategy), scheduler); + } + + <T> CompletableFuture<Long> doCompaction(Reader<T> reader, TopicCompactionStrategy strategy) { + + if (!(reader instanceof CompactionReaderImpl<T>)) { + return CompletableFuture.failedFuture( + new IllegalStateException("reader has to be DelayedAckReaderImpl")); + } + return reader.hasMessageAvailableAsync() + .thenCompose(available -> { + if (available) { + return phaseOne(reader, strategy) + .thenCompose((result) -> phaseTwo(result, reader, bk)); + } else { + log.info("Skip compaction of the empty topic {}", reader.getTopic()); + return CompletableFuture.completedFuture(-1L); + } + }); + } + + <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader, TopicCompactionStrategy strategy) { + CompletableFuture<Long> promise = new CompletableFuture<>(); + mxBean.addCompactionStartOp(reader.getTopic()); + doCompaction(reader, strategy).whenComplete( + (ledgerId, exception) -> { + log.info("Completed doCompaction ledgerId:{}", ledgerId); + reader.closeAsync().whenComplete((v, exception2) -> { + if (exception2 != null) { + log.warn("Error closing reader handle {}, ignoring", reader, exception2); + } + if (exception != null) { + // complete with original exception + mxBean.addCompactionEndOp(reader.getTopic(), false); + promise.completeExceptionally(exception); + } else { + mxBean.addCompactionEndOp(reader.getTopic(), true); + promise.complete(ledgerId); + } + }); + }); + return promise; + } + + private <T> boolean doCompactMessage(Message<T> msg, PhaseOneResult<T> result, TopicCompactionStrategy strategy) { + Map<String, Message<T>> cache = result.cache; + String key = msg.getKey(); + + if (key == null) { + msg.release(); + return true; + } + T val = msg.getValue(); + Message<T> prev = cache.get(key); + T prevVal = prev == null ? null : prev.getValue(); + + if (strategy.isValid(prevVal, val)) { + if (val != null && msg.size() > 0) { + cache.remove(key); // to reorder + cache.put(key, msg); + } else { + cache.remove(key); + msg.release(); + } + + if (prev != null) { + prev.release(); + } + + result.validCompactionCount.incrementAndGet(); + return true; + } else { + msg.release(); + result.invalidCompactionCount.incrementAndGet(); + return false; + } + + } + + private static class PhaseOneResult<T> { + MessageId firstId; + //MessageId to; // last undeleted messageId Review Comment: not used. removed. ########## pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java: ########## @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import io.netty.buffer.ByteBuf; +import java.time.Duration; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.CompactionReaderImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl; +import org.apache.pulsar.common.topics.TopicCompactionStrategy; +import org.apache.pulsar.common.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Compaction will go through the topic in two passes. The first pass + * selects valid message(defined in the TopicCompactionStrategy.isValid()) + * for each key in the topic. Then, the second pass writes these values + * to a ledger. + * + * <p>As the first pass caches the entire message(not just offset) for each key into a map, + * this compaction could be memory intensive if the message payload is large. + */ +public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor { + private static final Logger log = LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class); + private static final int MAX_OUTSTANDING = 500; + private final Duration phaseOneLoopReadTimeout; + private final RawBatchMessageContainerImpl batchMessageContainer; + + public StrategicTwoPhaseCompactor(ServiceConfiguration conf, + PulsarClient pulsar, + BookKeeper bk, + ScheduledExecutorService scheduler, + int maxNumMessagesInBatch) { + super(conf, pulsar, bk, scheduler); + batchMessageContainer = new RawBatchMessageContainerImpl(maxNumMessagesInBatch); + phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds()); + } + + public StrategicTwoPhaseCompactor(ServiceConfiguration conf, + PulsarClient pulsar, + BookKeeper bk, + ScheduledExecutorService scheduler) { + this(conf, pulsar, bk, scheduler, -1); + } + + public CompletableFuture<Long> compact(String topic) { + throw new UnsupportedOperationException(); + } + + + public <T> CompletableFuture<Long> compact(String topic, + TopicCompactionStrategy<T> strategy) { + return compact(topic, strategy, null); + } + + public <T> CompletableFuture<Long> compact(String topic, + TopicCompactionStrategy<T> strategy, + CryptoKeyReader cryptoKeyReader) { + CompletableFuture<Consumer<T>> consumerFuture = new CompletableFuture<>(); + if (cryptoKeyReader != null) { + batchMessageContainer.setCryptoKeyReader(cryptoKeyReader); + } + CompactionReaderImpl reader = CompactionReaderImpl.create( + (PulsarClientImpl) pulsar, strategy.getSchema(), topic, consumerFuture, cryptoKeyReader); + + return consumerFuture.thenComposeAsync(__ -> compactAndCloseReader(reader, strategy), scheduler); + } + + <T> CompletableFuture<Long> doCompaction(Reader<T> reader, TopicCompactionStrategy strategy) { + + if (!(reader instanceof CompactionReaderImpl<T>)) { + return CompletableFuture.failedFuture( + new IllegalStateException("reader has to be DelayedAckReaderImpl")); + } + return reader.hasMessageAvailableAsync() + .thenCompose(available -> { + if (available) { + return phaseOne(reader, strategy) + .thenCompose((result) -> phaseTwo(result, reader, bk)); + } else { + log.info("Skip compaction of the empty topic {}", reader.getTopic()); + return CompletableFuture.completedFuture(-1L); + } + }); + } + + <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader, TopicCompactionStrategy strategy) { + CompletableFuture<Long> promise = new CompletableFuture<>(); + mxBean.addCompactionStartOp(reader.getTopic()); + doCompaction(reader, strategy).whenComplete( + (ledgerId, exception) -> { + log.info("Completed doCompaction ledgerId:{}", ledgerId); + reader.closeAsync().whenComplete((v, exception2) -> { + if (exception2 != null) { + log.warn("Error closing reader handle {}, ignoring", reader, exception2); + } + if (exception != null) { + // complete with original exception + mxBean.addCompactionEndOp(reader.getTopic(), false); + promise.completeExceptionally(exception); + } else { + mxBean.addCompactionEndOp(reader.getTopic(), true); + promise.complete(ledgerId); + } + }); + }); + return promise; + } + + private <T> boolean doCompactMessage(Message<T> msg, PhaseOneResult<T> result, TopicCompactionStrategy strategy) { Review Comment: updated. ########## pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java: ########## @@ -164,8 +164,8 @@ public Message<T> readNext() throws PulsarClientException { @Override public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException { - Message<T> msg = consumer.receive(timeout, unit); + Message<T> msg = consumer.receive(timeout, unit); Review Comment: removed. ########## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import com.google.common.collect.Sets; +import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; +import org.apache.commons.lang.reflect.FieldUtils; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionMode; +import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-impl") +public class CompactionReaderImplTest extends MockedPulsarServiceBaseTest { + + @BeforeMethod + @Override + public void setup() throws Exception { + super.internalSetup(); + admin.clusters().createCluster("test", + ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant("my-property", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test")); + } + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void test() throws Exception { + + String topic = "persistent://my-property/my-ns/my-compact-topic"; + int numKeys = 5; + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + for (int i = 0; i < numKeys; i++) { + producer.newMessage().key("key:" + i).value("value" + i).send(); + } + + @Cleanup + CompactionReaderImpl<String> reader = CompactionReaderImpl + .create((PulsarClientImpl) pulsarClient, Schema.STRING, topic, new CompletableFuture(), null); + + ConsumerBase consumerBase = spy(reader.getConsumer()); + org.apache.commons.lang3.reflect.FieldUtils.writeDeclaredField( Review Comment: removed. ########## pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java: ########## @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import io.netty.buffer.ByteBuf; +import java.time.Duration; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.CompactionReaderImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl; +import org.apache.pulsar.common.topics.TopicCompactionStrategy; +import org.apache.pulsar.common.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Compaction will go through the topic in two passes. The first pass + * selects valid message(defined in the TopicCompactionStrategy.isValid()) + * for each key in the topic. Then, the second pass writes these values + * to a ledger. + * + * <p>As the first pass caches the entire message(not just offset) for each key into a map, + * this compaction could be memory intensive if the message payload is large. + */ +public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor { + private static final Logger log = LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class); + private static final int MAX_OUTSTANDING = 500; + private final Duration phaseOneLoopReadTimeout; + private final RawBatchMessageContainerImpl batchMessageContainer; + + public StrategicTwoPhaseCompactor(ServiceConfiguration conf, + PulsarClient pulsar, + BookKeeper bk, + ScheduledExecutorService scheduler, + int maxNumMessagesInBatch) { + super(conf, pulsar, bk, scheduler); + batchMessageContainer = new RawBatchMessageContainerImpl(maxNumMessagesInBatch); + phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds()); + } + + public StrategicTwoPhaseCompactor(ServiceConfiguration conf, + PulsarClient pulsar, + BookKeeper bk, + ScheduledExecutorService scheduler) { + this(conf, pulsar, bk, scheduler, -1); + } + + public CompletableFuture<Long> compact(String topic) { + throw new UnsupportedOperationException(); + } + + + public <T> CompletableFuture<Long> compact(String topic, + TopicCompactionStrategy<T> strategy) { + return compact(topic, strategy, null); + } + + public <T> CompletableFuture<Long> compact(String topic, + TopicCompactionStrategy<T> strategy, + CryptoKeyReader cryptoKeyReader) { + CompletableFuture<Consumer<T>> consumerFuture = new CompletableFuture<>(); + if (cryptoKeyReader != null) { + batchMessageContainer.setCryptoKeyReader(cryptoKeyReader); + } + CompactionReaderImpl reader = CompactionReaderImpl.create( + (PulsarClientImpl) pulsar, strategy.getSchema(), topic, consumerFuture, cryptoKeyReader); + + return consumerFuture.thenComposeAsync(__ -> compactAndCloseReader(reader, strategy), scheduler); + } + + <T> CompletableFuture<Long> doCompaction(Reader<T> reader, TopicCompactionStrategy strategy) { + + if (!(reader instanceof CompactionReaderImpl<T>)) { + return CompletableFuture.failedFuture( + new IllegalStateException("reader has to be DelayedAckReaderImpl")); + } + return reader.hasMessageAvailableAsync() + .thenCompose(available -> { + if (available) { + return phaseOne(reader, strategy) + .thenCompose((result) -> phaseTwo(result, reader, bk)); + } else { + log.info("Skip compaction of the empty topic {}", reader.getTopic()); + return CompletableFuture.completedFuture(-1L); + } + }); + } + + <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader, TopicCompactionStrategy strategy) { + CompletableFuture<Long> promise = new CompletableFuture<>(); + mxBean.addCompactionStartOp(reader.getTopic()); + doCompaction(reader, strategy).whenComplete( + (ledgerId, exception) -> { + log.info("Completed doCompaction ledgerId:{}", ledgerId); + reader.closeAsync().whenComplete((v, exception2) -> { + if (exception2 != null) { + log.warn("Error closing reader handle {}, ignoring", reader, exception2); + } + if (exception != null) { + // complete with original exception + mxBean.addCompactionEndOp(reader.getTopic(), false); + promise.completeExceptionally(exception); + } else { + mxBean.addCompactionEndOp(reader.getTopic(), true); + promise.complete(ledgerId); + } + }); + }); + return promise; + } + + private <T> boolean doCompactMessage(Message<T> msg, PhaseOneResult<T> result, TopicCompactionStrategy strategy) { + Map<String, Message<T>> cache = result.cache; + String key = msg.getKey(); + + if (key == null) { + msg.release(); + return true; + } + T val = msg.getValue(); + Message<T> prev = cache.get(key); + T prevVal = prev == null ? null : prev.getValue(); + + if (strategy.isValid(prevVal, val)) { + if (val != null && msg.size() > 0) { + cache.remove(key); // to reorder + cache.put(key, msg); + } else { + cache.remove(key); + msg.release(); + } + + if (prev != null) { + prev.release(); + } + + result.validCompactionCount.incrementAndGet(); + return true; + } else { + msg.release(); + result.invalidCompactionCount.incrementAndGet(); + return false; + } + + } + + private static class PhaseOneResult<T> { + MessageId firstId; + //MessageId to; // last undeleted messageId + MessageId lastId; // last read messageId + Map<String, Message<T>> cache; + + AtomicInteger invalidCompactionCount; + + AtomicInteger validCompactionCount; + + AtomicInteger numReadMessages; + + String topic; + + PhaseOneResult(String topic) { + this.topic = topic; + cache = new LinkedHashMap<>(); + invalidCompactionCount = new AtomicInteger(); + validCompactionCount = new AtomicInteger(); + numReadMessages = new AtomicInteger(); + } + + @Override + public String toString() { + return String.format( + "{Topic:%s, firstId:%s, lastId:%s, cache.size:%d, " + + "invalidCompactionCount:%d, validCompactionCount:%d, numReadMessages:%d}", + topic, + firstId != null ? firstId.toString() : "", + lastId != null ? lastId.toString() : "", + cache.size(), + invalidCompactionCount.get(), + validCompactionCount.get(), + numReadMessages.get()); + } + } + + + private <T> CompletableFuture<PhaseOneResult> phaseOne(Reader<T> reader, TopicCompactionStrategy strategy) { + CompletableFuture<PhaseOneResult> promise = new CompletableFuture<>(); + PhaseOneResult<T> result = new PhaseOneResult(reader.getTopic()); + + ((CompactionReaderImpl<T>) reader).getLastMessageIdAsync() + .thenAccept(lastMessageId -> { + log.info("Commencing phase one of compaction for {}, reading to {}", + reader.getTopic(), lastMessageId); + result.lastId = copyMessageId(lastMessageId); + phaseOneLoop(reader, promise, result, strategy); + }).exceptionally(ex -> { + promise.completeExceptionally(ex); + return null; + }); + + return promise; + + } + + private static MessageId copyMessageId(MessageId msgId) { + if (msgId instanceof BatchMessageIdImpl) { + BatchMessageIdImpl tempId = (BatchMessageIdImpl) msgId; + return new BatchMessageIdImpl(tempId); + } else if (msgId instanceof MessageIdImpl) { + MessageIdImpl tempId = (MessageIdImpl) msgId; + return new MessageIdImpl(tempId.getLedgerId(), tempId.getEntryId(), + tempId.getPartitionIndex()); + } else { + throw new IllegalStateException("Unknown lastMessageId type"); + } + } + + private <T> void phaseOneLoop(Reader<T> reader, CompletableFuture<PhaseOneResult> promise, + PhaseOneResult<T> result, TopicCompactionStrategy strategy) { Review Comment: updated ########## pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java: ########## @@ -51,13 +52,15 @@ private final List<BiConsumer<String, T>> listeners; private final ReentrantLock listenersMutex; + private TopicCompactionStrategy<T> compactionStrategy; TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) { this.conf = conf; this.data = new ConcurrentHashMap<>(); this.immutableData = Collections.unmodifiableMap(data); this.listeners = new ArrayList<>(); this.listenersMutex = new ReentrantLock(); + this.compactionStrategy = TopicCompactionStrategy.load(conf.getTopicCompactionStrategy()); Review Comment: Yes. It is covered in TopicCompactionStrategyTest and StrategicCompactionTest.testNumericOrderCompaction(). ########## pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionMode; +import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.api.proto.CommandAck; + +/** + * An extended ReaderImpl used for StrategicTwoPhaseCompactor. + * The compaction consumer subscription is durable and consumes compacted messages from the earliest position. + * It does not acknowledge the message after each read. (needs to call acknowledgeCumulativeAsync to ack messages.) + */ +@Slf4j +public class CompactionReaderImpl<T> extends ReaderImpl<T> { + + ConsumerBase<T> consumer; + + ReaderConfigurationData<T> readerConfiguration; + private CompactionReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration, + ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture, + Schema<T> schema) { + super(client, readerConfiguration, executorProvider, consumerFuture, schema); + this.readerConfiguration = readerConfiguration; + this.consumer = getConsumer(); + } + + public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, Schema<T> schema, String topic, + CompletableFuture<Consumer<T>> consumerFuture, + CryptoKeyReader cryptoKeyReader) { + ReaderConfigurationData conf = new ReaderConfigurationData<>(); + conf.setTopicName(topic); + conf.setSubscriptionName(COMPACTION_SUBSCRIPTION); + conf.setStartMessageId(MessageId.earliest); + conf.setStartMessageFromRollbackDurationInSec(0); + conf.setReadCompacted(true); + conf.setSubscriptionMode(SubscriptionMode.Durable); + conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest); + conf.setCryptoKeyReader(cryptoKeyReader); + return new CompactionReaderImpl(client, conf, client.externalExecutorProvider(), consumerFuture, schema); + } + + + @Override + public Message<T> readNext() throws PulsarClientException { + return consumer.receive(); + } + + @Override + public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException { + return consumer.receive(timeout, unit); + } + + @Override + public CompletableFuture<Message<T>> readNextAsync() { + CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync(); + return receiveFuture; + } + + public CompletableFuture<MessageId> getLastMessageIdAsync() { + return consumer.getLastMessageIdAsync(); + } + + public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties) { + return consumer.doAcknowledgeWithTxn(messageId, CommandAck.AckType.Cumulative, properties, null); Review Comment: No good reason. Replaced it with `doAcknowledge`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
