This is an automated email from the ASF dual-hosted git repository.
dpitkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 7e50097ba4 GH-43469: [Java] Change the default
CompressionCodec.Factory to leverage compression support transparently (#43471)
7e50097ba4 is described below
commit 7e50097ba4239cf9368b77f438d877d4176141c9
Author: Costi Ciudatu <[email protected]>
AuthorDate: Tue Jul 30 22:30:57 2024 +0300
GH-43469: [Java] Change the default CompressionCodec.Factory to leverage
compression support transparently (#43471)
### Rationale for this change
Enable compression support in Flight RPC and other consumers simply by
including the `arrow-compression` jar on the module path (or classpath).
### What changes are included in this PR?
Change the default compression factory to the new
`CompressionCodec.Factory.INSTANCE`, a ServiceLoader-backed singleton that,
for each codec type, delegates to the best-suited implementation available on
the module path or classpath.
### Are these changes tested?
Yes, by the new `TestCompressionCodecServiceProvider` test.
### Are there any user-facing changes?
No.
* GitHub Issue: #43469
Authored-by: Costi Ciudatu <[email protected]>
Signed-off-by: Dane Pitkin <[email protected]>
---
java/compression/src/main/java/module-info.java | 6 +++
...row.vector.compression.CompressionCodec$Factory | 15 +++++++
.../TestCompressionCodecServiceProvider.java | 50 ++++++++++++++++++++++
java/vector/src/main/java/module-info.java | 2 +
.../java/org/apache/arrow/vector/VectorLoader.java | 2 +-
.../arrow/vector/compression/CompressionCodec.java | 44 +++++++++++++++++++
.../apache/arrow/vector/ipc/ArrowFileReader.java | 3 +-
.../org/apache/arrow/vector/ipc/ArrowReader.java | 3 +-
.../apache/arrow/vector/ipc/ArrowStreamReader.java | 3 +-
9 files changed, 121 insertions(+), 7 deletions(-)
diff --git a/java/compression/src/main/java/module-info.java
b/java/compression/src/main/java/module-info.java
index 6bf989e4c1..113a1dba9d 100644
--- a/java/compression/src/main/java/module-info.java
+++ b/java/compression/src/main/java/module-info.java
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+import org.apache.arrow.vector.compression.CompressionCodec;
+
module org.apache.arrow.compression {
exports org.apache.arrow.compression;
@@ -22,4 +24,8 @@ module org.apache.arrow.compression {
requires org.apache.arrow.memory.core;
requires org.apache.arrow.vector;
requires org.apache.commons.compress;
+
+ // Also defined under META-INF/services to support non-modular applications
+ provides CompressionCodec.Factory with
+ org.apache.arrow.compression.CommonsCompressionFactory;
}
diff --git
a/java/compression/src/main/resources/META-INF/services/org.apache.arrow.vector.compression.CompressionCodec$Factory
b/java/compression/src/main/resources/META-INF/services/org.apache.arrow.vector.compression.CompressionCodec$Factory
new file mode 100644
index 0000000000..ccdcef9aed
--- /dev/null
+++
b/java/compression/src/main/resources/META-INF/services/org.apache.arrow.vector.compression.CompressionCodec$Factory
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.arrow.compression.CommonsCompressionFactory
diff --git
a/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodecServiceProvider.java
b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodecServiceProvider.java
new file mode 100644
index 0000000000..795e05d7cb
--- /dev/null
+++
b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodecServiceProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.compression;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+import org.junit.jupiter.api.Test;
+
+public class TestCompressionCodecServiceProvider {
+
+ /**
+ * When arrow-compression is in the classpath/module-path, {@link
+ * CompressionCodec.Factory#INSTANCE} should be able to handle all codec
+ * types.
+ *
+ * <p>The two individual factories are each expected to reject at least one
+ * codec type with an {@link IllegalArgumentException}, while the combined
+ * {@code INSTANCE} must return a non-null codec for every type.
+ */
+ @Test
+ public void testSupportedCompressionTypes() {
+ assertThrows( // no-compression doesn't support any actual compression
types
+ IllegalArgumentException.class,
+ () -> checkAllCodecTypes(NoCompressionCodec.Factory.INSTANCE));
+ assertThrows( // commons-compression doesn't support the uncompressed type
+ IllegalArgumentException.class,
+ () -> checkAllCodecTypes(CommonsCompressionFactory.INSTANCE));
+ checkAllCodecTypes( // and the winner is...
+ CompressionCodec.Factory.INSTANCE); // combines the two above to
support all types
+ }
+
+ private void checkAllCodecTypes(CompressionCodec.Factory factory) { // asserts a non-null codec for every CodecType value
+ for (CompressionUtil.CodecType codecType :
CompressionUtil.CodecType.values()) {
+ assertNotNull(factory.createCodec(codecType));
+ }
+ }
+}
diff --git a/java/vector/src/main/java/module-info.java
b/java/vector/src/main/java/module-info.java
index 73af2d1b67..fdea2bd067 100644
--- a/java/vector/src/main/java/module-info.java
+++ b/java/vector/src/main/java/module-info.java
@@ -47,4 +47,6 @@ module org.apache.arrow.vector {
requires org.apache.arrow.memory.core;
requires org.apache.commons.codec;
requires org.slf4j;
+
+ uses org.apache.arrow.vector.compression.CompressionCodec.Factory;
}
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
index c076161bc2..ecd3fb9124 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -50,7 +50,7 @@ public class VectorLoader {
* @param root the root to add vectors to based on schema
*/
public VectorLoader(VectorSchemaRoot root) {
- this(root, NoCompressionCodec.Factory.INSTANCE);
+ this(root, CompressionCodec.Factory.INSTANCE);
}
/**
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java
b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java
index 2de8ff2465..dd62108a84 100644
---
a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java
+++
b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java
@@ -16,6 +16,9 @@
*/
package org.apache.arrow.vector.compression;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.ServiceLoader;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
@@ -51,11 +54,52 @@ public interface CompressionCodec {
/** Factory to create compression codec. */
interface Factory {
+ /**
+ * This combines all the available factories registered as service
+ * providers in the module path. For each {@link CompressionUtil.CodecType
+ * compression codec type}, it will use whatever factory supports it, i.e.
+ * doesn't throw on {@code createCodec(type)}. If multiple factories
+ * registered as service providers support the same codec type, the first
+ * one encountered while iterating over the {@link ServiceLoader} will be
+ * selected. A codec type that is not supported by any registered service
+ * provider will fall back to {@link NoCompressionCodec.Factory#INSTANCE}
+ * for backwards compatibility.
+ *
+ * <p>The mapping from codec type to factory is computed once, when this
+ * constant is initialized.
+ */
+ Factory INSTANCE = bestEffort();
/** Creates the codec based on the codec type. */
CompressionCodec createCodec(CompressionUtil.CodecType codecType);
/** Creates the codec based on the codec type and compression level. */
CompressionCodec createCodec(CompressionUtil.CodecType codecType, int
compressionLevel);
+
+ private static Factory bestEffort() { // builds the ServiceLoader-backed delegating factory used for INSTANCE
+ final ServiceLoader<Factory> serviceLoader =
ServiceLoader.load(Factory.class);
+ final Map<CompressionUtil.CodecType, Factory> factories =
+ new EnumMap<>(CompressionUtil.CodecType.class);
+ for (Factory factory : serviceLoader) {
+ for (CompressionUtil.CodecType codecType :
CompressionUtil.CodecType.values()) {
+ try {
+ factory.createCodec(codecType); // will throw if not supported
+ factories.putIfAbsent(codecType, factory); // keep the first factory that supports this type
+ } catch (Throwable ignored) { // probe failed: this factory is not used for this codec type
+ }
+ }
+ }
+
+ final Factory fallback = NoCompressionCodec.Factory.INSTANCE; // used for codec types no provider claimed
+ return new Factory() {
+ @Override
+ public CompressionCodec createCodec(CompressionUtil.CodecType
codecType) {
+ return factories.getOrDefault(codecType,
fallback).createCodec(codecType);
+ }
+
+ @Override
+ public CompressionCodec createCodec(
+ CompressionUtil.CodecType codecType, int compressionLevel) {
+ return factories
+ .getOrDefault(codecType, fallback)
+ .createCodec(codecType, compressionLevel);
+ }
+ };
+ }
}
}
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
index 982651b2ff..7cac0a15a1 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
@@ -27,7 +27,6 @@ import org.apache.arrow.flatbuf.Footer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.compression.CompressionCodec;
-import org.apache.arrow.vector.compression.NoCompressionCodec;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowFooter;
@@ -64,7 +63,7 @@ public class ArrowFileReader extends ArrowReader {
}
public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) {
- this(in, allocator, NoCompressionCodec.Factory.INSTANCE);
+ this(in, allocator, CompressionCodec.Factory.INSTANCE);
}
public ArrowFileReader(SeekableByteChannel in, BufferAllocator allocator) {
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
index 15ade38cd3..7f4addf2d0 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
@@ -28,7 +28,6 @@ import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.compression.CompressionCodec;
-import org.apache.arrow.vector.compression.NoCompressionCodec;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
@@ -50,7 +49,7 @@ public abstract class ArrowReader implements
DictionaryProvider, AutoCloseable {
private final CompressionCodec.Factory compressionFactory;
protected ArrowReader(BufferAllocator allocator) {
- this(allocator, NoCompressionCodec.Factory.INSTANCE);
+ this(allocator, CompressionCodec.Factory.INSTANCE);
}
protected ArrowReader(BufferAllocator allocator, CompressionCodec.Factory
compressionFactory) {
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
index 660c6a5f89..69811dc717 100644
---
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
+++
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
@@ -25,7 +25,6 @@ import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.compression.CompressionCodec;
-import org.apache.arrow.vector.compression.NoCompressionCodec;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.MessageChannelReader;
@@ -65,7 +64,7 @@ public class ArrowStreamReader extends ArrowReader {
* @param allocator to allocate new buffers
*/
public ArrowStreamReader(MessageChannelReader messageReader, BufferAllocator
allocator) {
- this(messageReader, allocator, NoCompressionCodec.Factory.INSTANCE);
+ this(messageReader, allocator, CompressionCodec.Factory.INSTANCE);
}
/**