[GitHub] merlimat commented on a change in pull request #1044: Compact algo

2018-01-29 Thread GitBox
merlimat commented on a change in pull request #1044: Compact algo
URL: https://github.com/apache/incubator-pulsar/pull/1044#discussion_r164508229
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 ##
 @@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import com.google.common.collect.ImmutableMap;
+import io.netty.buffer.ByteBuf;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.client.api.RawMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction will go through the topic in two passes. The first pass
+ * selects latest offset for each key in the topic. Then the second pass
+ * writes these values to a ledger.
+ *
+ * The two passes are required to avoid holding the payloads of each of
+ * the latest values in memory, as the payload can be many orders of
+ * magnitude larger than a message id.
+*/
+public class TwoPhaseCompactor extends Compactor {
+    // Logger is keyed on this class (not the Compactor base class) so that
+    // messages from the two-phase algorithm are attributed to it.
+    private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
+    // NOTE(review): usage not shown in this hunk; presumably bounds the number of
+    // in-flight operations during compaction — confirm against phaseTwo.
+    private static final int MAX_OUTSTANDING = 500;
+    // NOTE(review): property name under which the compacted ledger is recorded
+    // (usage not shown in this hunk) — confirm.
+    private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
+
+    /**
+     * Creates a two-phase compactor. Every argument is forwarded unchanged to
+     * the {@link Compactor} base constructor.
+     */
+    public TwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar,
+                             BookKeeper bk, ScheduledExecutorService scheduler) {
+        super(conf, pulsar, bk, scheduler);
+    }
+
+    /**
+     * Chains the two compaction passes: phase one scans the topic and builds the
+     * latest-entry-per-key map, then phase two is started with that result.
+     *
+     * @param reader raw reader shared by both phases
+     * @param bk     BookKeeper client handed to phase two
+     * @return the future produced by phase two
+     */
+    @Override
+    protected CompletableFuture doCompaction(RawReader reader, BookKeeper bk) {
+        return phaseOne(reader)
+                .thenCompose(phaseOneResult -> phaseTwo(reader,
+                        phaseOneResult.from, phaseOneResult.to,
+                        phaseOneResult.latestForKey, bk));
+    }
+
+    /**
+     * Phase one: scans the topic through {@code reader}, recording for each key
+     * the id of the latest message seen. The scan itself is driven by
+     * {@code phaseOneLoop}, which completes the returned future.
+     *
+     * @param reader raw reader to scan
+     * @return a future completed with a {@link PhaseOneResult} (first id, last id,
+     *         latest-per-key map), or completed exceptionally if reading fails
+     */
+    private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
+        // Only message ids are held here, never payloads (see class comment).
+        Map<String, MessageId> latestForKey = new HashMap<>();
+        CompletableFuture<PhaseOneResult> loopPromise = new CompletableFuture<>();
+        phaseOneLoop(reader, Optional.empty(), Optional.empty(), latestForKey, loopPromise);
+        return loopPromise;
+    }
+
+private void phaseOneLoop(RawReader reader,
+  Optional firstMessageId,
+  Optional lastMessageId,
+  Map latestForKey,
+  CompletableFuture loopPromise) {
+if (loopPromise.isDone()) {
+return;
+}
+CompletableFuture future = reader.readNextAsync();
+scheduleTimeout(future);
+future.whenComplete(
+(m, exception) -> {
+try {
+if (exception != null) {
+if (exception instanceof TimeoutException
+&& firstMessageId.isPresent()) {
+loopPromise.complete(new 
PhaseOneResult(firstMessageId.get(),
+
lastMessageId.get(),
+
latestForKey));
+} else {
+loopPromise.completeExceptionally(exception);
+}
+return;
+}
+
+MessageId id = m.getMessageId();
+String key = extractKey(m);

[GitHub] merlimat commented on a change in pull request #1044: Compact algo

2018-01-25 Thread GitBox
merlimat commented on a change in pull request #1044: Compact algo
URL: https://github.com/apache/incubator-pulsar/pull/1044#discussion_r164042720
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 ##
 @@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import com.google.common.collect.ImmutableMap;
+import io.netty.buffer.ByteBuf;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.client.api.RawMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction will go through the topic in two passes. The first pass
+ * selects latest offset for each key in the topic. Then the second pass
+ * writes these values to a ledger.
+ *
+ * The two passes are required to avoid holding the payloads of each of
+ * the latest values in memory, as the payload can be many orders of
+ * magnitude larger than a message id.
+*/
+public class TwoPhaseCompactor extends Compactor {
+    // Logger is keyed on this class (not the Compactor base class) so that
+    // messages from the two-phase algorithm are attributed to it.
+    private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
+    // NOTE(review): usage not shown in this hunk; presumably bounds the number of
+    // in-flight operations during compaction — confirm against phaseTwo.
+    private static final int MAX_OUTSTANDING = 500;
+    // NOTE(review): property name under which the compacted ledger is recorded
+    // (usage not shown in this hunk) — confirm.
+    private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
+
+    /**
+     * Creates a two-phase compactor. Every argument is forwarded unchanged to
+     * the {@link Compactor} base constructor.
+     */
+    public TwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar,
+                             BookKeeper bk, ScheduledExecutorService scheduler) {
+        super(conf, pulsar, bk, scheduler);
+    }
+
+    /**
+     * Chains the two compaction passes: phase one scans the topic and builds the
+     * latest-entry-per-key map, then phase two is started with that result.
+     *
+     * @param reader raw reader shared by both phases
+     * @param bk     BookKeeper client handed to phase two
+     * @return the future produced by phase two
+     */
+    @Override
+    protected CompletableFuture doCompaction(RawReader reader, BookKeeper bk) {
+        return phaseOne(reader)
+                .thenCompose(phaseOneResult -> phaseTwo(reader,
+                        phaseOneResult.from, phaseOneResult.to,
+                        phaseOneResult.latestForKey, bk));
+    }
+
+    /**
+     * Phase one: scans the topic through {@code reader}, recording for each key
+     * the id of the latest message seen. The scan itself is driven by
+     * {@code phaseOneLoop}, which completes the returned future.
+     *
+     * @param reader raw reader to scan
+     * @return a future completed with a {@link PhaseOneResult} (first id, last id,
+     *         latest-per-key map), or completed exceptionally if reading fails
+     */
+    private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
+        // Only message ids are held here, never payloads (see class comment).
+        Map<String, MessageId> latestForKey = new HashMap<>();
+        CompletableFuture<PhaseOneResult> loopPromise = new CompletableFuture<>();
+        phaseOneLoop(reader, Optional.empty(), Optional.empty(), latestForKey, loopPromise);
+        return loopPromise;
+    }
+
+private void phaseOneLoop(RawReader reader,
+  Optional firstMessageId,
+  Optional lastMessageId,
+  Map latestForKey,
+  CompletableFuture loopPromise) {
+if (loopPromise.isDone()) {
+return;
+}
+CompletableFuture future = reader.readNextAsync();
+scheduleTimeout(future);
+future.whenComplete(
+(m, exception) -> {
+try {
+if (exception != null) {
+if (exception instanceof TimeoutException
+&& firstMessageId.isPresent()) {
+loopPromise.complete(new 
PhaseOneResult(firstMessageId.get(),
+
lastMessageId.get(),
+
latestForKey));
+} else {
+loopPromise.completeExceptionally(exception);
+}
+return;
+}
+
+MessageId id = m.getMessageId();
+String key = extractKey(m);