[GitHub] merlimat commented on a change in pull request #1044: Compact algo
merlimat commented on a change in pull request #1044: Compact algo URL: https://github.com/apache/incubator-pulsar/pull/1044#discussion_r164508229 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java ## @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.compaction; + +import com.google.common.collect.ImmutableMap; +import io.netty.buffer.ByteBuf; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.client.api.RawMessage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Compaction will go through the topic in two passes. The first pass + * selects latest offset for each key in the topic. Then the second pass + * writes these values to a ledger. + * + * The two passes are required to avoid holding the payloads of each of + * the latest values in memory, as the payload can be many orders of + * magnitude larger than a message id. 
+*/ +public class TwoPhaseCompactor extends Compactor { +private static final Logger log = LoggerFactory.getLogger(Compactor.class); +private static final int MAX_OUTSTANDING = 500; +private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger"; + +public TwoPhaseCompactor(ServiceConfiguration conf, + PulsarClient pulsar, + BookKeeper bk, + ScheduledExecutorService scheduler) { +super(conf, pulsar, bk, scheduler); +} + +@Override +protected CompletableFuture doCompaction(RawReader reader, BookKeeper bk) { +return phaseOne(reader).thenCompose( +(r) -> phaseTwo(reader, r.from, r.to, r.latestForKey, bk)); +} + +private CompletableFuture phaseOne(RawReader reader) { +Map latestForKey = new HashMap<>(); + +CompletableFuture loopPromise = new CompletableFuture<>(); +phaseOneLoop(reader, Optional.empty(), Optional.empty(), latestForKey, loopPromise); +return loopPromise; +} + +private void phaseOneLoop(RawReader reader, + Optional firstMessageId, + Optional lastMessageId, + Map latestForKey, + CompletableFuture loopPromise) { +if (loopPromise.isDone()) { +return; +} +CompletableFuture future = reader.readNextAsync(); +scheduleTimeout(future); +future.whenComplete( +(m, exception) -> { +try { +if (exception != null) { +if (exception instanceof TimeoutException +&& firstMessageId.isPresent()) { +loopPromise.complete(new PhaseOneResult(firstMessageId.get(), + lastMessageId.get(), + latestForKey)); +} else { +loopPromise.completeExceptionally(exception); +} +return; +} + +MessageId id = m.getMessageId(); +String key = extractKey(m);
[GitHub] merlimat commented on a change in pull request #1044: Compact algo
merlimat commented on a change in pull request #1044: Compact algo URL: https://github.com/apache/incubator-pulsar/pull/1044#discussion_r164042720 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java ## @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.compaction; + +import com.google.common.collect.ImmutableMap; +import io.netty.buffer.ByteBuf; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.client.api.RawMessage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Compaction will go through the topic in two passes. The first pass + * selects latest offset for each key in the topic. Then the second pass + * writes these values to a ledger. + * + * The two passes are required to avoid holding the payloads of each of + * the latest values in memory, as the payload can be many orders of + * magnitude larger than a message id. 
+*/ +public class TwoPhaseCompactor extends Compactor { +private static final Logger log = LoggerFactory.getLogger(Compactor.class); +private static final int MAX_OUTSTANDING = 500; +private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger"; + +public TwoPhaseCompactor(ServiceConfiguration conf, + PulsarClient pulsar, + BookKeeper bk, + ScheduledExecutorService scheduler) { +super(conf, pulsar, bk, scheduler); +} + +@Override +protected CompletableFuture doCompaction(RawReader reader, BookKeeper bk) { +return phaseOne(reader).thenCompose( +(r) -> phaseTwo(reader, r.from, r.to, r.latestForKey, bk)); +} + +private CompletableFuture phaseOne(RawReader reader) { +Map latestForKey = new HashMap<>(); + +CompletableFuture loopPromise = new CompletableFuture<>(); +phaseOneLoop(reader, Optional.empty(), Optional.empty(), latestForKey, loopPromise); +return loopPromise; +} + +private void phaseOneLoop(RawReader reader, + Optional firstMessageId, + Optional lastMessageId, + Map latestForKey, + CompletableFuture loopPromise) { +if (loopPromise.isDone()) { +return; +} +CompletableFuture future = reader.readNextAsync(); +scheduleTimeout(future); +future.whenComplete( +(m, exception) -> { +try { +if (exception != null) { +if (exception instanceof TimeoutException +&& firstMessageId.isPresent()) { +loopPromise.complete(new PhaseOneResult(firstMessageId.get(), + lastMessageId.get(), + latestForKey)); +} else { +loopPromise.completeExceptionally(exception); +} +return; +} + +MessageId id = m.getMessageId(); +String key = extractKey(m);