This is an automated email from the ASF dual-hosted git repository. uwe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new beb1cb8 ARROW-3755: [GLib] Add GArrowCompressedInputStream and GArrowCompressedOutputStream beb1cb8 is described below commit beb1cb85062299a9a9e066ac20a7ff7a95262eec Author: Kouhei Sutou <k...@clear-code.com> AuthorDate: Tue Nov 20 23:00:24 2018 +0100 ARROW-3755: [GLib] Add GArrowCompressedInputStream and GArrowCompressedOutputStream Author: Kouhei Sutou <k...@clear-code.com> Closes #3002 from kou/glib-compressed-stream and squashes the following commits: aea336ae <Kouhei Sutou> Add GArrowCompressedInputStream and GArrowCompressedOutputStream --- c_glib/arrow-glib/input-stream.cpp | 169 ++++++++++++++++++++++++++ c_glib/arrow-glib/input-stream.h | 18 +++ c_glib/arrow-glib/input-stream.hpp | 8 ++ c_glib/arrow-glib/output-stream.cpp | 170 +++++++++++++++++++++++++++ c_glib/arrow-glib/output-stream.h | 18 +++ c_glib/arrow-glib/output-stream.hpp | 8 ++ c_glib/test/run-test.rb | 1 + c_glib/test/test-compressed-input-stream.rb | 45 +++++++ c_glib/test/test-compressed-output-stream.rb | 43 +++++++ 9 files changed, 480 insertions(+) diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp index d2e6366..bd78ca0 100644 --- a/c_glib/arrow-glib/input-stream.cpp +++ b/c_glib/arrow-glib/input-stream.cpp @@ -26,6 +26,7 @@ #include <arrow/ipc/reader.h> #include <arrow-glib/buffer.hpp> +#include <arrow-glib/codec.hpp> #include <arrow-glib/error.hpp> #include <arrow-glib/file.hpp> #include <arrow-glib/input-stream.hpp> @@ -52,6 +53,9 @@ G_BEGIN_DECLS * * #GArrowGIOInputStream is a class for `GInputStream` based input * stream. + * + * #GArrowCompressedInputStream is a class to read data from + * compressed input stream. 
 */ typedef struct GArrowInputStreamPrivate_ { @@ -718,6 +722,147 @@ garrow_gio_input_stream_get_raw(GArrowGIOInputStream *input_stream) return gio_input_stream; } +/* Private data: the objects behind the construct-only "codec" and "raw" properties. Each one holds a reference taken in set_property() and dropped in dispose(). */ typedef struct GArrowCompressedInputStreamPrivate_ { + GArrowCodec *codec; + GArrowInputStream *raw; +} GArrowCompressedInputStreamPrivate; + +/* Property IDs start at 1 because GObject does not accept property ID 0. */ enum { + PROP_CODEC = 1, + PROP_RAW +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GArrowCompressedInputStream, + garrow_compressed_input_stream, + GARROW_TYPE_INPUT_STREAM) + +#define GARROW_COMPRESSED_INPUT_STREAM_GET_PRIVATE(object) \ + static_cast<GArrowCompressedInputStreamPrivate *>( \ + garrow_compressed_input_stream_get_instance_private( \ + GARROW_COMPRESSED_INPUT_STREAM(object))) + +/* Drops the references held in the private data, then chains up to the parent. The pointers are reset to NULL because GObject may run dispose() more than once. */ static void +garrow_compressed_input_stream_dispose(GObject *object) +{ + auto priv = GARROW_COMPRESSED_INPUT_STREAM_GET_PRIVATE(object); + + if (priv->codec) { + g_object_unref(priv->codec); + priv->codec = NULL; + } + + if (priv->raw) { + g_object_unref(priv->raw); + priv->raw = NULL; + } + + G_OBJECT_CLASS(garrow_compressed_input_stream_parent_class)->dispose(object); +} + +/* Stores the construct-only properties; g_value_dup_object() hands back a new reference that dispose() releases. */ static void +garrow_compressed_input_stream_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GARROW_COMPRESSED_INPUT_STREAM_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_CODEC: + priv->codec = GARROW_CODEC(g_value_dup_object(value)); + break; + case PROP_RAW: + priv->raw = GARROW_INPUT_STREAM(g_value_dup_object(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +/* Reads the stored objects back; g_value_set_object() takes its own reference to the object it stores. */ static void +garrow_compressed_input_stream_get_property(GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + auto priv = GARROW_COMPRESSED_INPUT_STREAM_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_CODEC: + g_value_set_object(value, priv->codec); + break; + case PROP_RAW: + g_value_set_object(value, priv->raw); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; 
+ } +} + +/* Nothing to set up here: instance private data starts zero-filled and is populated from the construct-only properties. */ static void +garrow_compressed_input_stream_init(GArrowCompressedInputStream *object) +{ +} + +/* Installs the vfuncs and the "codec" and "raw" properties (both G_PARAM_READWRITE and G_PARAM_CONSTRUCT_ONLY). */ static void +garrow_compressed_input_stream_class_init(GArrowCompressedInputStreamClass *klass) +{ + GParamSpec *spec; + + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->dispose = garrow_compressed_input_stream_dispose; + gobject_class->set_property = garrow_compressed_input_stream_set_property; + gobject_class->get_property = garrow_compressed_input_stream_get_property; + + spec = g_param_spec_object("codec", + "Codec", + "The codec for the stream", + GARROW_TYPE_CODEC, + static_cast<GParamFlags>(G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_CODEC, spec); + + spec = g_param_spec_object("raw", + "Raw", + "The underlying raw input stream", + GARROW_TYPE_INPUT_STREAM, + static_cast<GParamFlags>(G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_RAW, spec); +} + +/** + * garrow_compressed_input_stream_new: + * @codec: A #GArrowCodec used to decompress the data read from @raw. + * @raw: A #GArrowInputStream that contains compressed data. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Wraps @raw so that reading from the returned stream yields the + * decompressed data. + * + * Returns: (nullable): A newly created #GArrowCompressedInputStream + * or %NULL on error.
+ * + * Since: 0.12.0 + */ +GArrowCompressedInputStream * +garrow_compressed_input_stream_new(GArrowCodec *codec, + GArrowInputStream *raw, + GError **error) +{ + auto arrow_codec = garrow_codec_get_raw(codec); + auto arrow_raw = garrow_input_stream_get_raw(raw); + std::shared_ptr<arrow::io::CompressedInputStream> arrow_stream; + auto status = arrow::io::CompressedInputStream::Make(arrow_codec, + arrow_raw, + &arrow_stream); + /* On a failed status NULL is returned; garrow_error_check() presumably reports it through the error argument. */ if (garrow_error_check(error, status, "[compressed-input-stream][new]")) { + return garrow_compressed_input_stream_new_raw(&arrow_stream, + codec, + raw); + } else { + return NULL; + } +} G_END_DECLS @@ -787,3 +932,27 @@ garrow_memory_mapped_input_stream_new_raw(std::shared_ptr<arrow::io::MemoryMappe auto memory_mapped_input_stream = GARROW_MEMORY_MAPPED_INPUT_STREAM(object); return memory_mapped_input_stream; } + +/* Wraps an existing Arrow CompressedInputStream; codec and raw are stored as the "codec" and "raw" properties. NOTE(review): arrow_raw is handed unchanged to the parent "input-stream" property, which presumably reads it as a pointer to std::shared_ptr<arrow::io::InputStream> - confirm that reinterpretation is valid. */ GArrowCompressedInputStream * +garrow_compressed_input_stream_new_raw(std::shared_ptr<arrow::io::CompressedInputStream> *arrow_raw, + GArrowCodec *codec, + GArrowInputStream *raw) +{ + auto compressed_input_stream = + g_object_new(GARROW_TYPE_COMPRESSED_INPUT_STREAM, + "input-stream", arrow_raw, + "codec", codec, + "raw", raw, + NULL); + return GARROW_COMPRESSED_INPUT_STREAM(compressed_input_stream); +} + +/* Returns the stream wrapped by the Arrow CompressedInputStream (its raw()), not the CompressedInputStream itself. The unchecked static_pointer_cast assumes the Arrow stream behind this object is a CompressedInputStream, as set up by garrow_compressed_input_stream_new_raw(). */ std::shared_ptr<arrow::io::InputStream> +garrow_compressed_input_stream_get_raw(GArrowCompressedInputStream *compressed_input_stream) +{ + auto input_stream = GARROW_INPUT_STREAM(compressed_input_stream); + auto arrow_input_stream = garrow_input_stream_get_raw(input_stream); + auto arrow_compressed_input_stream = + std::static_pointer_cast<arrow::io::CompressedInputStream>(arrow_input_stream); + return arrow_compressed_input_stream->raw(); +} diff --git a/c_glib/arrow-glib/input-stream.h b/c_glib/arrow-glib/input-stream.h index 840f27d..1a4c9cf 100644 --- a/c_glib/arrow-glib/input-stream.h +++ b/c_glib/arrow-glib/input-stream.h @@ -22,6 +22,7 @@ #include <gio/gio.h> #include <arrow-glib/buffer.h> +#include <arrow-glib/codec.h>
#include <arrow-glib/tensor.h> G_BEGIN_DECLS @@ -183,4 +184,21 @@ GType garrow_gio_input_stream_get_type(void) G_GNUC_CONST; GArrowGIOInputStream *garrow_gio_input_stream_new(GInputStream *gio_input_stream); GInputStream *garrow_gio_input_stream_get_raw(GArrowGIOInputStream *input_stream); +#define GARROW_TYPE_COMPRESSED_INPUT_STREAM \ + (garrow_compressed_input_stream_get_type()) +G_DECLARE_DERIVABLE_TYPE(GArrowCompressedInputStream, + garrow_compressed_input_stream, + GARROW, + COMPRESSED_INPUT_STREAM, + GArrowInputStream) +/* Class structure: only embeds the parent class, adding no vfuncs. */ struct _GArrowCompressedInputStreamClass +{ + GArrowInputStreamClass parent_class; +}; + +/* Returns NULL if the Arrow compressed stream cannot be created. */ GArrowCompressedInputStream * +garrow_compressed_input_stream_new(GArrowCodec *codec, + GArrowInputStream *raw, + GError **error); + G_END_DECLS diff --git a/c_glib/arrow-glib/input-stream.hpp b/c_glib/arrow-glib/input-stream.hpp index 1d835e8..34857a1 100644 --- a/c_glib/arrow-glib/input-stream.hpp +++ b/c_glib/arrow-glib/input-stream.hpp @@ -19,6 +19,7 @@ #pragma once +#include <arrow/io/compressed.h> #include <arrow/io/file.h> #include <arrow/io/interfaces.h> #include <arrow/io/memory.h> @@ -36,3 +37,10 @@ GArrowBufferInputStream *garrow_buffer_input_stream_new_raw_buffer(std::shared_p std::shared_ptr<arrow::io::BufferReader> garrow_buffer_input_stream_get_raw(GArrowBufferInputStream *input_stream); GArrowMemoryMappedInputStream *garrow_memory_mapped_input_stream_new_raw(std::shared_ptr<arrow::io::MemoryMappedFile> *arrow_memory_mapped_file); + +/* Wraps an existing Arrow CompressedInputStream; codec and raw become its "codec" and "raw" properties. */ GArrowCompressedInputStream * +garrow_compressed_input_stream_new_raw(std::shared_ptr<arrow::io::CompressedInputStream> *arrow_raw, + GArrowCodec *codec, + GArrowInputStream *raw); +/* Returns the stream wrapped by the compressed stream (the raw() of the Arrow CompressedInputStream), not the CompressedInputStream itself. */ std::shared_ptr<arrow::io::InputStream> +garrow_compressed_input_stream_get_raw(GArrowCompressedInputStream *stream); diff --git a/c_glib/arrow-glib/output-stream.cpp b/c_glib/arrow-glib/output-stream.cpp index 7943f74..946ee0b 100644 --- a/c_glib/arrow-glib/output-stream.cpp +++ b/c_glib/arrow-glib/output-stream.cpp @@ -25,6 +25,7
@@ #include <arrow/ipc/writer.h> #include <arrow-glib/buffer.hpp> +#include <arrow-glib/codec.hpp> #include <arrow-glib/error.hpp> #include <arrow-glib/file.hpp> #include <arrow-glib/output-stream.hpp> @@ -51,6 +52,9 @@ G_BEGIN_DECLS * * #GArrowGIOOutputStream is a class for `GOutputStream` based output * stream. + * + * #GArrowCompressedOutputStream is a class to write compressed data to + * output stream. */ typedef struct GArrowOutputStreamPrivate_ { @@ -441,6 +445,148 @@ garrow_gio_output_stream_get_raw(GArrowGIOOutputStream *output_stream) return gio_output_stream; } +/* Private data: the objects behind the construct-only "codec" and "raw" properties. Each one holds a reference taken in set_property() and dropped in dispose(). */ typedef struct GArrowCompressedOutputStreamPrivate_ { + GArrowCodec *codec; + GArrowOutputStream *raw; +} GArrowCompressedOutputStreamPrivate; + +/* Property IDs start at 1 because GObject does not accept property ID 0. */ enum { + PROP_CODEC = 1, + PROP_RAW +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GArrowCompressedOutputStream, + garrow_compressed_output_stream, + GARROW_TYPE_OUTPUT_STREAM) + +#define GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object) \ + static_cast<GArrowCompressedOutputStreamPrivate *>( \ + garrow_compressed_output_stream_get_instance_private( \ + GARROW_COMPRESSED_OUTPUT_STREAM(object))) + +/* Drops the references held in the private data, then chains up to the parent. The pointers are reset to NULL because GObject may run dispose() more than once. */ static void +garrow_compressed_output_stream_dispose(GObject *object) +{ + auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object); + + if (priv->codec) { + g_object_unref(priv->codec); + priv->codec = NULL; + } + + if (priv->raw) { + g_object_unref(priv->raw); + priv->raw = NULL; + } + + G_OBJECT_CLASS(garrow_compressed_output_stream_parent_class)->dispose(object); +} + +/* Stores the construct-only properties; g_value_dup_object() hands back a new reference that dispose() releases. */ static void +garrow_compressed_output_stream_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_CODEC: + priv->codec = GARROW_CODEC(g_value_dup_object(value)); + break; + case PROP_RAW: + priv->raw = GARROW_OUTPUT_STREAM(g_value_dup_object(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +/* Reads the stored objects back; g_value_set_object() takes its own reference to the object it stores. */ static
void +garrow_compressed_output_stream_get_property(GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_CODEC: + g_value_set_object(value, priv->codec); + break; + case PROP_RAW: + g_value_set_object(value, priv->raw); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +/* Nothing to set up here: instance private data starts zero-filled and is populated from the construct-only properties. */ static void +garrow_compressed_output_stream_init(GArrowCompressedOutputStream *object) +{ +} + +/* Installs the vfuncs and the "codec" and "raw" properties (both G_PARAM_READWRITE and G_PARAM_CONSTRUCT_ONLY). */ static void +garrow_compressed_output_stream_class_init(GArrowCompressedOutputStreamClass *klass) +{ + GParamSpec *spec; + + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->dispose = garrow_compressed_output_stream_dispose; + gobject_class->set_property = garrow_compressed_output_stream_set_property; + gobject_class->get_property = garrow_compressed_output_stream_get_property; + + spec = g_param_spec_object("codec", + "Codec", + "The codec for the stream", + GARROW_TYPE_CODEC, + static_cast<GParamFlags>(G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_CODEC, spec); + + spec = g_param_spec_object("raw", + "Raw", + "The underlying raw output stream", + GARROW_TYPE_OUTPUT_STREAM, + static_cast<GParamFlags>(G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_RAW, spec); +} + +/** + * garrow_compressed_output_stream_new: + * @codec: A #GArrowCodec used to compress the data written to @raw. + * @raw: A #GArrowOutputStream that is a sink for compressed data. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Wraps @raw so that data written to the returned stream reaches + * @raw in compressed form. + * + * Returns: (nullable): A newly created #GArrowCompressedOutputStream + * or %NULL on error.
+ * + * Since: 0.12.0 + */ +GArrowCompressedOutputStream * +garrow_compressed_output_stream_new(GArrowCodec *codec, + GArrowOutputStream *raw, + GError **error) +{ + auto arrow_codec = garrow_codec_get_raw(codec); + auto arrow_raw = garrow_output_stream_get_raw(raw); + std::shared_ptr<arrow::io::CompressedOutputStream> arrow_stream; + auto status = arrow::io::CompressedOutputStream::Make(arrow_codec, + arrow_raw, + &arrow_stream); + /* On a failed status NULL is returned; garrow_error_check() presumably reports it through the error argument. */ if (garrow_error_check(error, status, "[compressed-output-stream][new]")) { + return garrow_compressed_output_stream_new_raw(&arrow_stream, + codec, + raw); + } else { + return NULL; + } +} + G_END_DECLS @@ -483,3 +629,27 @@ garrow_buffer_output_stream_new_raw(std::shared_ptr<arrow::io::BufferOutputStrea NULL)); return buffer_output_stream; } + +/* Wraps an existing Arrow CompressedOutputStream; codec and raw are stored as the "codec" and "raw" properties. NOTE(review): arrow_raw is handed unchanged to the parent "output-stream" property, which presumably reads it as a pointer to std::shared_ptr<arrow::io::OutputStream> - confirm that reinterpretation is valid. */ GArrowCompressedOutputStream * +garrow_compressed_output_stream_new_raw(std::shared_ptr<arrow::io::CompressedOutputStream> *arrow_raw, + GArrowCodec *codec, + GArrowOutputStream *raw) +{ + auto compressed_output_stream = + g_object_new(GARROW_TYPE_COMPRESSED_OUTPUT_STREAM, + "output-stream", arrow_raw, + "codec", codec, + "raw", raw, + NULL); + return GARROW_COMPRESSED_OUTPUT_STREAM(compressed_output_stream); +} + +/* Returns the stream wrapped by the Arrow CompressedOutputStream (its raw()), not the CompressedOutputStream itself. The unchecked static_pointer_cast assumes the Arrow stream behind this object is a CompressedOutputStream, as set up by garrow_compressed_output_stream_new_raw(). */ std::shared_ptr<arrow::io::OutputStream> +garrow_compressed_output_stream_get_raw(GArrowCompressedOutputStream *compressed_output_stream) +{ + auto output_stream = GARROW_OUTPUT_STREAM(compressed_output_stream); + auto arrow_output_stream = garrow_output_stream_get_raw(output_stream); + auto arrow_compressed_output_stream = + std::static_pointer_cast<arrow::io::CompressedOutputStream>(arrow_output_stream); + return arrow_compressed_output_stream->raw(); +} diff --git a/c_glib/arrow-glib/output-stream.h b/c_glib/arrow-glib/output-stream.h index 5d887c5..0318652 100644 --- a/c_glib/arrow-glib/output-stream.h +++ b/c_glib/arrow-glib/output-stream.h @@ -22,6 +22,7 @@ #include <gio/gio.h> #include <arrow-glib/buffer.h> +#include <arrow-glib/codec.h> #include <arrow-glib/tensor.h>
G_BEGIN_DECLS @@ -194,4 +195,21 @@ GType garrow_gio_output_stream_get_type(void) G_GNUC_CONST; GArrowGIOOutputStream *garrow_gio_output_stream_new(GOutputStream *gio_output_stream); GOutputStream *garrow_gio_output_stream_get_raw(GArrowGIOOutputStream *output_stream); +#define GARROW_TYPE_COMPRESSED_OUTPUT_STREAM \ + (garrow_compressed_output_stream_get_type()) +G_DECLARE_DERIVABLE_TYPE(GArrowCompressedOutputStream, + garrow_compressed_output_stream, + GARROW, + COMPRESSED_OUTPUT_STREAM, + GArrowOutputStream) +/* Class structure: only embeds the parent class, adding no vfuncs. */ struct _GArrowCompressedOutputStreamClass +{ + GArrowOutputStreamClass parent_class; +}; + +/* Returns NULL if the Arrow compressed stream cannot be created. */ GArrowCompressedOutputStream * +garrow_compressed_output_stream_new(GArrowCodec *codec, + GArrowOutputStream *raw, + GError **error); + G_END_DECLS diff --git a/c_glib/arrow-glib/output-stream.hpp b/c_glib/arrow-glib/output-stream.hpp index 5d22f1d..b39b3bd 100644 --- a/c_glib/arrow-glib/output-stream.hpp +++ b/c_glib/arrow-glib/output-stream.hpp @@ -19,6 +19,7 @@ #pragma once +#include <arrow/io/compressed.h> #include <arrow/io/file.h> #include <arrow/io/memory.h> @@ -30,3 +31,10 @@ std::shared_ptr<arrow::io::OutputStream> garrow_output_stream_get_raw(GArrowOutp GArrowFileOutputStream *garrow_file_output_stream_new_raw(std::shared_ptr<arrow::io::FileOutputStream> *arrow_file_output_stream); GArrowBufferOutputStream *garrow_buffer_output_stream_new_raw(std::shared_ptr<arrow::io::BufferOutputStream> *arrow_buffer_output_stream); + +/* Wraps an existing Arrow CompressedOutputStream; codec and raw become its "codec" and "raw" properties. */ GArrowCompressedOutputStream * +garrow_compressed_output_stream_new_raw(std::shared_ptr<arrow::io::CompressedOutputStream> *arrow_raw, + GArrowCodec *codec, + GArrowOutputStream *raw); +/* Returns the stream wrapped by the compressed stream (the raw() of the Arrow CompressedOutputStream), not the CompressedOutputStream itself. */ std::shared_ptr<arrow::io::OutputStream> +garrow_compressed_output_stream_get_raw(GArrowCompressedOutputStream *stream); diff --git a/c_glib/test/run-test.rb b/c_glib/test/run-test.rb index abaa4dc..238bb2d 100755 --- a/c_glib/test/run-test.rb +++ b/c_glib/test/run-test.rb @@ -60,6 +60,7 @@ end require "fileutils" require "rbconfig" require "tempfile" +require
"zlib" require_relative "helper/buildable" require_relative "helper/fixture" require_relative "helper/omittable" diff --git a/c_glib/test/test-compressed-input-stream.rb b/c_glib/test/test-compressed-input-stream.rb new file mode 100644 index 0000000..71f230a --- /dev/null +++ b/c_glib/test/test-compressed-input-stream.rb @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+class TestCompressedInputStream < Test::Unit::TestCase
+  include Helper::Buildable
+
+  def test_read  # gzip-compresses data with Ruby's Zlib, then checks that Arrow decompresses it back
+    data = "Hello"
+
+    output = StringIO.new
+    Zlib::GzipWriter.wrap(output) do |gz|
+      gz.write(data)
+    end
+
+    codec = Arrow::Codec.new(:gzip)
+    buffer = Arrow::Buffer.new(output.string)
+    raw_input = Arrow::BufferInputStream.new(buffer)
+    input = Arrow::CompressedInputStream.new(codec, raw_input)
+    assert_equal(data, input.read(data.bytesize).data.to_s)
+    input.close
+    raw_input.close
+  end
+
+  def test_raw  # the buffer is not gzip data, but nothing is read here; only #raw is checked
+    buffer = Arrow::Buffer.new("Hello")
+    raw_input = Arrow::BufferInputStream.new(buffer)
+    codec = Arrow::Codec.new(:gzip)
+    input = Arrow::CompressedInputStream.new(codec, raw_input)
+    assert_equal(raw_input, input.raw)
+  end
+end
diff --git a/c_glib/test/test-compressed-output-stream.rb b/c_glib/test/test-compressed-output-stream.rb new file mode 100644 index 0000000..eb54a45 --- /dev/null +++ b/c_glib/test/test-compressed-output-stream.rb @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+
+class TestCompressedOutputStream < Test::Unit::TestCase
+  include Helper::Buildable
+
+  def test_write  # compresses through Arrow, then checks that Ruby's Zlib can decompress the result
+    data = "Hello"
+    buffer = Arrow::ResizableBuffer.new(8)
+    raw_output = Arrow::BufferOutputStream.new(buffer)
+    codec = Arrow::Codec.new(:gzip)
+    output = Arrow::CompressedOutputStream.new(codec, raw_output)
+    output.write(data)
+    output.close  # NOTE(review): closed before reading buffer, presumably so the gzip trailer is flushed
+
+    input = StringIO.new(buffer.data.to_s)
+    Zlib::GzipReader.wrap(input) do |gz|
+      assert_equal(data, gz.read)
+    end
+  end
+
+  def test_raw  # only checks that #raw returns the stream given to new
+    buffer = Arrow::ResizableBuffer.new(8)
+    raw_output = Arrow::BufferOutputStream.new(buffer)
+    codec = Arrow::Codec.new(:gzip)
+    output = Arrow::CompressedOutputStream.new(codec, raw_output)
+    assert_equal(raw_output, output.raw)
+  end
+end