This is an automated email from the ASF dual-hosted git repository.
frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 41175f3ecd1 Support lz4 compression in hdfs (#18982)
41175f3ecd1 is described below
commit 41175f3ecd1d00aeaa76135dcba59705e6dd3cc1
Author: Frank Chen <[email protected]>
AuthorDate: Fri Feb 20 20:55:35 2026 +0800
Support lz4 compression in hdfs (#18982)
* Support lz4 compression for segment
* Add pull metrics
* Add tests
* Add test
Signed-off-by: Frank Chen <[email protected]>
* Clean
* Clean
* Add test
* Update doc
* Fix test
* Remove unused
* Add doc for metrics
---------
Signed-off-by: Frank Chen <[email protected]>
---
docs/configuration/index.md | 1 +
docs/operations/metrics.md | 11 ++
.../druid/storage/hdfs/HdfsDataSegmentPuller.java | 51 +++---
.../druid/storage/hdfs/HdfsDataSegmentPusher.java | 83 +++++++---
.../storage/hdfs/HdfsDataSegmentPusherConfig.java | 14 ++
.../storage/hdfs/HdfsDataSegmentPusherTest.java | 34 ++--
.../storage/hdfs/HdfsStorageDruidModuleTest.java | 3 +
.../org/apache/druid/utils/CompressionUtils.java | 178 ++++++++++++++++++++-
.../java/util/common/CompressionUtilsTest.java | 41 +++++
9 files changed, 361 insertions(+), 55 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f6ddebcb0b2..a57d09ea9b3 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -441,6 +441,7 @@ This deep storage is used to interface with HDFS. You must load the `druid-hdfs-
|Property|Description|Default|
|--------|-----------|-------|
|`druid.storage.storageDirectory`|HDFS directory to use as deep storage.|none|
+|`druid.storage.compressionFormat`|The compression format applied to the segments uploaded to HDFS. Only `zip` and `lz4` are supported.|zip|
#### Cassandra deep storage
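For readers skimming the doc change, a minimal runtime.properties sketch (the values other than `compressionFormat` are placeholders, not part of this commit) would look like:

    druid.storage.type=hdfs
    druid.storage.storageDirectory=/druid/segments
    druid.storage.compressionFormat=lz4

Leaving `druid.storage.compressionFormat` unset keeps the existing `zip` behavior.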
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 690738cbc9e..523ea15185b 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -630,3 +630,14 @@ These metrics are available on operating systems with the cgroup kernel feature.
|`cgroup/cpuset/effective_cpu_count`|Total number of active CPUs available to the process. Derived from `cpuset.effective_cpus`.||Varies|
|`cgroup/cpuset/mems_count`|Total number of memory nodes available to the process. Derived from `cpuset.mems`.||Varies|
|`cgroup/cpuset/effective_mems_count`|Total number of active memory nodes available to the process. Derived from `cpuset.effective_mems`.||Varies|
+
+## HDFS
+
+These metrics are available when the `druid-hdfs-storage` extension is used to push/pull segment files.
+
+| Metric               | Description                                                                     | Dimensions                                  | Normal value |
+|----------------------|---------------------------------------------------------------------------------|---------------------------------------------|--------------|
+| `hdfs/pull/size`     | Total bytes of decompressed segment files pulled from HDFS.                      | `format`: compression format (`ZIP`, `LZ4`) | Varies       |
+| `hdfs/pull/duration` | Time in milliseconds spent decompressing and pulling segment files from HDFS.    | `format`: compression format (`ZIP`, `LZ4`) | Varies       |
+| `hdfs/push/size`     | Total bytes of compressed segment files pushed to HDFS.                          | `format`: compression format (`ZIP`, `LZ4`) | Varies       |
+| `hdfs/push/duration` | Time in milliseconds spent compressing and pushing segment files to HDFS.        | `format`: compression format (`ZIP`, `LZ4`) | Varies       |
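As a quick orientation for the table above, the new metrics are emitted through the injected ServiceEmitter with the compression format attached as a dimension; a minimal sketch of the pattern used by the emitMetrics() helpers further down in this diff (sketch only; `emitter`, `format`, `size`, and `durationMillis` are assumed to be in scope):

    ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
    builder.setDimension("format", format);                                  // e.g. CompressionUtils.Format.LZ4
    emitter.emit(builder.setMetric("hdfs/pull/size", size));                 // decompressed bytes
    emitter.emit(builder.setMetric("hdfs/pull/duration", durationMillis));   // wall-clock millis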
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java
index 2c6de8e28bb..34a358bdd90 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java
@@ -30,6 +30,8 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.io.NativeIO;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.URIDataPuller;
import org.apache.druid.utils.CompressionUtils;
@@ -40,6 +42,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import javax.annotation.Nullable;
import javax.tools.FileObject;
import java.io.File;
import java.io.IOException;
@@ -177,11 +180,21 @@ public class HdfsDataSegmentPuller implements URIDataPuller
private static final Logger log = new Logger(HdfsDataSegmentPuller.class);
protected final Configuration config;
+ private final ServiceEmitter emitter;
- @Inject
public HdfsDataSegmentPuller(@Hdfs final Configuration config)
+ {
+ this(config, null);
+ }
+
+ @Inject
+ public HdfsDataSegmentPuller(
+ @Hdfs final Configuration config,
+ @Nullable final ServiceEmitter emitter
+ )
{
this.config = config;
+ this.emitter = emitter;
}
FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException
@@ -235,27 +248,16 @@ public class HdfsDataSegmentPuller implements URIDataPuller
catch (Exception e) {
throw new RuntimeException(e);
}
- } else if (CompressionUtils.isZip(path.getName())) {
+ }
- // -------- zip ---------
+ // Try to detect format from file extension and decompress
+ final CompressionUtils.Format format = CompressionUtils.Format.fromFileName(path.getName());
+ if ((format == CompressionUtils.Format.ZIP || format == CompressionUtils.Format.LZ4)) {
+ long startTime = System.currentTimeMillis();
- final FileUtils.FileCopyResult result = CompressionUtils.unzip(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- return getInputStream(path);
- }
- }, outDir, shouldRetryPredicate(), false
- );
+ final FileUtils.FileCopyResult result = format.decompressDirectory(getInputStream(path), outDir);
- log.info(
- "Unzipped %d bytes from [%s] to [%s]",
- result.size(),
- path.toString(),
- outDir.getAbsolutePath()
- );
+ emitMetrics(format, result.size(), System.currentTimeMillis() - startTime);
return result;
} else if (CompressionUtils.isGz(path.getName())) {
@@ -292,6 +294,17 @@ public class HdfsDataSegmentPuller implements URIDataPuller
}
}
+ private void emitMetrics(CompressionUtils.Format format, long size, long duration)
+ {
+ if (emitter == null) {
+ return;
+ }
+ ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
+ metricBuilder.setDimension("format", format);
+ emitter.emit(metricBuilder.setMetric("hdfs/pull/size", size));
+ emitter.emit(metricBuilder.setMetric("hdfs/pull/duration", duration));
+ }
+
public InputStream getInputStream(Path path) throws IOException
{
return buildFileObject(path.toUri(), config).openInputStream();
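To make the new pull path concrete, here is a small usage sketch (assuming `puller` is an HdfsDataSegmentPuller, `path` names a `*_index.zip` or `*_index.lz4` object, and `outDir` already exists; this mirrors getSegmentFiles() above rather than replacing it):

    // Detect the archive format from the file name, then expand it into outDir.
    final CompressionUtils.Format format = CompressionUtils.Format.fromFileName(path.getName()); // LZ4 for "0_index.lz4"
    if (format == CompressionUtils.Format.ZIP || format == CompressionUtils.Format.LZ4) {
      try (InputStream in = puller.getInputStream(path)) {
        FileUtils.FileCopyResult copied = format.decompressDirectory(in, outDir);
      }
    }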
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
index 6bcf96d6819..bcbff961e52 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -19,7 +19,6 @@
package org.apache.druid.storage.hdfs;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
@@ -32,10 +31,11 @@ import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.HadoopFsWrapper;
import org.apache.hadoop.fs.Path;
import org.joda.time.format.ISODateTimeFormat;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
@@ -55,20 +56,33 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
{
private static final Logger log = new Logger(HdfsDataSegmentPusher.class);
+ private final HdfsDataSegmentPusherConfig config;
private final Configuration hadoopConfig;
+ @Nullable
+ private final ServiceEmitter emitter;
// We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA.
// Please see https://github.com/apache/druid/pull/5684
private final Supplier<String> fullyQualifiedStorageDirectory;
+ public HdfsDataSegmentPusher(
+ HdfsDataSegmentPusherConfig config,
+ @Hdfs Configuration hadoopConfig
+ )
+ {
+ this(config, hadoopConfig, null);
+ }
+
@Inject
public HdfsDataSegmentPusher(
HdfsDataSegmentPusherConfig config,
@Hdfs Configuration hadoopConfig,
- ObjectMapper jsonMapper
+ ServiceEmitter emitter
)
{
+ this.config = config;
this.hadoopConfig = hadoopConfig;
+ this.emitter = emitter;
Path storageDir = new Path(config.getStorageDirectory());
this.fullyQualifiedStorageDirectory = Suppliers.memoize(
() -> {
@@ -105,28 +119,34 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
// '{partitionNum}_index.zip' without unique paths and '{partitionNum}_{UUID}_index.zip' with unique paths.
final String storageDir = this.getStorageDir(segment, false);
-
final String uniquePrefix = useUniquePath ? DataSegmentPusher.generateUniquePath() + "_" : "";
- final String outIndexFilePathSuffix = StringUtils.format(
- "%s/%s/%d_%sindex.zip",
+ final String outIndexFilePath = StringUtils.format(
+ "%s/%s/%d_%sindex%s",
fullyQualifiedStorageDirectory.get(),
storageDir,
segment.getShardSpec().getPartitionNum(),
- uniquePrefix
+ uniquePrefix,
+ config.getCompressionFormat().getSuffix()
);
- return pushToFilePathWithRetry(inDir, segment, outIndexFilePathSuffix);
+ return pushToFilePathWithRetry(inDir, segment, outIndexFilePath);
}
@Override
public DataSegment pushToPath(File inDir, DataSegment segment, String storageDirSuffix) throws IOException
{
- String outIndexFilePath = StringUtils.format(
- "%s/%s/%d_index.zip",
- fullyQualifiedStorageDirectory.get(),
- storageDirSuffix.replace(':', '_'),
- segment.getShardSpec().getPartitionNum()
- );
+ final String outIndexFilePath;
+ if (storageDirSuffix.endsWith("index.zip") || storageDirSuffix.endsWith("index.lz4")) {
+ outIndexFilePath = StringUtils.format("%s/%s", fullyQualifiedStorageDirectory.get(), storageDirSuffix);
+ } else {
+ outIndexFilePath = StringUtils.format(
+ "%s/%s/%d_index%s",
+ fullyQualifiedStorageDirectory.get(),
+ storageDirSuffix.replace(':', '_'),
+ segment.getShardSpec().getPartitionNum(),
+ config.getCompressionFormat().getSuffix()
+ );
+ }
return pushToFilePathWithRetry(inDir, segment, outIndexFilePath);
}
@@ -136,11 +156,17 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
{
// Retry any HDFS errors that occur, up to 5 times.
try {
- return RetryUtils.retry(
+ final long startTime = System.currentTimeMillis();
+
+ final DataSegment result = RetryUtils.retry(
() -> pushToFilePath(inDir, segment, outIndexFilePath),
exception -> exception instanceof Exception,
5
);
+
+ emitMetrics(result.getSize(), System.currentTimeMillis() - startTime);
+
+ return result;
}
catch (Exception e) {
Throwables.throwIfInstanceOf(e, IOException.class);
@@ -149,21 +175,32 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
}
}
+ private void emitMetrics(long size, long duration)
+ {
+ if (emitter == null) {
+ return;
+ }
+
+ final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
+ metricBuilder.setDimension("format", config.getCompressionFormat());
+ emitter.emit(metricBuilder.setMetric("hdfs/push/size", size));
+ emitter.emit(metricBuilder.setMetric("hdfs/push/duration", duration));
+ }
+
private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIndexFilePath) throws IOException
{
log.debug(
- "Copying segment[%s] to HDFS at location[%s/%s]",
+ "Copying segment[%s] to HDFS at location[%s]",
segment.getId(),
- fullyQualifiedStorageDirectory.get(),
outIndexFilePath
);
-
Path tmpIndexFile = new Path(StringUtils.format(
- "%s/%s/%s/%s_index.zip",
+ "%s/%s/%s/%s_index%s",
fullyQualifiedStorageDirectory.get(),
segment.getDataSource(),
UUIDUtils.generateUuid(),
- segment.getShardSpec().getPartitionNum()
+ segment.getShardSpec().getPartitionNum(),
+ config.getCompressionFormat().getSuffix()
));
FileSystem fs = tmpIndexFile.getFileSystem(hadoopConfig);
@@ -174,7 +211,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
final DataSegment dataSegment;
try {
try (FSDataOutputStream out = fs.create(tmpIndexFile)) {
- size = CompressionUtils.zip(inDir, out);
+ size = config.getCompressionFormat().compressDirectory(inDir, out);
}
final Path outIndexFile = new Path(outIndexFilePath);
dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri()))
@@ -185,6 +222,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
fs.mkdirs(outIndexFile.getParent());
copyFilesWithChecks(fs, tmpIndexFile, outIndexFile);
}
+ catch (IOException e) {
+ log.error(e, "Failed to push segment[%s] to HDFS at location[%s]", segment.getId(), outIndexFilePath);
+ throw e;
+ }
finally {
try {
if (fs.exists(tmpIndexFile.getParent()) && !fs.delete(tmpIndexFile.getParent(), true)) {
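For reference, the file-naming change in this pusher can be illustrated with a short sketch (hypothetical partition number, not part of the commit); the index object now carries the configured format's suffix rather than a hard-coded `.zip`:

    CompressionUtils.Format format = config.getCompressionFormat();              // e.g. Format.LZ4
    String indexName = StringUtils.format("%d_index%s", 0, format.getSuffix());  // "0_index.lz4"
    // With the default Format.ZIP the same expression still yields "0_index.zip".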
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java
index 7692be13117..a2b656cc13d 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java
@@ -20,6 +20,7 @@
package org.apache.druid.storage.hdfs;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.utils.CompressionUtils;
/**
*/
@@ -28,6 +29,9 @@ public class HdfsDataSegmentPusherConfig
@JsonProperty
private String storageDirectory = "";
+ @JsonProperty
+ private CompressionUtils.Format compressionFormat = CompressionUtils.Format.ZIP;
+
public void setStorageDirectory(String storageDirectory)
{
this.storageDirectory = storageDirectory;
@@ -37,4 +41,14 @@ public class HdfsDataSegmentPusherConfig
{
return storageDirectory;
}
+
+ public void setCompressionFormat(CompressionUtils.Format compressionFormat)
+ {
+ this.compressionFormat = compressionFormat;
+ }
+
+ public CompressionUtils.Format getCompressionFormat()
+ {
+ return compressionFormat;
+ }
}
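A hedged sketch of how this config could be deserialized (the JSON literal and ObjectMapper setup are illustrative assumptions; in a cluster the values come from `druid.storage.*` runtime properties): lower-case names such as `lz4` are accepted because of the enum's `@JsonCreator` added later in this diff.

    import com.fasterxml.jackson.databind.ObjectMapper;

    ObjectMapper mapper = new ObjectMapper();
    HdfsDataSegmentPusherConfig config = mapper.readValue(
        "{\"storageDirectory\": \"hdfs://nn/druid/segments\", \"compressionFormat\": \"lz4\"}",
        HdfsDataSegmentPusherConfig.class
    );
    // config.getCompressionFormat() == CompressionUtils.Format.LZ4
    // config.getCompressionFormat().getSuffix() == ".lz4"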
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
index 7bdf8c06980..e4b455e4812 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
@@ -36,7 +36,6 @@ import org.apache.druid.indexer.Bucket;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.indexer.HadoopIngestionSpec;
import org.apache.druid.indexer.JobHelper;
-import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.jackson.GranularityModule;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@@ -46,6 +45,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -97,7 +97,7 @@ public class HdfsDataSegmentPusherTest
{
HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConf = new HdfsDataSegmentPusherConfig();
hdfsDataSegmentPusherConf.setStorageDirectory("path/to/");
- hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDataSegmentPusherConf, new Configuration(true), objectMapper);
+ hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDataSegmentPusherConf, new Configuration(true));
}
@Test
@@ -126,7 +126,8 @@ public class HdfsDataSegmentPusherTest
@Test
public void testPushWithMultipleSegments() throws Exception
{
- testUsingSchemeForMultipleSegments("file", 3);
+ testUsingSchemeForMultipleSegments("file", 3, CompressionUtils.Format.ZIP);
+ testUsingSchemeForMultipleSegments("file", 3, CompressionUtils.Format.LZ4);
}
@Test
@@ -146,7 +147,7 @@ public class HdfsDataSegmentPusherTest
final File storageDirectory = tempFolder.newFolder();
config.setStorageDirectory(StringUtils.format("file://%s", storageDirectory.getAbsolutePath()));
- HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+ HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf);
DataSegment segmentToPush = new DataSegment(
"foo",
@@ -187,7 +188,7 @@ public class HdfsDataSegmentPusherTest
final File storageDirectory = tempFolder.newFolder();
config.setStorageDirectory(StringUtils.format("file://%s", storageDirectory.getAbsolutePath()));
- HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+ HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf);
DataSegment segmentToPush = new DataSegment(
"foo",
@@ -210,7 +211,7 @@ public class HdfsDataSegmentPusherTest
);
}
- private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments) throws Exception
+ private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments, CompressionUtils.Format format) throws Exception
{
Configuration conf = new Configuration(true);
DataSegment[] segments = new DataSegment[numberOfSegments];
@@ -226,12 +227,14 @@ public class HdfsDataSegmentPusherTest
HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
final File storageDirectory = tempFolder.newFolder();
+ config.setCompressionFormat(format);
config.setStorageDirectory(
scheme != null
? StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath())
: storageDirectory.getAbsolutePath()
);
- HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+
+ HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf);
for (int i = 0; i < numberOfSegments; i++) {
segments[i] = new DataSegment(
@@ -251,10 +254,11 @@ public class HdfsDataSegmentPusherTest
final DataSegment pushedSegment = pusher.push(segmentDir, segments[i], false);
String indexUri = StringUtils.format(
- "%s/%s/%d_index.zip",
+ "%s/%s/%d_index.%s",
FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
pusher.getStorageDir(segments[i], false),
- segments[i].getShardSpec().getPartitionNum()
+ segments[i].getShardSpec().getPartitionNum(),
+ format.getExtension()
);
Assert.assertEquals(segments[i].getSize(), pushedSegment.getSize());
@@ -269,10 +273,11 @@ public class HdfsDataSegmentPusherTest
String segmentPath = pusher.getStorageDir(pushedSegment, false);
File indexFile = new File(StringUtils.format(
- "%s/%s/%d_index.zip",
+ "%s/%s/%d_index.%s",
storageDirectory,
segmentPath,
- pushedSegment.getShardSpec().getPartitionNum()
+ pushedSegment.getShardSpec().getPartitionNum(),
+ format.getExtension()
));
Assert.assertTrue(indexFile.exists());
@@ -280,10 +285,11 @@ public class HdfsDataSegmentPusherTest
Assert.assertEquals(segments[i], pushedSegment);
indexFile = new File(StringUtils.format(
- "%s/%s/%d_index.zip",
+ "%s/%s/%d_index.%s",
storageDirectory,
segmentPath,
- pushedSegment.getShardSpec().getPartitionNum()
+ pushedSegment.getShardSpec().getPartitionNum(),
+ format.getExtension()
));
Assert.assertTrue(indexFile.exists());
@@ -320,7 +326,7 @@ public class HdfsDataSegmentPusherTest
? StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath())
: storageDirectory.getAbsolutePath()
);
- HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+ HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf);
DataSegment segmentToPush = new DataSegment(
"foo",
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java
index 482c47acee2..a5111d2159c 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java
@@ -28,6 +28,8 @@ import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
+import org.apache.druid.java.util.emitter.core.NoopEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.loading.OmniDataSegmentKiller;
import org.junit.Assert;
import org.junit.Test;
@@ -85,6 +87,7 @@ public class HdfsStorageDruidModuleTest
binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
binder.bind(JsonConfigurator.class).in(LazySingleton.class);
binder.bind(Properties.class).toInstance(props);
+ binder.bind(ServiceEmitter.class).toInstance(new ServiceEmitter("test", "localhost", new NoopEmitter()));
},
new HdfsStorageDruidModule()
)
diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
index 01b45c5089a..eabec58ef15 100644
--- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
+++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
@@ -19,6 +19,7 @@
package org.apache.druid.utils;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
@@ -27,6 +28,9 @@ import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
+import net.jpountz.lz4.LZ4BlockInputStream;
+import net.jpountz.lz4.LZ4BlockOutputStream;
+import net.jpountz.lz4.LZ4Factory;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
@@ -46,7 +50,10 @@ import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -57,6 +64,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Enumeration;
+import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
@@ -74,9 +82,34 @@ public class CompressionUtils
{
BZ2(".bz2", "bz2"),
GZ(".gz", "gz"),
+ LZ4(".lz4", "lz4") {
+ @Override
+ public long compressDirectory(File directory, OutputStream out) throws IOException
+ {
+ return lz4CompressDirectory(directory, out);
+ }
+
+ @Override
+ public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException
+ {
+ return lz4DecompressDirectory(in, outDir);
+ }
+ },
SNAPPY(".sz", "sz"),
XZ(".xz", "xz"),
- ZIP(".zip", "zip"),
+ ZIP(".zip", "zip") {
+ @Override
+ public long compressDirectory(File directory, OutputStream out) throws IOException
+ {
+ return zip(directory, out);
+ }
+
+ @Override
+ public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException
+ {
+ return unzip(in, outDir);
+ }
+ },
ZSTD(".zst", "zst");
private static final Map<String, Format> EXTENSION_TO_COMPRESSION_FORMAT;
@@ -85,6 +118,7 @@ public class CompressionUtils
ImmutableMap.Builder<String, Format> builder = ImmutableMap.builder();
builder.put(BZ2.getExtension(), BZ2);
builder.put(GZ.getExtension(), GZ);
+ builder.put(LZ4.getExtension(), LZ4);
builder.put(SNAPPY.getExtension(), SNAPPY);
builder.put(XZ.getExtension(), XZ);
builder.put(ZIP.getExtension(), ZIP);
@@ -110,6 +144,16 @@ public class CompressionUtils
return extension;
}
+ @Nullable
+ @JsonCreator
+ public static Format fromString(@Nullable String name)
+ {
+ if (Strings.isNullOrEmpty(name)) {
+ return null;
+ }
+ return valueOf(name.toUpperCase(Locale.ROOT));
+ }
+
@Nullable
public static Format fromFileName(@Nullable String fileName)
{
@@ -119,6 +163,36 @@ public class CompressionUtils
}
return EXTENSION_TO_COMPRESSION_FORMAT.get(extension);
}
+
+ /**
+ * Compresses a directory to the output stream. Default implementation throws UnsupportedOperationException.
+ * Override this method for formats that support directory compression.
+ *
+ * @param directory The directory to compress
+ * @param out The output stream to write compressed data to
+ * @return The number of bytes (uncompressed) read from the input directory
+ * @throws IOException if an I/O error occurs
+ * @throws UnsupportedOperationException if the format doesn't support directory compression
+ */
+ public long compressDirectory(File directory, OutputStream out) throws IOException
+ {
+ throw new UnsupportedOperationException("Directory compression not supported for " + this.name());
+ }
+
+ /**
+ * Decompresses a directory from the input stream. Default implementation throws UnsupportedOperationException.
+ * Override this method for formats that support directory decompression.
+ *
+ * @param in The input stream containing compressed data
+ * @param outDir The output directory to extract files to
+ * @return A FileCopyResult containing information about extracted files
+ * @throws IOException if an I/O error occurs
+ * @throws UnsupportedOperationException if the format doesn't support directory decompression
+ */
+ public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException
+ {
+ throw new UnsupportedOperationException("Directory decompression not supported for " + this.name());
+ }
}
public static final int COMPRESSED_TEXT_WEIGHT_FACTOR = 4;
@@ -212,6 +286,106 @@ public class CompressionUtils
return totalSize;
}
+ /**
+ * Compresses directory contents using LZ4 block compression with a simple archive format.
+ * Format: [file_count:4 bytes][file1_name_length:4][file1_name:bytes][file1_size:8][file1_data:bytes]...
+ *
+ * @param directory The directory whose contents should be compressed
+ * @param out The output stream to write compressed data to
+ * @return The number of bytes (uncompressed) read from the input directory
+ */
+ public static long lz4CompressDirectory(File directory, OutputStream out) throws IOException
+ {
+ if (!directory.isDirectory()) {
+ throw new IOE("directory[%s] is not a directory", directory);
+ }
+
+ // Use fast compressor for better performance (lower CPU, faster compression)
+ final LZ4BlockOutputStream lz4Out = new LZ4BlockOutputStream(
+ out,
+ 64 * 1024, // Block size
+ LZ4Factory.fastestInstance().fastCompressor()
+ );
+ // Use DataOutputStream for structured writing
+ final DataOutputStream dataOut = new DataOutputStream(lz4Out);
+ final File[] files = directory.listFiles();
+
+ if (files == null) {
+ throw new IOE("Cannot list files in directory[%s]", directory);
+ }
+
+ // Sort for consistency
+ final File[] sortedFiles = Arrays.stream(files).sorted().toArray(File[]::new);
+
+ dataOut.writeInt(sortedFiles.length);
+
+ long totalSize = 0;
+
+ for (File file : sortedFiles) {
+ if (file.isDirectory()) {
+ continue; // Skip subdirectories like ZIP does
+ }
+
+ log.debug("Compressing file[%s] with size[%,d]. Total size so far[%,d]",
file, file.length(), totalSize);
+
+ final String fileName = file.getName();
+ final byte[] fileNameBytes = fileName.getBytes(StandardCharsets.UTF_8);
+ dataOut.writeInt(fileNameBytes.length);
+ dataOut.write(fileNameBytes);
+
+ final long fileSize = file.length();
+ if (fileSize > Integer.MAX_VALUE) {
+ throw new IOE("file[%s] too large [%,d]", file, fileSize);
+ }
+
+ dataOut.writeLong(fileSize);
+ totalSize += fileSize;
+
+ // Copy file content to dataOut
+ try (FileInputStream fileIn = new FileInputStream(file)) {
+ ByteStreams.copy(fileIn, dataOut);
+ }
+ }
+
+ dataOut.flush();
+ lz4Out.finish();
+ return totalSize;
+ }
+
+ /**
+ * Decompresses LZ4-compressed directory archive
+ */
+ public static FileUtils.FileCopyResult lz4DecompressDirectory(InputStream in, File outDir) throws IOException
+ {
+ if (!(outDir.exists() && outDir.isDirectory())) {
+ throw new ISE("outDir[%s] must exist and be a directory", outDir);
+ }
+
+ final LZ4BlockInputStream lz4In = new LZ4BlockInputStream(in);
+ final DataInputStream dataIn = new DataInputStream(lz4In);
+
+ final int fileCount = dataIn.readInt();
+ final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult();
+
+ for (int i = 0; i < fileCount; i++) {
+ final int fileNameLength = dataIn.readInt();
+ final byte[] fileNameBytes = new byte[fileNameLength];
+ dataIn.readFully(fileNameBytes);
+ final String fileName = new String(fileNameBytes, StandardCharsets.UTF_8);
+
+ final long fileSize = dataIn.readLong();
+
+ // Write to file
+ final File outFile = new File(outDir, fileName);
+ validateZipOutputFile("", outFile, outDir);
+
+ NativeIO.chunkedCopy(ByteStreams.limit(dataIn, fileSize), outFile);
+ result.addFile(outFile);
+ }
+
+ return result;
+ }
+
/**
* Unzip the byteSource to the output directory. If cacheLocally is true, the byteSource is cached to local disk before unzipping.
* This may cause more predictable behavior than trying to unzip a large file directly off a network stream, for example.
@@ -622,6 +796,8 @@ public class CompressionUtils
{
if (fileName.endsWith(Format.GZ.getSuffix())) {
return gzipInputStream(in);
+ } else if (fileName.endsWith(Format.LZ4.getSuffix())) {
+ return new LZ4BlockInputStream(in);
} else if (fileName.endsWith(Format.BZ2.getSuffix())) {
return new BZip2CompressorInputStream(in, true);
} else if (fileName.endsWith(Format.XZ.getSuffix())) {
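To make the archive layout described above concrete, here is a minimal round-trip sketch (the paths, `srcDir`, and `dstDir` are assumptions, not commit content):

    // Pack a directory into one .lz4 archive, then restore it elsewhere.
    File archive = new File("/tmp/0_index.lz4");               // hypothetical location
    try (OutputStream out = new FileOutputStream(archive)) {
      long uncompressedBytes = CompressionUtils.lz4CompressDirectory(srcDir, out);
    }
    try (InputStream in = new FileInputStream(archive)) {      // dstDir must already exist
      FileUtils.FileCopyResult restored = CompressionUtils.lz4DecompressDirectory(in, dstDir);
    }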
diff --git a/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java b/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java
index 27377a53a77..6d1c41e4ee5 100644
--- a/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java
@@ -26,6 +26,7 @@ import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.CountingInputStream;
import com.google.common.io.Files;
+import net.jpountz.lz4.LZ4BlockOutputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorOutputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
@@ -407,6 +408,46 @@ public class CompressionUtilsTest
}
}
+ @Test
+ public void testGoodLz4CompressUncompressDirectory() throws IOException
+ {
+ final File tmpDir = temporaryFolder.newFolder("testGoodLz4CompressUncompressDirectory");
+ final File lz4File = new File(tmpDir, "compressionUtilTest.lz4");
+
+ try (final OutputStream out = new FileOutputStream(lz4File)) {
+ CompressionUtils.Format.LZ4.compressDirectory(testDir, out);
+ }
+
+ final File newDir = new File(tmpDir, "newDir");
+ Assert.assertTrue(newDir.mkdir());
+
+ final FileUtils.FileCopyResult result;
+ try (final InputStream in = new FileInputStream(lz4File)) {
+ result = CompressionUtils.Format.LZ4.decompressDirectory(in, newDir);
+ }
+
+ verifyUnzip(newDir, result, ImmutableMap.of(testFile.getName(), StringUtils.toUtf8(CONTENT)));
+ }
+
+ @Test
+ public void testDecompressLz4() throws IOException
+ {
+ final File tmpDir = temporaryFolder.newFolder("testDecompressLz4");
+ final File lz4File = new File(tmpDir, testFile.getName() + ".lz4");
+ Assert.assertFalse(lz4File.exists());
+
+ try (
+ final OutputStream out = new LZ4BlockOutputStream(new FileOutputStream(lz4File));
+ final InputStream in = new FileInputStream(testFile)
+ ) {
+ ByteStreams.copy(in, out);
+ }
+
+ try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(lz4File), lz4File.getName())) {
+ assertGoodDataStream(inputStream);
+ }
+ }
+
@Test
public void testGoodGZStream() throws IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]