This is an automated email from the ASF dual-hosted git repository.

frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 41175f3ecd1 Support lz4 compression in hdfs (#18982)
41175f3ecd1 is described below

commit 41175f3ecd1d00aeaa76135dcba59705e6dd3cc1
Author: Frank Chen <[email protected]>
AuthorDate: Fri Feb 20 20:55:35 2026 +0800

    Support lz4 compression in hdfs (#18982)
    
    * Support lz4 compression for segment
    
    * Add pull metrics
    
    * Add tests
    
    * Add test
    
    Signed-off-by: Frank Chen <[email protected]>
    
    * Clean
    
    * Clean
    
    * Add test
    
    * Update doc
    
    * Fix test
    
    * Remove unused
    
    * Add doc for metrics
    
    ---------
    
    Signed-off-by: Frank Chen <[email protected]>
---
 docs/configuration/index.md                        |   1 +
 docs/operations/metrics.md                         |  11 ++
 .../druid/storage/hdfs/HdfsDataSegmentPuller.java  |  51 +++---
 .../druid/storage/hdfs/HdfsDataSegmentPusher.java  |  83 +++++++---
 .../storage/hdfs/HdfsDataSegmentPusherConfig.java  |  14 ++
 .../storage/hdfs/HdfsDataSegmentPusherTest.java    |  34 ++--
 .../storage/hdfs/HdfsStorageDruidModuleTest.java   |   3 +
 .../org/apache/druid/utils/CompressionUtils.java   | 178 ++++++++++++++++++++-
 .../java/util/common/CompressionUtilsTest.java     |  41 +++++
 9 files changed, 361 insertions(+), 55 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f6ddebcb0b2..a57d09ea9b3 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -441,6 +441,7 @@ This deep storage is used to interface with HDFS. You must load the `druid-hdfs-
 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.storage.storageDirectory`|HDFS directory to use as deep storage.|none|
+|`druid.storage.compressionFormat`|The compression format applied to the segments uploaded to HDFS. Only `zip` and `lz4` are supported.|zip|
 
 #### Cassandra deep storage
 
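
For reference, a minimal runtime.properties sketch (illustrative only, not part of this diff; the storage path is hypothetical) showing the new property next to the existing HDFS deep-storage settings:

    druid.storage.type=hdfs
    druid.storage.storageDirectory=/druid/segments
    # defaults to zip; lz4 is the only other supported value
    druid.storage.compressionFormat=lz4
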
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 690738cbc9e..523ea15185b 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -630,3 +630,14 @@ These metrics are available on operating systems with the cgroup kernel feature.
 |`cgroup/cpuset/effective_cpu_count`|Total number of active CPUs available to the process. Derived from `cpuset.effective_cpus`.||Varies|
 |`cgroup/cpuset/mems_count`|Total number of memory nodes available to the process. Derived from `cpuset.mems`.||Varies|
 |`cgroup/cpuset/effective_mems_count`|Total number of active memory nodes available to the process. Derived from `cpuset.effective_mems`.||Varies|
+
+## HDFS
+
+These metrics are available when the `druid-hdfs-storage` extension is used to push/pull segment files.
+
+| Metric               | Description                                                                     | Dimensions                                  | Normal value |
+|----------------------|-------------------------------------------------------------------------------|---------------------------------------------|--------------|
+| `hdfs/pull/size`     | Total bytes of decompressed segment files pulled from HDFS.                     | `format`: compression format (`ZIP`, `LZ4`) | Varies       |
+| `hdfs/pull/duration` | Time in milliseconds spent decompressing and pulling segment files from HDFS.   | `format`: compression format (`ZIP`, `LZ4`) | Varies       |
+| `hdfs/push/size`     | Total bytes of compressed segment files pushed to HDFS.                         | `format`: compression format (`ZIP`, `LZ4`) | Varies       |
+| `hdfs/push/duration` | Time in milliseconds spent compressing and pushing segment files to HDFS.       | `format`: compression format (`ZIP`, `LZ4`) | Varies       |
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java
index 2c6de8e28bb..34a358bdd90 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java
@@ -30,6 +30,8 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.io.NativeIO;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.loading.URIDataPuller;
 import org.apache.druid.utils.CompressionUtils;
@@ -40,6 +42,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 
+import javax.annotation.Nullable;
 import javax.tools.FileObject;
 import java.io.File;
 import java.io.IOException;
@@ -177,11 +180,21 @@ public class HdfsDataSegmentPuller implements URIDataPuller
 
   private static final Logger log = new Logger(HdfsDataSegmentPuller.class);
   protected final Configuration config;
+  private final ServiceEmitter emitter;
 
-  @Inject
   public HdfsDataSegmentPuller(@Hdfs final Configuration config)
+  {
+    this(config, null);
+  }
+
+  @Inject
+  public HdfsDataSegmentPuller(
+      @Hdfs final Configuration config,
+      @Nullable final ServiceEmitter emitter
+  )
   {
     this.config = config;
+    this.emitter = emitter;
   }
 
   FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException
@@ -235,27 +248,16 @@ public class HdfsDataSegmentPuller implements URIDataPuller
         catch (Exception e) {
           throw new RuntimeException(e);
         }
-      } else if (CompressionUtils.isZip(path.getName())) {
+      }
 
-        // --------    zip     ---------
+      // Try to detect format from file extension and decompress
+      final CompressionUtils.Format format = CompressionUtils.Format.fromFileName(path.getName());
+      if ((format == CompressionUtils.Format.ZIP || format == CompressionUtils.Format.LZ4)) {
+        long startTime = System.currentTimeMillis();
 
-        final FileUtils.FileCopyResult result = CompressionUtils.unzip(
-            new ByteSource()
-            {
-              @Override
-              public InputStream openStream() throws IOException
-              {
-                return getInputStream(path);
-              }
-            }, outDir, shouldRetryPredicate(), false
-        );
+        final FileUtils.FileCopyResult result = format.decompressDirectory(getInputStream(path), outDir);
 
-        log.info(
-            "Unzipped %d bytes from [%s] to [%s]",
-            result.size(),
-            path.toString(),
-            outDir.getAbsolutePath()
-        );
+        emitMetrics(format, result.size(), System.currentTimeMillis() - startTime);
 
         return result;
       } else if (CompressionUtils.isGz(path.getName())) {
@@ -292,6 +294,17 @@ public class HdfsDataSegmentPuller implements URIDataPuller
     }
   }
 
+  private void emitMetrics(CompressionUtils.Format format, long size, long duration)
+  {
+    if (emitter == null) {
+      return;
+    }
+    ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
+    metricBuilder.setDimension("format", format);
+    emitter.emit(metricBuilder.setMetric("hdfs/pull/size", size));
+    emitter.emit(metricBuilder.setMetric("hdfs/pull/duration", duration));
+  }
+
   public InputStream getInputStream(Path path) throws IOException
   {
     return buildFileObject(path.toUri(), config).openInputStream();
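
For context (illustrative only, not part of the commit), a minimal sketch of the CompressionUtils.Format API that the puller now dispatches through; the archive path and output directory below are hypothetical:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import org.apache.druid.java.util.common.FileUtils;
    import org.apache.druid.utils.CompressionUtils;

    public class Lz4PullSketch
    {
      public static void main(String[] args) throws Exception
      {
        // The file extension selects the format: "0_index.lz4" resolves to Format.LZ4, "0_index.zip" to Format.ZIP.
        final CompressionUtils.Format format = CompressionUtils.Format.fromFileName("0_index.lz4");
        final File outDir = new File("/tmp/segment"); // must already exist as a directory
        try (InputStream in = new FileInputStream("/tmp/0_index.lz4")) {
          // Extracts the archive entries into outDir and reports the decompressed size.
          final FileUtils.FileCopyResult result = format.decompressDirectory(in, outDir);
          System.out.println("decompressed bytes: " + result.size());
        }
      }
    }
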
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
index 6bcf96d6819..bcbff961e52 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.storage.hdfs;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
@@ -32,10 +31,11 @@ import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.utils.CompressionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.HadoopFsWrapper;
 import org.apache.hadoop.fs.Path;
 import org.joda.time.format.ISODateTimeFormat;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -55,20 +56,33 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
 {
   private static final Logger log = new Logger(HdfsDataSegmentPusher.class);
 
+  private final HdfsDataSegmentPusherConfig config;
   private final Configuration hadoopConfig;
+  @Nullable
+  private final ServiceEmitter emitter;
 
   // We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA.
   // Please see https://github.com/apache/druid/pull/5684
   private final Supplier<String> fullyQualifiedStorageDirectory;
 
+  public HdfsDataSegmentPusher(
+      HdfsDataSegmentPusherConfig config,
+      @Hdfs Configuration hadoopConfig
+  )
+  {
+    this(config, hadoopConfig, null);
+  }
+
   @Inject
   public HdfsDataSegmentPusher(
       HdfsDataSegmentPusherConfig config,
       @Hdfs Configuration hadoopConfig,
-      ObjectMapper jsonMapper
+      ServiceEmitter emitter
   )
   {
+    this.config = config;
     this.hadoopConfig = hadoopConfig;
+    this.emitter = emitter;
     Path storageDir = new Path(config.getStorageDirectory());
     this.fullyQualifiedStorageDirectory = Suppliers.memoize(
         () -> {
@@ -105,28 +119,34 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
     // '{partitionNum}_index.zip' without unique paths and '{partitionNum}_{UUID}_index.zip' with unique paths.
     final String storageDir = this.getStorageDir(segment, false);
 
-
     final String uniquePrefix = useUniquePath ? DataSegmentPusher.generateUniquePath() + "_" : "";
-    final String outIndexFilePathSuffix = StringUtils.format(
-        "%s/%s/%d_%sindex.zip",
+    final String outIndexFilePath = StringUtils.format(
+        "%s/%s/%d_%sindex%s",
         fullyQualifiedStorageDirectory.get(),
         storageDir,
         segment.getShardSpec().getPartitionNum(),
-        uniquePrefix
+        uniquePrefix,
+        config.getCompressionFormat().getSuffix()
     );
 
-    return pushToFilePathWithRetry(inDir, segment, outIndexFilePathSuffix);
+    return pushToFilePathWithRetry(inDir, segment, outIndexFilePath);
   }
 
   @Override
   public DataSegment pushToPath(File inDir, DataSegment segment, String storageDirSuffix) throws IOException
   {
-    String outIndexFilePath = StringUtils.format(
-        "%s/%s/%d_index.zip",
-        fullyQualifiedStorageDirectory.get(),
-        storageDirSuffix.replace(':', '_'),
-        segment.getShardSpec().getPartitionNum()
-    );
+    final String outIndexFilePath;
+    if (storageDirSuffix.endsWith("index.zip") || storageDirSuffix.endsWith("index.lz4")) {
+      outIndexFilePath = StringUtils.format("%s/%s", fullyQualifiedStorageDirectory.get(), storageDirSuffix);
+    } else {
+      outIndexFilePath = StringUtils.format(
+          "%s/%s/%d_index%s",
+          fullyQualifiedStorageDirectory.get(),
+          storageDirSuffix.replace(':', '_'),
+          segment.getShardSpec().getPartitionNum(),
+          config.getCompressionFormat().getSuffix()
+      );
+    }
 
     return pushToFilePathWithRetry(inDir, segment, outIndexFilePath);
   }
@@ -136,11 +156,17 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
   {
     // Retry any HDFS errors that occur, up to 5 times.
     try {
-      return RetryUtils.retry(
+      final long startTime = System.currentTimeMillis();
+
+      final DataSegment result = RetryUtils.retry(
           () -> pushToFilePath(inDir, segment, outIndexFilePath),
           exception -> exception instanceof Exception,
           5
       );
+
+      emitMetrics(result.getSize(), System.currentTimeMillis() - startTime);
+
+      return result;
     }
     catch (Exception e) {
       Throwables.throwIfInstanceOf(e, IOException.class);
@@ -149,21 +175,32 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
     }
   }
 
+  private void emitMetrics(long size, long duration)
+  {
+    if (emitter == null) {
+      return;
+    }
+
+    final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
+    metricBuilder.setDimension("format", config.getCompressionFormat());
+    emitter.emit(metricBuilder.setMetric("hdfs/push/size", size));
+    emitter.emit(metricBuilder.setMetric("hdfs/push/duration", duration));
+  }
+
   private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIndexFilePath) throws IOException
   {
     log.debug(
-        "Copying segment[%s] to HDFS at location[%s/%s]",
+        "Copying segment[%s] to HDFS at location[%s]",
         segment.getId(),
-        fullyQualifiedStorageDirectory.get(),
         outIndexFilePath
     );
-
     Path tmpIndexFile = new Path(StringUtils.format(
-        "%s/%s/%s/%s_index.zip",
+        "%s/%s/%s/%s_index%s",
         fullyQualifiedStorageDirectory.get(),
         segment.getDataSource(),
         UUIDUtils.generateUuid(),
-        segment.getShardSpec().getPartitionNum()
+        segment.getShardSpec().getPartitionNum(),
+        config.getCompressionFormat().getSuffix()
     ));
     FileSystem fs = tmpIndexFile.getFileSystem(hadoopConfig);
 
@@ -174,7 +211,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
     final DataSegment dataSegment;
     try {
       try (FSDataOutputStream out = fs.create(tmpIndexFile)) {
-        size = CompressionUtils.zip(inDir, out);
+        size = config.getCompressionFormat().compressDirectory(inDir, out);
       }
       final Path outIndexFile = new Path(outIndexFilePath);
       dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri()))
@@ -185,6 +222,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
       fs.mkdirs(outIndexFile.getParent());
       copyFilesWithChecks(fs, tmpIndexFile, outIndexFile);
     }
+    catch (IOException e) {
+      log.error(e, "Failed to push segment[%s] to HDFS at location[%s]", 
segment.getId(), outIndexFilePath);
+      throw e;
+    }
     finally {
       try {
         if (fs.exists(tmpIndexFile.getParent()) && !fs.delete(tmpIndexFile.getParent(), true)) {
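
To illustrate the naming change above (illustrative only; the storage directory and data source are hypothetical): with `druid.storage.storageDirectory=hdfs://nn/druid/segments`, data source `wiki`, and partition 0, the segment is staged as `hdfs://nn/druid/segments/wiki/<uuid>/0_index.zip` or `.../0_index.lz4` and then copied to `<storageDir>/<segmentPath>/0_index.<zip|lz4>`; only the suffix changes with the configured compressionFormat, the directory layout is unchanged.
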
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java
index 7692be13117..a2b656cc13d 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java
@@ -20,6 +20,7 @@
 package org.apache.druid.storage.hdfs;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.utils.CompressionUtils;
 
 /**
  */
@@ -28,6 +29,9 @@ public class HdfsDataSegmentPusherConfig
   @JsonProperty
   private String storageDirectory = "";
 
+  @JsonProperty
+  private CompressionUtils.Format compressionFormat = CompressionUtils.Format.ZIP;
+
   public void setStorageDirectory(String storageDirectory)
   {
     this.storageDirectory = storageDirectory;
@@ -37,4 +41,14 @@ public class HdfsDataSegmentPusherConfig
   {
     return storageDirectory;
   }
+
+  public void setCompressionFormat(CompressionUtils.Format compressionFormat)
+  {
+    this.compressionFormat = compressionFormat;
+  }
+
+  public CompressionUtils.Format getCompressionFormat()
+  {
+    return compressionFormat;
+  }
 }
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
index 7bdf8c06980..e4b455e4812 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
@@ -36,7 +36,6 @@ import org.apache.druid.indexer.Bucket;
 import org.apache.druid.indexer.HadoopDruidIndexerConfig;
 import org.apache.druid.indexer.HadoopIngestionSpec;
 import org.apache.druid.indexer.JobHelper;
-import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.jackson.GranularityModule;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
@@ -46,6 +45,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.utils.CompressionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -97,7 +97,7 @@ public class HdfsDataSegmentPusherTest
   {
     HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConf = new HdfsDataSegmentPusherConfig();
     hdfsDataSegmentPusherConf.setStorageDirectory("path/to/");
-    hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDataSegmentPusherConf, new Configuration(true), objectMapper);
+    hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDataSegmentPusherConf, new Configuration(true));
   }
 
   @Test
@@ -126,7 +126,8 @@ public class HdfsDataSegmentPusherTest
   @Test
   public void testPushWithMultipleSegments() throws Exception
   {
-    testUsingSchemeForMultipleSegments("file", 3);
+    testUsingSchemeForMultipleSegments("file", 3, CompressionUtils.Format.ZIP);
+    testUsingSchemeForMultipleSegments("file", 3, CompressionUtils.Format.LZ4);
   }
 
   @Test
@@ -146,7 +147,7 @@ public class HdfsDataSegmentPusherTest
     final File storageDirectory = tempFolder.newFolder();
 
     config.setStorageDirectory(StringUtils.format("file://%s", storageDirectory.getAbsolutePath()));
-    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf);
 
     DataSegment segmentToPush = new DataSegment(
         "foo",
@@ -187,7 +188,7 @@ public class HdfsDataSegmentPusherTest
     final File storageDirectory = tempFolder.newFolder();
 
     config.setStorageDirectory(StringUtils.format("file://%s", storageDirectory.getAbsolutePath()));
-    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf);
 
     DataSegment segmentToPush = new DataSegment(
         "foo",
@@ -210,7 +211,7 @@ public class HdfsDataSegmentPusherTest
     );
   }
 
-  private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments) throws Exception
+  private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments, CompressionUtils.Format format) throws Exception
   {
     Configuration conf = new Configuration(true);
     DataSegment[] segments = new DataSegment[numberOfSegments];
@@ -226,12 +227,14 @@ public class HdfsDataSegmentPusherTest
     HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
     final File storageDirectory = tempFolder.newFolder();
 
+    config.setCompressionFormat(format);
     config.setStorageDirectory(
         scheme != null
         ? StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath())
         : storageDirectory.getAbsolutePath()
     );
-    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+
+    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf);
 
     for (int i = 0; i < numberOfSegments; i++) {
       segments[i] = new DataSegment(
@@ -251,10 +254,11 @@ public class HdfsDataSegmentPusherTest
       final DataSegment pushedSegment = pusher.push(segmentDir, segments[i], false);
 
       String indexUri = StringUtils.format(
-          "%s/%s/%d_index.zip",
+          "%s/%s/%d_index.%s",
           FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
           pusher.getStorageDir(segments[i], false),
-          segments[i].getShardSpec().getPartitionNum()
+          segments[i].getShardSpec().getPartitionNum(),
+          format.getExtension()
       );
 
       Assert.assertEquals(segments[i].getSize(), pushedSegment.getSize());
@@ -269,10 +273,11 @@ public class HdfsDataSegmentPusherTest
       String segmentPath = pusher.getStorageDir(pushedSegment, false);
 
       File indexFile = new File(StringUtils.format(
-          "%s/%s/%d_index.zip",
+          "%s/%s/%d_index.%s",
           storageDirectory,
           segmentPath,
-          pushedSegment.getShardSpec().getPartitionNum()
+          pushedSegment.getShardSpec().getPartitionNum(),
+          format.getExtension()
       ));
       Assert.assertTrue(indexFile.exists());
 
@@ -280,10 +285,11 @@ public class HdfsDataSegmentPusherTest
       Assert.assertEquals(segments[i], pushedSegment);
 
       indexFile = new File(StringUtils.format(
-          "%s/%s/%d_index.zip",
+          "%s/%s/%d_index.%s",
           storageDirectory,
           segmentPath,
-          pushedSegment.getShardSpec().getPartitionNum()
+          pushedSegment.getShardSpec().getPartitionNum(),
+          format.getExtension()
       ));
       Assert.assertTrue(indexFile.exists());
 
@@ -320,7 +326,7 @@ public class HdfsDataSegmentPusherTest
         ? StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath())
         : storageDirectory.getAbsolutePath()
     );
-    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf);
 
     DataSegment segmentToPush = new DataSegment(
         "foo",
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java
index 482c47acee2..a5111d2159c 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java
@@ -28,6 +28,8 @@ import org.apache.druid.guice.JsonConfigurator;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
+import org.apache.druid.java.util.emitter.core.NoopEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.segment.loading.OmniDataSegmentKiller;
 import org.junit.Assert;
 import org.junit.Test;
@@ -85,6 +87,7 @@ public class HdfsStorageDruidModuleTest
               binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
               binder.bind(JsonConfigurator.class).in(LazySingleton.class);
               binder.bind(Properties.class).toInstance(props);
+              binder.bind(ServiceEmitter.class).toInstance(new ServiceEmitter("test", "localhost", new NoopEmitter()));
             },
             new HdfsStorageDruidModule()
         )
diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
index 01b45c5089a..eabec58ef15 100644
--- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
+++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.utils;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
@@ -27,6 +28,9 @@ import com.google.common.io.ByteSink;
 import com.google.common.io.ByteSource;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
+import net.jpountz.lz4.LZ4BlockInputStream;
+import net.jpountz.lz4.LZ4BlockOutputStream;
+import net.jpountz.lz4.LZ4Factory;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorInputStream;
 import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
@@ -46,7 +50,10 @@ import org.apache.druid.java.util.common.logger.Logger;
 import javax.annotation.Nullable;
 import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -57,6 +64,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
@@ -74,9 +82,34 @@ public class CompressionUtils
   {
     BZ2(".bz2", "bz2"),
     GZ(".gz", "gz"),
+    LZ4(".lz4", "lz4") {
+      @Override
+      public long compressDirectory(File directory, OutputStream out) throws IOException
+      {
+        return lz4CompressDirectory(directory, out);
+      }
+
+      @Override
+      public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException
+      {
+        return lz4DecompressDirectory(in, outDir);
+      }
+    },
     SNAPPY(".sz", "sz"),
     XZ(".xz", "xz"),
-    ZIP(".zip", "zip"),
+    ZIP(".zip", "zip") {
+      @Override
+      public long compressDirectory(File directory, OutputStream out) throws IOException
+      {
+        return zip(directory, out);
+      }
+
+      @Override
+      public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException
+      {
+        return unzip(in, outDir);
+      }
+    },
     ZSTD(".zst", "zst");
 
     private static final Map<String, Format> EXTENSION_TO_COMPRESSION_FORMAT;
@@ -85,6 +118,7 @@ public class CompressionUtils
       ImmutableMap.Builder<String, Format> builder = ImmutableMap.builder();
       builder.put(BZ2.getExtension(), BZ2);
       builder.put(GZ.getExtension(), GZ);
+      builder.put(LZ4.getExtension(), LZ4);
       builder.put(SNAPPY.getExtension(), SNAPPY);
       builder.put(XZ.getExtension(), XZ);
       builder.put(ZIP.getExtension(), ZIP);
@@ -110,6 +144,16 @@ public class CompressionUtils
       return extension;
     }
 
+    @Nullable
+    @JsonCreator
+    public static Format fromString(@Nullable String name)
+    {
+      if (Strings.isNullOrEmpty(name)) {
+        return null;
+      }
+      return valueOf(name.toUpperCase(Locale.ROOT));
+    }
+
     @Nullable
     public static Format fromFileName(@Nullable String fileName)
     {
@@ -119,6 +163,36 @@ public class CompressionUtils
       }
       return EXTENSION_TO_COMPRESSION_FORMAT.get(extension);
     }
+
+    /**
+     * Compresses a directory to the output stream. Default implementation throws UnsupportedOperationException.
+     * Override this method for formats that support directory compression.
+     *
+     * @param directory The directory to compress
+     * @param out The output stream to write compressed data to
+     * @return The number of bytes (uncompressed) read from the input directory
+     * @throws IOException if an I/O error occurs
+     * @throws UnsupportedOperationException if the format doesn't support directory compression
+     */
+    public long compressDirectory(File directory, OutputStream out) throws IOException
+    {
+      throw new UnsupportedOperationException("Directory compression not 
supported for " + this.name());
+    }
+
+    /**
+     * Decompresses a directory from the input stream. Default implementation throws UnsupportedOperationException.
+     * Override this method for formats that support directory decompression.
+     *
+     * @param in The input stream containing compressed data
+     * @param outDir The output directory to extract files to
+     * @return A FileCopyResult containing information about extracted files
+     * @throws IOException if an I/O error occurs
+     * @throws UnsupportedOperationException if the format doesn't support directory decompression
+     */
+    public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException
+    {
+      throw new UnsupportedOperationException("Directory decompression not 
supported for " + this.name());
+    }
   }
 
   public static final int COMPRESSED_TEXT_WEIGHT_FACTOR = 4;
@@ -212,6 +286,106 @@ public class CompressionUtils
     return totalSize;
   }
 
+  /**
+   * Compresses directory contents using LZ4 block compression with a simple archive format.
+   * Format: [file_count:4 bytes][file1_name_length:4][file1_name:bytes][file1_size:8][file1_data:bytes]...
+   *
+   * @param directory The directory whose contents should be compressed
+   * @param out The output stream to write compressed data to
+   * @return The number of bytes (uncompressed) read from the input directory
+   */
+  public static long lz4CompressDirectory(File directory, OutputStream out) throws IOException
+  {
+    if (!directory.isDirectory()) {
+      throw new IOE("directory[%s] is not a directory", directory);
+    }
+
+    // Use fast compressor for better performance (lower CPU, faster compression)
+    final LZ4BlockOutputStream lz4Out = new LZ4BlockOutputStream(
+        out,
+        64 * 1024, // Block size
+        LZ4Factory.fastestInstance().fastCompressor()
+    );
+    // Use DataOutputStream for structured writing
+    final DataOutputStream dataOut = new DataOutputStream(lz4Out);
+    final File[] files = directory.listFiles();
+
+    if (files == null) {
+      throw new IOE("Cannot list files in directory[%s]", directory);
+    }
+
+    // Sort for consistency
+    final File[] sortedFiles = Arrays.stream(files).sorted().toArray(File[]::new);
+
+    dataOut.writeInt(sortedFiles.length);
+
+    long totalSize = 0;
+
+    for (File file : sortedFiles) {
+      if (file.isDirectory()) {
+        continue; // Skip subdirectories like ZIP does
+      }
+
+      log.debug("Compressing file[%s] with size[%,d]. Total size so far[%,d]", 
file, file.length(), totalSize);
+
+      final String fileName = file.getName();
+      final byte[] fileNameBytes = fileName.getBytes(StandardCharsets.UTF_8);
+      dataOut.writeInt(fileNameBytes.length);
+      dataOut.write(fileNameBytes);
+
+      final long fileSize = file.length();
+      if (fileSize > Integer.MAX_VALUE) {
+        throw new IOE("file[%s] too large [%,d]", file, fileSize);
+      }
+
+      dataOut.writeLong(fileSize);
+      totalSize += fileSize;
+
+      // Copy file content to dataOut
+      try (FileInputStream fileIn = new FileInputStream(file)) {
+        ByteStreams.copy(fileIn, dataOut);
+      }
+    }
+
+    dataOut.flush();
+    lz4Out.finish();
+    return totalSize;
+  }
+
+  /**
+   * Decompresses LZ4-compressed directory archive
+   */
+  public static FileUtils.FileCopyResult lz4DecompressDirectory(InputStream in, File outDir) throws IOException
+  {
+    if (!(outDir.exists() && outDir.isDirectory())) {
+      throw new ISE("outDir[%s] must exist and be a directory", outDir);
+    }
+
+    final LZ4BlockInputStream lz4In = new LZ4BlockInputStream(in);
+    final DataInputStream dataIn = new DataInputStream(lz4In);
+
+    final int fileCount = dataIn.readInt();
+    final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult();
+
+    for (int i = 0; i < fileCount; i++) {
+      final int fileNameLength = dataIn.readInt();
+      final byte[] fileNameBytes = new byte[fileNameLength];
+      dataIn.readFully(fileNameBytes);
+      final String fileName = new String(fileNameBytes, StandardCharsets.UTF_8);
+
+      final long fileSize = dataIn.readLong();
+
+      // Write to file
+      final File outFile = new File(outDir, fileName);
+      validateZipOutputFile("", outFile, outDir);
+
+      NativeIO.chunkedCopy(ByteStreams.limit(dataIn, fileSize), outFile);
+      result.addFile(outFile);
+    }
+
+    return result;
+  }
+
   /**
    * Unzip the byteSource to the output directory. If cacheLocally is true, the byteSource is cached to local disk before unzipping.
    * This may cause more predictable behavior than trying to unzip a large file directly off a network stream, for example.
@@ -622,6 +796,8 @@ public class CompressionUtils
   {
     if (fileName.endsWith(Format.GZ.getSuffix())) {
       return gzipInputStream(in);
+    } else if (fileName.endsWith(Format.LZ4.getSuffix())) {
+      return new LZ4BlockInputStream(in);
     } else if (fileName.endsWith(Format.BZ2.getSuffix())) {
       return new BZip2CompressorInputStream(in, true);
     } else if (fileName.endsWith(Format.XZ.getSuffix())) {
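
As a concrete reading of the lz4 archive layout documented above (illustrative only; the file name and size are hypothetical): for a directory holding a single 1,234-byte file named `00000.smoosh`, the uncompressed payload written through the LZ4BlockOutputStream is an int 1 (file count), an int 12 (UTF-8 name length), the 12 name bytes, a long 1234 (file size), and then the 1,234 bytes of file content. Subdirectories are skipped and files are written in sorted order, mirroring the existing zip path.
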
diff --git a/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java b/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java
index 27377a53a77..6d1c41e4ee5 100644
--- a/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java
@@ -26,6 +26,7 @@ import com.google.common.io.ByteSource;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.CountingInputStream;
 import com.google.common.io.Files;
+import net.jpountz.lz4.LZ4BlockOutputStream;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorOutputStream;
 import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
@@ -407,6 +408,46 @@ public class CompressionUtilsTest
     }
   }
 
+  @Test
+  public void testGoodLz4CompressUncompressDirectory() throws IOException
+  {
+    final File tmpDir = temporaryFolder.newFolder("testGoodLz4CompressUncompressDirectory");
+    final File lz4File = new File(tmpDir, "compressionUtilTest.lz4");
+
+    try (final OutputStream out = new FileOutputStream(lz4File)) {
+      CompressionUtils.Format.LZ4.compressDirectory(testDir, out);
+    }
+
+    final File newDir = new File(tmpDir, "newDir");
+    Assert.assertTrue(newDir.mkdir());
+
+    final FileUtils.FileCopyResult result;
+    try (final InputStream in = new FileInputStream(lz4File)) {
+      result = CompressionUtils.Format.LZ4.decompressDirectory(in, newDir);
+    }
+
+    verifyUnzip(newDir, result, ImmutableMap.of(testFile.getName(), StringUtils.toUtf8(CONTENT)));
+  }
+
+  @Test
+  public void testDecompressLz4() throws IOException
+  {
+    final File tmpDir = temporaryFolder.newFolder("testDecompressLz4");
+    final File lz4File = new File(tmpDir, testFile.getName() + ".lz4");
+    Assert.assertFalse(lz4File.exists());
+
+    try (
+        final OutputStream out = new LZ4BlockOutputStream(new FileOutputStream(lz4File));
+        final InputStream in = new FileInputStream(testFile)
+    ) {
+      ByteStreams.copy(in, out);
+    }
+
+    try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(lz4File), lz4File.getName())) {
+      assertGoodDataStream(inputStream);
+    }
+  }
+
   @Test
   public void testGoodGZStream() throws IOException
   {

