This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6de2652  Compaction CLI tool (#1257)
6de2652 is described below

commit 6de26521ecb952ab5e5a19a079bc773ee0dd409c
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Feb 21 20:59:26 2018 +0100

    Compaction CLI tool (#1257)
    
    * Compaction CLI tool
    
    CLI tool to run compaction on a topic. This can only be run by the
    administrator. It runs with the same configuration as the broker.
    
    * Only require broker configuration
    
    * Add some help and fix typo in bin/pulsar
---
 bin/pulsar                                         |   3 +
 .../apache/pulsar/compaction/CompactorTool.java    | 125 +++++++++++++++++++++
 2 files changed, 128 insertions(+)

diff --git a/bin/pulsar b/bin/pulsar
index 927a15d..9309d8a 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -75,6 +75,7 @@ where command is one of:
     proxy               Run a pulsar proxy
     websocket           Run a web socket proxy server
     standalone          Run a broker server with local bookies and local zookeeper
+    compact-topic       Run compaction against a topic
 
     initialize-cluster-metadata     One-time metadata initialization
     zookeeper-shell     Open a ZK shell client
@@ -224,6 +225,8 @@ elif [ $COMMAND == "initialize-cluster-metadata" ]; then
     exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
 elif [ $COMMAND == "zookeeper-shell" ]; then
     exec $JAVA $OPTS org.apache.zookeeper.ZooKeeperMain $@
+elif [ $COMMAND == "compact-topic" ]; then
+    exec $JAVA $OPTS org.apache.pulsar.compaction.CompactorTool --broker-conf $PULSAR_BROKER_CONF $@
 elif [ $COMMAND == "help" ]; then
     pulsar_help;
 else
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
new file mode 100644
index 0000000..ac18d1f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import java.io.FileInputStream;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.PulsarService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.Properties;
+
+import org.apache.pulsar.client.api.ClientConfiguration;
+import java.nio.file.Paths;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.BookKeeperClientFactoryImpl;
+import org.apache.pulsar.client.api.PulsarClient;
+
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
+import org.apache.zookeeper.ZooKeeper;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompactorTool {
+
+    private static class Arguments {
+        @Parameter(names = {"-c", "--broker-conf"}, description = "Configuration file for Broker")
+        private String brokerConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/broker.conf";
+
+        @Parameter(names = {"-t", "--topic"}, description = "Topic to compact", required=true)
+        private String topic;
+
+        @Parameter(names = {"-h", "--help"}, description = "Show this help message")
+        private boolean help = false;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Arguments arguments = new Arguments();
+        JCommander jcommander = new JCommander(arguments);
+        jcommander.setProgramName("PulsarTopicCompactor");
+
+        // parse args by JCommander
+        jcommander.parse(args);
+        if (arguments.help) {
+            jcommander.usage();
+            System.exit(-1);
+        }
+
+        // init broker config
+        ServiceConfiguration brokerConfig;
+        if (isBlank(arguments.brokerConfigFile)) {
+            jcommander.usage();
+            throw new IllegalArgumentException("Need to specify a configuration file for broker");
+        } else {
+            brokerConfig = PulsarConfigurationLoader.create(
+                    arguments.brokerConfigFile, ServiceConfiguration.class);
+        }
+
+        String pulsarServiceUrl = PulsarService.brokerUrl(brokerConfig);
+        ClientConfiguration clientConfig = new ClientConfiguration();
+
+        if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
+            clientConfig.setAuthentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
+                                           brokerConfig.getBrokerClientAuthenticationParameters());
+        }
+        clientConfig.setUseTls(brokerConfig.isTlsEnabled());
+        clientConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
+        clientConfig.setTlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
+
+        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
+
+        ZooKeeperClientFactory zkClientFactory = new ZookeeperBkClientFactoryImpl();
+
+        ZooKeeper zk = zkClientFactory.create(brokerConfig.getZookeeperServers(),
+                                              ZooKeeperClientFactory.SessionType.ReadWrite,
+                                              (int)brokerConfig.getZooKeeperSessionTimeoutMillis()).get();
+        BookKeeperClientFactory bkClientFactory = new BookKeeperClientFactoryImpl();
+        BookKeeper bk = bkClientFactory.create(brokerConfig, zk);
+        try (PulsarClient pulsar = PulsarClient.create(pulsarServiceUrl, clientConfig)) {
+            Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, bk, scheduler);
+            long ledgerId = compactor.compact(arguments.topic).get();
+            log.info("Compaction of topic {} complete. Compacted to ledger {}", arguments.topic, ledgerId);
+        } finally {
+            bk.close();
+            bkClientFactory.close();
+            zk.close();
+            scheduler.shutdownNow();
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(CompactorTool.class);
+}

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to