This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new ed37b14f40 GH-49422: [CI][Integration][Ruby] Add the Ruby implementation (#49423)
ed37b14f40 is described below
commit ed37b14f40e2dc62da067db20ede9061d400ecd1
Author: Sutou Kouhei <[email protected]>
AuthorDate: Tue Mar 10 13:14:11 2026 +0900
GH-49422: [CI][Integration][Ruby] Add the Ruby implementation (#49423)
### Rationale for this change
The Ruby implementation is still missing some features, but we can pass our
integration tests for now by skipping the tests that exercise them.
### What changes are included in this PR?
Archery:
* Add `--with-ruby` to `archery integration`
* Add `archery.integration.tester_ruby.RubyTester`
* Add `no_map_field_names_validate` quirk for GH-49415
* Show environment variables too on external command failure, because the
  Ruby tester passes information to the integration tester via environment
  variables rather than command line arguments
* Use `ARCHERY_INTEGRATION_WITH_CPP=1` instead of
  `ARROW_INTEGRATION_CPP=ON`, matching the variables used by other
  implementations such as `ARCHERY_INTEGRATION_WITH_GO`
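The environment-variable protocol above can be sketched in plain Ruby. The
variable names (`COMMAND`, `ARROW`, `JSON`, and the `QUIRK_*` prefix) are the
ones this commit introduces; the hash below stands in for a real process
environment, and the decoding helper is illustrative, not part of the commit:

```ruby
# A "validate" invocation of the Ruby tester sets COMMAND/ARROW/JSON
# plus one QUIRK_* flag per requested quirk, instead of passing
# command line arguments.
env = {
  "COMMAND" => "validate",
  "ARROW" => "/tmp/test.arrow",
  "JSON" => "/tmp/test.json",
  "QUIRK_NO_MAP_FIELD_NAMES_VALIDATE" => "true",
}

# Recover the quirk names on the driver side: strip the QUIRK_
# prefix and downcase, mirroring how Archery upcased them.
quirks = env.keys.grep(/\AQUIRK_/).map do |key|
  key.delete_prefix("QUIRK_").downcase
end
```

This keeps the driver's command line trivial (a bare executable path), which
is why the failure log in `util.py` now prints the environment as well.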
Ruby:
* Add `red-arrow-format-integration-test` as the test driver
* This is not included in `.gem` because this is only for development
* Add `ruby/red-arrow-format/lib/arrow-format/integration/` as helpers for
  the test driver
* This is not included in `.gem` because this is only for development
* Add `ArrowFormat::Array#empty?`
* Add `ArrowFormat::RecordBatch#empty?`
* Add `ArrowFormat::NullArray#n_nulls`
* `ArrowFormat::*Array#to_a`: Add support for empty case
* Fix Apache Arrow decimal <-> `BigDecimal` conversion
* `ArrowFormat::Bitmap#each`: Fix a bug that one bit is ignored
* Move dictionary ID to `ArrowFormat::DictionaryType` from
`ArrowFormat::Field`
* Add support for V4 union that has validity bitmap
* Add support for messages without a continuation token, for backward
  compatibility
* `ArrowFormat::StreamingReader`: Add support for reading the schema without
  calling `#each`
* `ArrowFormat::MapType`: Add support for sorted keys (`keysSorted`)
* `ArrowFormat::MapType`: Always use "key"/"value"/"entries" for field names
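The decimal <-> `BigDecimal` fix above comes down to placing the decimal point
correctly when the unscaled value has no more digits than the scale. A minimal
standalone sketch (the helper name `decimal_to_string` is hypothetical and not
part of red-arrow-format):

```ruby
# Convert an unscaled integer plus a scale to its decimal string.
# The edge case fixed in this commit is n_digits <= scale, where a
# "0." prefix followed by (scale - n_digits) zeros must be inserted;
# the old `<` comparison missed the n_digits == scale case.
def decimal_to_string(value, scale)
  sign = value.negative? ? "-" : ""
  digits = value.abs.to_s
  if scale.zero?
    sign + digits
  elsif digits.size <= scale
    # e.g. value=123, scale=5 -> "0.00123"
    sign + "0." + ("0" * (scale - digits.size)) + digits
  else
    sign + digits[0...-scale] + "." + digits[-scale..]
  end
end
```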
### Are these changes tested?
Yes.
### Are there any user-facing changes?
Yes.
* GitHub Issue: #49422
Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
.github/workflows/integration.yml | 2 +
ci/docker/conda-integration.dockerfile | 1 +
ci/scripts/integration_arrow.sh | 10 +-
ci/scripts/integration_arrow_build.sh | 11 +-
dev/archery/archery/cli.py | 6 +-
dev/archery/archery/integration/datagen.py | 11 +-
dev/archery/archery/integration/runner.py | 12 +-
dev/archery/archery/integration/tester_ruby.py | 78 +++
dev/archery/archery/integration/util.py | 9 +-
.../bin/red-arrow-format-integration-test | 64 ++
ruby/red-arrow-format/lib/arrow-format/array.rb | 126 +++-
ruby/red-arrow-format/lib/arrow-format/bitmap.rb | 2 +-
ruby/red-arrow-format/lib/arrow-format/error.rb | 2 +-
ruby/red-arrow-format/lib/arrow-format/field.rb | 5 +-
.../lib/arrow-format/file-reader.rb | 26 +-
.../lib/arrow-format/integration/json-reader.rb | 408 +++++++++++++
.../options.rb} | 50 +-
.../lib/arrow-format/integration/validate.rb | 656 +++++++++++++++++++++
ruby/red-arrow-format/lib/arrow-format/readable.rb | 111 +++-
.../lib/arrow-format/record-batch.rb | 6 +
.../lib/arrow-format/streaming-pull-reader.rb | 25 +-
.../lib/arrow-format/streaming-reader.rb | 46 +-
.../lib/arrow-format/streaming-writer.rb | 4 +-
ruby/red-arrow-format/lib/arrow-format/type.rb | 24 +-
ruby/red-arrow-format/red-arrow-format.gemspec | 3 +-
ruby/red-arrow-format/test/test-reader.rb | 2 +-
ruby/red-arrow-format/test/test-writer.rb | 29 +-
ruby/red-arrow/lib/arrow/table-formatter.rb | 6 +-
28 files changed, 1585 insertions(+), 150 deletions(-)
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index 2ef7f1ca78..583a7009ad 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -33,6 +33,7 @@ on:
- 'integration/**'
- 'cpp/**'
- 'format/**'
+ - 'ruby/red-arrow-format/**'
pull_request:
paths:
- '.dockerignore'
@@ -43,6 +44,7 @@ on:
- 'integration/**'
- 'cpp/**'
- 'format/**'
+ - 'ruby/red-arrow-format/**'
concurrency:
group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
diff --git a/ci/docker/conda-integration.dockerfile b/ci/docker/conda-integration.dockerfile
index b0e5ec966d..ac7ca57156 100644
--- a/ci/docker/conda-integration.dockerfile
+++ b/ci/docker/conda-integration.dockerfile
@@ -42,6 +42,7 @@ RUN mamba install -q -y \
nodejs=${node} \
yarn=${yarn} \
openjdk=${jdk} \
+ ruby \
zstd && \
mamba clean --yes --all --force-pkgs-dirs
diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh
index 2ee047c50e..7e315a60a6 100755
--- a/ci/scripts/integration_arrow.sh
+++ b/ci/scripts/integration_arrow.sh
@@ -24,9 +24,14 @@ build_dir=${2}
gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration
+# For backward compatibility.
: "${ARROW_INTEGRATION_CPP:=ON}"
+: "${ARCHERY_INTEGRATION_WITH_CPP:=$([ "${ARROW_INTEGRATION_CPP}" = "ON" ] && echo "1" || echo "0")}"
+export ARCHERY_INTEGRATION_WITH_CPP
+: "${ARCHERY_INTEGRATION_WITH_RUBY:=1}"
+export ARCHERY_INTEGRATION_WITH_RUBY
-: "${ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS:=cpp}"
+: "${ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS:=cpp,ruby}"
export ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS
. "${arrow_dir}/ci/scripts/util_log.sh"
@@ -57,14 +62,11 @@ export PYTHONFAULTHANDLER=1
export GOMEMLIMIT=200MiB
export GODEBUG=gctrace=1,clobberfree=1
-ARCHERY_WITH_CPP=$([ "$ARROW_INTEGRATION_CPP" == "ON" ] && echo "1" || echo "0")
-
# Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1
time archery integration \
--run-c-data \
--run-ipc \
--run-flight \
- --with-cpp="${ARCHERY_WITH_CPP}" \
--gold-dirs="$gold_dir/0.14.1" \
--gold-dirs="$gold_dir/0.17.1" \
--gold-dirs="$gold_dir/1.0.0-bigendian" \
diff --git a/ci/scripts/integration_arrow_build.sh b/ci/scripts/integration_arrow_build.sh
index 61ad0ea59e..691faf7c2d 100755
--- a/ci/scripts/integration_arrow_build.sh
+++ b/ci/scripts/integration_arrow_build.sh
@@ -22,7 +22,10 @@ set -e
arrow_dir=${1}
build_dir=${2}
+# For backward compatibility.
: "${ARROW_INTEGRATION_CPP:=ON}"
+: "${ARCHERY_INTEGRATION_WITH_CPP:=$([ "${ARROW_INTEGRATION_CPP}" = "ON" ] && echo "1" || echo "0")}"
+: "${ARCHERY_INTEGRATION_WITH_RUBY:=1}"
. "${arrow_dir}/ci/scripts/util_log.sh"
@@ -41,7 +44,7 @@ fi
github_actions_group_end
github_actions_group_begin "Integration: Build: C++"
-if [ "${ARROW_INTEGRATION_CPP}" == "ON" ]; then
+if [ "${ARCHERY_INTEGRATION_WITH_CPP}" -gt "0" ]; then
"${arrow_dir}/ci/scripts/cpp_build.sh" "${arrow_dir}" "${build_dir}"
fi
github_actions_group_end
@@ -69,3 +72,9 @@ if [ "${ARCHERY_INTEGRATION_WITH_JS}" -gt "0" ]; then
cp -a "${arrow_dir}/js" "${build_dir}/js"
fi
github_actions_group_end
+
+github_actions_group_begin "Integration: Build: Ruby"
+if [ "${ARCHERY_INTEGRATION_WITH_RUBY}" -gt "0" ]; then
+ rake -C "${arrow_dir}/ruby/red-arrow-format" install
+fi
+github_actions_group_end
diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py
index f08656fa94..e70cc3874f 100644
--- a/dev/archery/archery/cli.py
+++ b/dev/archery/archery/cli.py
@@ -667,7 +667,8 @@ def _set_default(opt, default):
@click.option('--random-seed', type=int, default=12345,
help="Seed for PRNG when generating test data")
@click.option('--with-cpp', type=bool, default=False,
- help='Include C++ in integration tests')
+ help='Include C++ in integration tests',
+ envvar="ARCHERY_INTEGRATION_WITH_CPP")
@click.option('--with-dotnet', type=bool, default=False,
help='Include .NET in integration tests',
envvar="ARCHERY_INTEGRATION_WITH_DOTNET")
@@ -683,6 +684,9 @@ def _set_default(opt, default):
@click.option('--with-nanoarrow', type=bool, default=False,
help='Include nanoarrow in integration tests',
envvar="ARCHERY_INTEGRATION_WITH_NANOARROW")
[email protected]('--with-ruby', type=bool, default=False,
+ help='Include Ruby in integration tests',
+ envvar="ARCHERY_INTEGRATION_WITH_RUBY")
@click.option('--with-rust', type=bool, default=False,
help='Include Rust in integration tests',
envvar="ARCHERY_INTEGRATION_WITH_RUST")
diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py
index 83913dc379..6b3b13f51d 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -1937,6 +1937,7 @@ def get_generated_json_files(tempdir=None):
.skip_tester('Java')
.skip_tester('JS')
.skip_tester('nanoarrow')
+ .skip_tester('Ruby')
.skip_tester('Rust')
.skip_tester('Go'),
@@ -1944,6 +1945,7 @@ def get_generated_json_files(tempdir=None):
.skip_tester('Java')
.skip_tester('JS')
.skip_tester('nanoarrow')
+ .skip_tester('Ruby')
.skip_tester('Rust')
.skip_tester('Go'),
@@ -1993,19 +1995,22 @@ def get_generated_json_files(tempdir=None):
.skip_tester('nanoarrow')
.skip_tester('Java') # TODO(ARROW-7779)
# TODO(https://github.com/apache/arrow/issues/38045)
- .skip_format(SKIP_FLIGHT, '.NET'),
+ .skip_format(SKIP_FLIGHT, '.NET')
+ .skip_tester('Ruby'),
generate_run_end_encoded_case()
.skip_tester('.NET')
.skip_tester('JS')
# TODO(https://github.com/apache/arrow-nanoarrow/issues/618)
.skip_tester('nanoarrow')
+ .skip_tester('Ruby')
.skip_tester('Rust'),
generate_binary_view_case()
.skip_tester('JS')
# TODO(https://github.com/apache/arrow-nanoarrow/issues/618)
.skip_tester('nanoarrow')
+ .skip_tester('Ruby')
.skip_tester('Rust'),
generate_list_view_case()
@@ -2013,12 +2018,14 @@ def get_generated_json_files(tempdir=None):
.skip_tester('JS')
# TODO(https://github.com/apache/arrow-nanoarrow/issues/618)
.skip_tester('nanoarrow')
+ .skip_tester('Ruby')
.skip_tester('Rust'),
generate_extension_case()
.skip_tester('nanoarrow')
# TODO(https://github.com/apache/arrow/issues/38045)
- .skip_format(SKIP_FLIGHT, '.NET'),
+ .skip_format(SKIP_FLIGHT, '.NET')
+ .skip_tester('Ruby'),
]
generated_paths = []
diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py
index 29e3364224..9c0fda371e 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -196,9 +196,11 @@ class IntegrationRunner(object):
skip_testers.add(".NET")
skip_testers.add("Java")
skip_testers.add("JS")
+ skip_testers.add("Ruby")
skip_testers.add("Rust")
if prefix == '2.0.0-compression':
skip_testers.add("JS")
+ skip_testers.add("Ruby")
if prefix == '2.0.0-compression' and 'lz4' in name:
# https://github.com/apache/arrow-nanoarrow/issues/621
skip_testers.add("nanoarrow")
@@ -590,9 +592,9 @@ def get_static_json_files():
def select_testers(with_cpp=True, with_java=True, with_js=True,
- with_dotnet=True, with_go=True, with_rust=False,
- with_nanoarrow=False, target_implementations="",
- **kwargs):
+ with_dotnet=True, with_go=True, with_ruby=False,
+ with_rust=False, with_nanoarrow=False,
+ target_implementations="", **kwargs):
target_implementations = (target_implementations.split(",")
if target_implementations else [])
@@ -629,6 +631,10 @@ def select_testers(with_cpp=True, with_java=True, with_js=True,
from .tester_nanoarrow import NanoarrowTester
append_tester("nanoarrow", NanoarrowTester(**kwargs))
+ if with_ruby:
+ from .tester_ruby import RubyTester
+ append_tester("ruby", RubyTester(**kwargs))
+
if with_rust:
from .tester_rust import RustTester
append_tester("rust", RustTester(**kwargs))
diff --git a/dev/archery/archery/integration/tester_ruby.py b/dev/archery/archery/integration/tester_ruby.py
new file mode 100644
index 0000000000..a84ba825cb
--- /dev/null
+++ b/dev/archery/archery/integration/tester_ruby.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from .tester import Tester
+from .util import run_cmd, log
+from ..utils.source import ARROW_ROOT_DEFAULT
+
+
+_EXE_PATH = os.path.join(
+ ARROW_ROOT_DEFAULT, "ruby/red-arrow-format/bin/red-arrow-format-integration-test")
+
+
+class RubyTester(Tester):
+ PRODUCER = True
+ CONSUMER = True
+
+ name = "Ruby"
+
+ def _run(self, env):
+ command_line = [_EXE_PATH]
+ if self.debug:
+ command_line_string = ""
+ for key, value in env.items():
+ command_line_string += f"{key}={value} "
+ command_line_string += " ".join(command_line)
+ log(command_line_string)
+ run_cmd(command_line, env=os.environ | env)
+
+ def validate(self, json_path, arrow_path, quirks=None):
+ env = {
+ "ARROW": arrow_path,
+ "COMMAND": "validate",
+ "JSON": json_path,
+ }
+ if quirks:
+ for quirk in quirks:
+ env[f"QUIRK_{quirk.upper()}"] = "true"
+ self._run(env)
+
+ def json_to_file(self, json_path, arrow_path):
+ env = {
+ "ARROW": arrow_path,
+ "COMMAND": "json-to-file",
+ "JSON": json_path,
+ }
+ self._run(env)
+
+ def stream_to_file(self, stream_path, file_path):
+ env = {
+ "ARROW": file_path,
+ "ARROWS": stream_path,
+ "COMMAND": "stream-to-file",
+ }
+ self._run(env)
+
+ def file_to_stream(self, file_path, stream_path):
+ env = {
+ "ARROW": file_path,
+ "ARROWS": stream_path,
+ "COMMAND": "file-to-stream",
+ }
+ self._run(env)
diff --git a/dev/archery/archery/integration/util.py b/dev/archery/archery/integration/util.py
index 1b1eb95a1d..f2fb5ede20 100644
--- a/dev/archery/archery/integration/util.py
+++ b/dev/archery/archery/integration/util.py
@@ -17,6 +17,7 @@
import contextlib
import io
+import os
import random
import socket
import subprocess
@@ -137,7 +138,13 @@ def run_cmd(cmd, **kwargs):
except subprocess.CalledProcessError as e:
# this avoids hiding the stdout / stderr of failed processes
sio = io.StringIO()
- print('Command failed:', " ".join(cmd), file=sio)
+ command_line_string = ''
+ env = kwargs.get('env', {})
+ for key in env.keys() - os.environ.keys():
+ value = env[key]
+ command_line_string += f'{key}={value} '
+ command_line_string += ' '.join(cmd)
+ print(f'Command failed: {command_line_string}', file=sio)
print('With output:', file=sio)
print('--------------', file=sio)
print(frombytes(e.output), file=sio)
diff --git a/ruby/red-arrow-format/bin/red-arrow-format-integration-test b/ruby/red-arrow-format/bin/red-arrow-format-integration-test
new file mode 100755
index 0000000000..8571621f3d
--- /dev/null
+++ b/ruby/red-arrow-format/bin/red-arrow-format-integration-test
@@ -0,0 +1,64 @@
+#!/usr/bin/env ruby
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require_relative "../lib/arrow-format"
+require_relative "../lib/arrow-format/integration/options"
+
+options = ArrowFormat::Integration::Options.singleton
+case options.command
+when "validate"
+ require_relative "../lib/arrow-format/integration/validate"
+when "json-to-file"
+ require_relative "../lib/arrow-format/integration/json-reader"
+ File.open(options.json, "r") do |input|
+ reader = ArrowFormat::Integration::JSONReader.new(input)
+ File.open(options.arrow, "wb") do |output|
+ writer = ArrowFormat::FileWriter.new(output)
+ writer.start(reader.schema)
+ reader.each do |record_batch|
+ writer.write_record_batch(record_batch)
+ end
+ writer.finish
+ end
+ end
+when "stream-to-file"
+ File.open(options.arrows, "rb") do |input|
+ reader = ArrowFormat::StreamingReader.new(input)
+ File.open(options.arrow, "wb") do |output|
+ writer = ArrowFormat::FileWriter.new(output)
+ writer.start(reader.schema)
+ reader.each do |record_batch|
+ writer.write_record_batch(record_batch)
+ end
+ writer.finish
+ end
+ end
+when "file-to-stream"
+ File.open(options.arrow, "rb") do |input|
+ reader = ArrowFormat::FileReader.new(input)
+ File.open(options.arrows, "wb") do |output|
+ writer = ArrowFormat::StreamingWriter.new(output)
+ writer.start(reader.schema)
+ reader.each do |record_batch|
+ writer.write_record_batch(record_batch)
+ end
+ writer.finish
+ end
+ end
+end
diff --git a/ruby/red-arrow-format/lib/arrow-format/array.rb b/ruby/red-arrow-format/lib/arrow-format/array.rb
index 951de74475..10be36d530 100644
--- a/ruby/red-arrow-format/lib/arrow-format/array.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/array.rb
@@ -56,6 +56,10 @@ module ArrowFormat
end
end
+ def empty?
+ @size.zero?
+ end
+
protected
def slice!(offset, size)
@offset = offset
@@ -144,6 +148,10 @@ module ArrowFormat
return to_enum(__method__) unless block_given?
end
+ def n_nulls
+ @size
+ end
+
def to_a
[nil] * @size
end
@@ -156,6 +164,8 @@ module ArrowFormat
end
def to_a
+ return [] if empty?
+
offset = element_size * @offset
apply_validity(@values_buffer.values(@type.buffer_type, offset, @size))
end
@@ -177,6 +187,8 @@ module ArrowFormat
class BooleanArray < PrimitiveArray
def to_a
+ return [] if empty?
+
@values_bitmap ||= Bitmap.new(@values_buffer, @offset, @size)
values = @values_bitmap.to_a
apply_validity(values)
@@ -264,6 +276,8 @@ module ArrowFormat
class DayTimeIntervalArray < IntervalArray
def to_a
+ return [] if empty?
+
offset = element_size * @offset
values = @values_buffer.
each(@type.buffer_type, offset, @size * 2).
@@ -282,6 +296,8 @@ module ArrowFormat
class MonthDayNanoIntervalArray < IntervalArray
def to_a
+ return [] if empty?
+
buffer_types = @type.buffer_types
value_size = IO::Buffer.size_of(buffer_types)
base_offset = value_size * @offset
@@ -301,7 +317,7 @@ module ArrowFormat
class DurationArray < TemporalArray
end
- class VariableSizeBinaryLayoutArray < Array
+ class VariableSizeBinaryArray < Array
def initialize(type, size, validity_buffer, offsets_buffer, values_buffer)
super(type, size, validity_buffer)
@offsets_buffer = offsets_buffer
@@ -323,11 +339,18 @@ module ArrowFormat
yield(sliced_values_buffer)
end
- def to_a
- values = @offsets_buffer.
+ def offsets
+ return [0] if empty?
+
+ @offsets_buffer.
each(@type.offset_buffer_type, offset_size * @offset, @size + 1).
- each_cons(2).
- collect do |(_, offset), (_, next_offset)|
+ collect {|_, offset| offset}
+ end
+
+ def to_a
+ return [] if empty?
+
+ values = offsets.each_cons(2).collect do |offset, next_offset|
length = next_offset - offset
@values_buffer.get_string(offset, length, @type.encoding)
end
@@ -340,16 +363,19 @@ module ArrowFormat
end
end
- class BinaryArray < VariableSizeBinaryLayoutArray
+ class BinaryArray < VariableSizeBinaryArray
+ end
+
+ class LargeBinaryArray < VariableSizeBinaryArray
end
- class LargeBinaryArray < VariableSizeBinaryLayoutArray
+ class VariableSizeUTF8Array < VariableSizeBinaryArray
end
- class UTF8Array < VariableSizeBinaryLayoutArray
+ class UTF8Array < VariableSizeUTF8Array
end
- class LargeUTF8Array < VariableSizeBinaryLayoutArray
+ class LargeUTF8Array < VariableSizeUTF8Array
end
class FixedSizeBinaryArray < Array
@@ -368,6 +394,8 @@ module ArrowFormat
end
def to_a
+ return [] if empty?
+
byte_width = @type.byte_width
values = 0.step(@size * byte_width - 1, byte_width).collect do |offset|
@values_buffer.get_string(offset, byte_width)
@@ -378,6 +406,8 @@ module ArrowFormat
class DecimalArray < FixedSizeBinaryArray
def to_a
+ return [] if empty?
+
byte_width = @type.byte_width
buffer_types = [:u64] * (byte_width / 8 - 1) + [:s64]
base_offset = byte_width * @offset
@@ -408,8 +438,8 @@ module ArrowFormat
elsif @type.scale > 0
n_digits = string.bytesize
n_digits -= 1 if value < 0
- if n_digits < @type.scale
- prefix = "0." + ("0" * (@type.scale - n_digits - 1))
+ if n_digits <= @type.scale
+ prefix = "0." + ("0" * (@type.scale - n_digits))
if value < 0
string[1, 0] = prefix
else
@@ -446,12 +476,19 @@ module ArrowFormat
@type.offset_buffer_type))
end
+ def offsets
+ return [0] if empty?
+
+ @offsets_buffer.
+ each(@type.offset_buffer_type, offset_size * @offset, @size + 1).
+ collect {|_, offset| offset}
+ end
+
def to_a
+ return [] if empty?
+
child_values = @child.to_a
- values = @offsets_buffer.
- each(@type.offset_buffer_type, offset_size * @offset, @size + 1).
- each_cons(2).
- collect do |(_, offset), (_, next_offset)|
+ values = offsets.each_cons(2).collect do |offset, next_offset|
child_values[offset...next_offset]
end
apply_validity(values)
@@ -494,6 +531,8 @@ module ArrowFormat
end
def to_a
+ return [] if empty?
+
values = @child.to_a.each_slice(@type.size).to_a
apply_validity(values)
end
@@ -520,6 +559,8 @@ module ArrowFormat
end
def to_a
+ return [] if empty?
+
if @children.empty?
values = [[]] * @size
else
@@ -540,6 +581,8 @@ module ArrowFormat
class MapArray < VariableSizeListArray
def to_a
+ return [] if empty?
+
super.collect do |entries|
if entries.nil?
entries
@@ -555,6 +598,7 @@ module ArrowFormat
end
class UnionArray < Array
+ attr_reader :types_buffer
attr_reader :children
def initialize(type, size, types_buffer, children)
super(type, size, nil)
@@ -562,6 +606,18 @@ module ArrowFormat
@children = children
end
+ def each_type(&block)
+ return [].each(&block) if empty?
+
+ return to_enum(__method__) unless block_given?
+
+ @types_buffer.each(type_buffer_type,
+ type_element_size * @offset,
+ @size) do |_, type|
+ yield(type)
+ end
+ end
+
private
def type_buffer_type
:S8
@@ -590,15 +646,23 @@ module ArrowFormat
yield(@offsets_buffer)
end
+ def each_offset(&block)
+ return [].each(&block) if empty?
+
+ return to_enum(__method__) unless block_given?
+
+ @offsets_buffer.each(@type.offset_buffer_type,
+ offset_element_size * @offset,
+ @size) do |_, offset|
+ yield(offset)
+ end
+ end
+
def to_a
+ return [] if empty?
+
children_values = @children.collect(&:to_a)
- types = @types_buffer.each(type_buffer_type,
- type_element_size * @offset,
- @size)
- offsets = @offsets_buffer.each(:s32,
- offset_element_size * @offset,
- @size)
- types.zip(offsets).collect do |(_, type), (_, offset)|
+ each_type.zip(each_offset).collect do |type, offset|
index = @type.resolve_type_index(type)
children_values[index][offset]
end
@@ -624,10 +688,10 @@ module ArrowFormat
end
def to_a
+ return [] if empty?
+
children_values = @children.collect(&:to_a)
- @types_buffer.each(type_buffer_type,
- type_element_size * @offset,
- @size).with_index.collect do |(_, type), i|
+ each_type.with_index.collect do |type, i|
index = @type.resolve_type_index(type)
children_values[index][i]
end
@@ -663,15 +727,19 @@ module ArrowFormat
yield(@indices_buffer)
end
+ def indices
+ buffer_type = @type.index_type.buffer_type
+ offset = IO::Buffer.size_of(buffer_type) * @offset
+ apply_validity(@indices_buffer.values(buffer_type, offset, @size))
+ end
+
def to_a
+ return [] if empty?
+
values = []
@dictionaries.each do |dictionary|
values.concat(dictionary.to_a)
end
- buffer_type = @type.index_type.buffer_type
- offset = IO::Buffer.size_of(buffer_type) * @offset
- indices =
- apply_validity(@indices_buffer.values(buffer_type, offset, @size))
indices.collect do |index|
if index.nil?
nil
diff --git a/ruby/red-arrow-format/lib/arrow-format/bitmap.rb b/ruby/red-arrow-format/lib/arrow-format/bitmap.rb
index e4a0dc76d3..183c3b28f3 100644
--- a/ruby/red-arrow-format/lib/arrow-format/bitmap.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/bitmap.rb
@@ -36,7 +36,7 @@ module ArrowFormat
current = -1
n_bytes = (@offset + @n_values) / 8
@buffer.each(:U8, 0, n_bytes) do |offset, value|
- 7.times do |i|
+ 8.times do |i|
current += 1
next if current < @offset
yield((value & (1 << (i % 8))) > 0)
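The one-character fix in this hunk is the `ArrowFormat::Bitmap#each` bug from
the change list: `7.times` silently dropped the most significant bit of every
byte. A standalone sketch of the corrected iteration (the method name
`each_bit` and the array-of-bytes input are illustrative, not the gem's API):

```ruby
# Collect n_values validity bits starting at bit `offset`, reading
# bits LSB-first within each byte as the Arrow format specifies.
# The pre-fix code iterated only 7 bits per byte.
def each_bit(bytes, offset, n_values)
  bits = []
  current = -1
  bytes.each do |byte|
    8.times do |i|
      current += 1
      next if current < offset
      bits << ((byte & (1 << i)) > 0)
      return bits if bits.size == n_values
    end
  end
  bits
end
```

With `7.times`, the high bit of `0b1000_0001` below would have been lost and
every subsequent bit shifted by one per byte consumed.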
diff --git a/ruby/red-arrow-format/lib/arrow-format/error.rb b/ruby/red-arrow-format/lib/arrow-format/error.rb
index d73c4082be..f6aebb3645 100644
--- a/ruby/red-arrow-format/lib/arrow-format/error.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/error.rb
@@ -25,7 +25,7 @@ module ArrowFormat
attr_reader :buffer
def initialize(buffer, message)
@buffer = buffer
- super("#{message}: #{@buffer}")
+ super("#{message}: #{@buffer.inspect}")
end
end
diff --git a/ruby/red-arrow-format/lib/arrow-format/field.rb b/ruby/red-arrow-format/lib/arrow-format/field.rb
index 022091f651..28a4a90b09 100644
--- a/ruby/red-arrow-format/lib/arrow-format/field.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/field.rb
@@ -18,17 +18,14 @@ module ArrowFormat
class Field
attr_reader :name
attr_reader :type
- attr_reader :dictionary_id
attr_reader :metadata
def initialize(name,
type,
nullable: true,
- dictionary_id: nil,
metadata: nil)
@name = name
@type = type
@nullable = nullable
- @dictionary_id = dictionary_id
@metadata = metadata
end
@@ -41,7 +38,7 @@ module ArrowFormat
fb_field.name = @name
fb_field.nullable = @nullable
if @type.respond_to?(:build_fb_field)
- @type.build_fb_field(fb_field, self)
+ @type.build_fb_field(fb_field)
else
fb_field.type = @type.to_flatbuffers
end
diff --git a/ruby/red-arrow-format/lib/arrow-format/file-reader.rb b/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
index ed510ff09c..7c749e5fbf 100644
--- a/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
@@ -64,7 +64,7 @@ module ArrowFormat
"Not a record batch message: #{i}: " +
fb_header.class.name)
end
- read_record_batch(fb_header, @schema, body)
+ read_record_batch(fb_message.version, fb_header, @schema, body)
end
def each
@@ -108,11 +108,14 @@ module ArrowFormat
def read_block(block, type, i)
offset = block.offset
- # If we can report property error information, we can use
+ # If we can report error information correctly, we can use
# MessagePullReader here.
#
# message_pull_reader = MessagePullReader.new do |message, body|
- # return read_record_batch(message.header, @schema, body)
+ # return read_record_batch(message.version,
+ # message.header,
+ # @schema,
+ # body)
# end
# chunk = @buffer.slice(offset,
# MessagePullReader::CONTINUATION_SIZE +
@@ -123,12 +126,18 @@ module ArrowFormat
continuation_size = CONTINUATION_BUFFER.size
continuation = @buffer.slice(offset, continuation_size)
- unless continuation == CONTINUATION_BUFFER
+ if continuation == CONTINUATION_BUFFER
+ offset += continuation_size
+ elsif continuation.get_value(MessagePullReader::CONTINUATION_TYPE, 0) < 0
raise FileReadError.new(@buffer,
"Invalid continuation: #{type}: #{i}: " +
continuation.inspect)
+ else
+ # For backward compatibility of data produced prior to version
+ # 0.15.0. It doesn't have continuation token. Ignore it and
+ # re-read it as metadata length.
+ continuation_size = 0
end
- offset += continuation_size
metadata_length_type = MessagePullReader::METADATA_LENGTH_TYPE
metadata_length_size = MessagePullReader::METADATA_LENGTH_SIZE
@@ -161,7 +170,7 @@ module ArrowFormat
dictionary_fields = {}
@schema.fields.each do |field|
next unless field.type.is_a?(DictionaryType)
- dictionary_fields[field.dictionary_id] = field
+ dictionary_fields[field.type.id] = field
end
dictionaries = {}
@@ -194,7 +203,10 @@ module ArrowFormat
value_type = dictionary_fields[id].type.value_type
schema = Schema.new([Field.new("dummy", value_type)])
- record_batch = read_record_batch(fb_header.data, schema, body)
+ record_batch = read_record_batch(fb_message.version,
+ fb_header.data,
+ schema,
+ body)
if fb_header.delta?
dictionaries[id] << record_batch.columns[0]
else
diff --git a/ruby/red-arrow-format/lib/arrow-format/integration/json-reader.rb b/ruby/red-arrow-format/lib/arrow-format/integration/json-reader.rb
new file mode 100644
index 0000000000..3660e8af34
--- /dev/null
+++ b/ruby/red-arrow-format/lib/arrow-format/integration/json-reader.rb
@@ -0,0 +1,408 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require "json"
+
+module ArrowFormat
+ module Integration
+ class JSONReader
+ attr_reader :schema
+ def initialize(input)
+ @json = JSON.load(input.read)
+ @dictionaries = {}
+ @schema = read_schema
+ end
+
+ def each
+ @json["batches"].each do |record_batch|
+ yield(read_record_batch(record_batch))
+ end
+ end
+
+ private
+ def read_metadata(metadata)
+ return nil if metadata.nil?
+
+ metadata.inject({}) do |result, metadatum|
+ result[metadatum["key"]] = metadatum["value"]
+ result
+ end
+ end
+
+ def read_type(type, children)
+ case type["name"]
+ when "null"
+ NullType.singleton
+ when "bool"
+ BooleanType.singleton
+ when "int"
+ is_signed = type["isSigned"]
+ case type["bitWidth"]
+ when 8
+ if is_signed
+ Int8Type.singleton
+ else
+ UInt8Type.singleton
+ end
+ when 16
+ if is_signed
+ Int16Type.singleton
+ else
+ UInt16Type.singleton
+ end
+ when 32
+ if is_signed
+ Int32Type.singleton
+ else
+ UInt32Type.singleton
+ end
+ when 64
+ if is_signed
+ Int64Type.singleton
+ else
+ UInt64Type.singleton
+ end
+ else
+ raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+ end
+ when "floatingpoint"
+ case type["precision"]
+ when "SINGLE"
+ Float32Type.singleton
+ when "DOUBLE"
+ Float64Type.singleton
+ else
+ raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+ end
+ when "date"
+ case type["unit"]
+ when "DAY"
+ Date32Type.singleton
+ when "MILLISECOND"
+ Date64Type.singleton
+ else
+ raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+ end
+ when "time"
+ unit = type["unit"].downcase.to_sym
+ case type["bitWidth"]
+ when 32
+ Time32Type.new(unit)
+ when 64
+ Time64Type.new(unit)
+ else
+ raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+ end
+ when "timestamp"
+ unit = type["unit"].downcase.to_sym
+ TimestampType.new(unit, type["timezone"])
+ when "interval"
+ case type["unit"]
+ when "YEAR_MONTH"
+ YearMonthIntervalType.singleton
+ when "DAY_TIME"
+ DayTimeIntervalType.singleton
+ when "MONTH_DAY_NANO"
+ MonthDayNanoIntervalType.singleton
+ else
+ raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+ end
+ when "duration"
+ DurationType.new(type["unit"].downcase.to_sym)
+ when "binary"
+ BinaryType.singleton
+ when "largebinary"
+ LargeBinaryType.singleton
+ when "utf8"
+ UTF8Type.singleton
+ when "largeutf8"
+ LargeUTF8Type.singleton
+ when "fixedsizebinary"
+ FixedSizeBinaryType.new(type["byteWidth"])
+ when "decimal"
+ precision = type["precision"]
+ scale = type["scale"]
+ case type["bitWidth"]
+ when 128
+ Decimal128Type.new(precision, scale)
+ when 256
+ Decimal256Type.new(precision, scale)
+ else
+ raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+ end
+ when "list"
+ ListType.new(read_field(children[0]))
+ when "largelist"
+ LargeListType.new(read_field(children[0]))
+ when "fixedsizelist"
+ FixedSizeListType.new(read_field(children[0]), type["listSize"])
+ when "struct"
+ StructType.new(children.collect {|child| read_field(child)})
+ when "map"
+ MapType.new(read_field(children[0]), type["keysSorted"])
+ when "union"
+ children = children.collect {|child| read_field(child)}
+ type_ids = type["typeIds"]
+ case type["mode"]
+ when "DENSE"
+ DenseUnionType.new(children, type_ids)
+ when "SPARSE"
+ SparseUnionType.new(children, type_ids)
+ else
+ raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+ end
+ else
+ raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+ end
+ end
+
+ def read_field(field)
+ type = read_type(field["type"], field["children"])
+ dictionary = field["dictionary"]
+ if dictionary
+ index_type = read_type(dictionary["indexType"], [])
+ value_type = type
+ type = DictionaryType.new(dictionary["id"],
+ index_type,
+ value_type,
+ dictionary["isOrdered"])
+ end
+ metadata = read_metadata(field["metadata"])
+ Field.new(field["name"],
+ type,
+ nullable: field["nullable"],
+ metadata: metadata)
+ end
+
+ def read_dictionary(id, type)
+ @json["dictionaries"].each do |dictionary|
+ next unless dictionary["id"] == id
+ return read_array(dictionary["data"]["columns"][0], type)
+ end
+ nil
+ end
+
+ def read_schema
+ fields = []
+ @json["schema"]["fields"].each do |field|
+ fields << read_field(field)
+ end
+ metadata = read_metadata(@json["schema"]["metadata"])
+ Schema.new(fields, metadata: metadata)
+ end
+
+ def read_bitmap(bitmap)
+ buffer = +"".b
+ bitmap.each_slice(8) do |bits|
+ byte = 0
+ while bits.size < 8
+ bits << 0
+ end
+ bits.reverse_each do |bit|
+ byte = (byte << 1) + bit
+ end
+ buffer << [byte].pack("C")
+ end
+ IO::Buffer.for(buffer)
+ end
+
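The LSB-first bit packing that `read_bitmap` above performs can be sketched standalone (`pack_bits` is a hypothetical helper name, not part of the library):

```ruby
# Pack validity bits into bytes, least significant bit first,
# zero-padding the final partial byte -- the same layout
# read_bitmap builds with reverse_each above.
def pack_bits(bits)
  bits.each_slice(8).collect do |slice|
    padded = slice + [0] * (8 - slice.size)
    padded.each_with_index.inject(0) do |byte, (bit, i)|
      byte | (bit << i)
    end
  end.pack("C*")
end

pack_bits([1, 0, 1]).unpack1("C")  # => 5 (0b00000101)
```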
+ def read_types(types)
+ buffer_type = :S8
+ size = IO::Buffer.size_of(buffer_type)
+ buffer = IO::Buffer.new(size * types.size)
+ types.each_with_index do |type, i|
+ offset = size * i
+ buffer.set_value(buffer_type, offset, type)
+ end
+ buffer
+ end
+
+ def read_offsets(offsets, type)
+ return nil if offsets.nil?
+
+ case type
+ when LargeListType, LargeBinaryType, LargeUTF8Type
+ offsets = offsets.collect {|offset| Integer(offset, 10)}
+ end
+ size = IO::Buffer.size_of(type.offset_buffer_type)
+ buffer = IO::Buffer.new(size * offsets.size)
+ offsets.each_with_index do |offset, i|
+ value_offset = size * i
+ buffer.set_value(type.offset_buffer_type, value_offset, offset)
+ end
+ buffer
+ end
+
+ def read_hex_value(value)
+ values = value.scan(/.{2}/).collect do |hex|
+ Integer(hex, 16)
+ end
+ values.pack("C*")
+ end
+
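`read_hex_value` above decodes the hex-encoded binary values used in the integration JSON; an equivalent sketch using `pack` with the `"H*"` directive (`decode_hex` is a hypothetical name):

```ruby
# Decode a hex string such as "414243" into raw bytes;
# equivalent to read_hex_value above.
def decode_hex(value)
  [value].pack("H*")
end

decode_hex("414243")  # => "ABC"
```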
+ def read_values(data, type)
+ case type
+ when BooleanType
+ read_bitmap(data.collect {|boolean| boolean ? 1 : 0})
+ when DayTimeIntervalType
+ buffer_types = [type.buffer_type] * 2
+ size = IO::Buffer.size_of(buffer_types)
+ buffer = IO::Buffer.new(size * data.size)
+ data.each_with_index do |value, i|
+ offset = size * i
+ components = value.fetch_values("days", "milliseconds")
+ buffer.set_values(buffer_types, offset, components)
+ end
+ buffer
+ when MonthDayNanoIntervalType
+ size = IO::Buffer.size_of(type.buffer_types)
+ buffer = IO::Buffer.new(size * data.size)
+ data.each_with_index do |value, i|
+ offset = size * i
+ components = value.fetch_values("months", "days", "nanoseconds")
+ buffer.set_values(type.buffer_types, offset, components)
+ end
+ buffer
+ when NumberType,
+ TemporalType
+ size = IO::Buffer.size_of(type.buffer_type)
+ buffer = IO::Buffer.new(size * data.size)
+ data.each_with_index do |value, i|
+ offset = size * i
+ # If the type is 64-bit such as `Int64Type`, `value` is a
+ # string, not an integer, so that it round-trips through
+ # JSON without losing precision.
+ value = Integer(value, 10) if value.is_a?(String)
+ buffer.set_value(type.buffer_type, offset, value)
+ end
+ buffer
+ when DecimalType
+ byte_width = type.byte_width
+ bit_width = byte_width * 8
+ buffer = IO::Buffer.new(byte_width * data.size)
+ data.each_with_index do |value, i|
+ offset = byte_width * i
+ components = []
+ value = BigDecimal(value)
+ value *= 10 ** value.scale if value.scale > 0
+ bits = "%0#{bit_width}b" % value.to_i
+ if value.negative?
+ # Ruby formats negative integers in binary as "..1"
+ # followed by the remaining two's-complement digits.
+ #
+ # If `value` is the minimum negative value, `bits` may be
+ # longer than `bit_width` because of the leading `..`
+ # (2 characters).
+ bits = bits.delete_prefix("..").rjust(bit_width, "1")
+ end
+ bits.scan(/[01]{64}/).reverse_each do |chunk|
+ buffer.set_value(:u64, offset, Integer(chunk, 2))
+ offset += 8
+ end
+ end
+ buffer
+ when UTF8Type, LargeUTF8Type
+ IO::Buffer.for(data.join)
+ when VariableSizeBinaryType, FixedSizeBinaryType
+ IO::Buffer.for(data.collect {|value| read_hex_value(value)}.join)
+ else
+ raise "Unsupported values: #{data.inspect}: #{type.inspect}"
+ end
+ end
+
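The `DecimalType` branch above writes the unscaled integer in two's complement as little-endian 64-bit words. A standalone sketch of that encoding (hypothetical helper, using integer masking instead of the string manipulation used above):

```ruby
def encode_decimal_words(unscaled, bit_width)
  # Masking to the type's width yields the two's-complement
  # representation for negative values.
  masked = unscaled & ((1 << bit_width) - 1)
  # Emit 64-bit words, least significant word first.
  (bit_width / 64).times.collect do |i|
    (masked >> (64 * i)) & 0xFFFF_FFFF_FFFF_FFFF
  end.pack("Q<*")
end

encode_decimal_words(1, 128).bytes.first   # => 1
encode_decimal_words(-1, 128).bytes.uniq   # => [255]
```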
+ def read_array(column, type)
+ length = column["count"]
+ case type
+ when NullType
+ type.build_array(length)
+ when BooleanType,
+ NumberType,
+ TemporalType,
+ FixedSizeBinaryType
+ validity_buffer = read_bitmap(column["VALIDITY"])
+ values_buffer = read_values(column["DATA"], type)
+ type.build_array(length, validity_buffer, values_buffer)
+ when VariableSizeBinaryType
+ validity_buffer = read_bitmap(column["VALIDITY"])
+ offsets_buffer = read_offsets(column["OFFSET"], type)
+ values_buffer = read_values(column["DATA"], type)
+ type.build_array(length,
+ validity_buffer,
+ offsets_buffer,
+ values_buffer)
+ when VariableSizeListType
+ validity_buffer = read_bitmap(column["VALIDITY"])
+ offsets_buffer = read_offsets(column["OFFSET"], type)
+ child = read_array(column["children"][0], type.child.type)
+ type.build_array(length,
+ validity_buffer,
+ offsets_buffer,
+ child)
+ when FixedSizeListType
+ validity_buffer = read_bitmap(column["VALIDITY"])
+ child = read_array(column["children"][0], type.child.type)
+ type.build_array(length, validity_buffer, child)
+ when StructType
+ validity_buffer = read_bitmap(column["VALIDITY"])
+ children = column["children"]
+ .zip(type.children)
+ .collect do |child_column, child_field|
+ read_array(child_column, child_field.type)
+ end
+ type.build_array(length, validity_buffer, children)
+ when DenseUnionType
+ types_buffer = read_types(column["TYPE_ID"])
+ offsets_buffer = read_offsets(column["OFFSET"], type)
+ children = column["children"]
+ .zip(type.children)
+ .collect do |child_column, child_field|
+ read_array(child_column, child_field.type)
+ end
+ type.build_array(length,
+ types_buffer,
+ offsets_buffer,
+ children)
+ when SparseUnionType
+ types_buffer = read_types(column["TYPE_ID"])
+ children = column["children"]
+ .zip(type.children)
+ .collect do |child_column, child_field|
+ read_array(child_column, child_field.type)
+ end
+ type.build_array(length, types_buffer, children)
+ when DictionaryType
+ validity_buffer = read_bitmap(column["VALIDITY"])
+ indices_buffer = read_values(column["DATA"], type.index_type)
+ dictionary = read_dictionary(type.id, type.value_type)
+ type.build_array(length,
+ validity_buffer,
+ indices_buffer,
+ [dictionary])
+ else
+ raise "Unsupported array: #{column.inspect}: #{type.inspect}"
+ end
+ end
+
+ def read_record_batch(record_batch)
+ n_rows = record_batch["count"]
+ columns = record_batch["columns"]
+ .zip(@schema.fields)
+ .collect do |column, field|
+ read_array(column, field.type)
+ end
+ RecordBatch.new(@schema, n_rows, columns)
+ end
+ end
+ end
+end
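For reference, the integration JSON that `read_field`/`read_type` above consume describes each field roughly like this (a minimal, hypothetical example; as noted above, 64-bit values appear as strings so they round-trip through JSON):

```ruby
require "json"

field_json = <<~JSON
  {
    "name": "id",
    "nullable": true,
    "type": {"name": "int", "bitWidth": 64, "isSigned": true},
    "children": []
  }
JSON
field = JSON.parse(field_json)
field["type"]["bitWidth"]  # => 64
```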
diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb b/ruby/red-arrow-format/lib/arrow-format/integration/options.rb
similarity index 52%
copy from ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
copy to ruby/red-arrow-format/lib/arrow-format/integration/options.rb
index f11972c67a..a12e8a10ca 100644
--- a/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/integration/options.rb
@@ -15,35 +15,39 @@
# specific language governing permissions and limitations
# under the License.
-require_relative "streaming-pull-reader"
-
module ArrowFormat
- class StreamingReader
- include Enumerable
-
- attr_reader :schema
- def initialize(input)
- @input = input
- @schema = nil
- end
-
- def each
- return to_enum(__method__) unless block_given?
+ module Integration
+ class Options
+ class << self
+ def singleton
+ @singleton ||= new
+ end
+ end
- reader = StreamingPullReader.new do |record_batch|
- @schema ||= reader.schema
- yield(record_batch)
+ attr_reader :command
+ attr_reader :arrow
+ attr_reader :arrows
+ attr_reader :json
+ def initialize
+ @command = ENV["COMMAND"]
+ @arrow = ENV["ARROW"]
+ @arrows = ENV["ARROWS"]
+ @json = ENV["JSON"]
+ @validate_date64 = ENV["QUIRK_NO_DATE64_VALIDATE"] != "true"
+ @validate_decimal = ENV["QUIRK_NO_DECIMAL_VALIDATE"] != "true"
+ @validate_time = ENV["QUIRK_NO_TIMES_VALIDATE"] != "true"
end
- buffer = "".b
- loop do
- next_size = reader.next_required_size
- break if next_size.zero?
+ def validate_date64?
+ @validate_date64
+ end
- next_chunk = @input.read(next_size, buffer)
- break if next_chunk.nil?
+ def validate_decimal?
+ @validate_decimal
+ end
- reader.consume(IO::Buffer.for(next_chunk))
+ def validate_time?
+ @validate_time
end
end
end
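The `Options` class in the hunk above reads all of its configuration from environment variables (`COMMAND`, `ARROW`, `JSON`, `QUIRK_*`). A simplified, hypothetical sketch of the same pattern with an injectable env hash for testing:

```ruby
class QuirkOptions
  def initialize(env = ENV)
    @json = env["JSON"]
    # Quirk variables are opt-out: validation stays on unless
    # the variable is exactly "true".
    @validate_decimal = env["QUIRK_NO_DECIMAL_VALIDATE"] != "true"
  end

  attr_reader :json

  def validate_decimal?
    @validate_decimal
  end
end

options = QuirkOptions.new("JSON" => "batch.json",
                           "QUIRK_NO_DECIMAL_VALIDATE" => "true")
options.validate_decimal?  # => false
```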
diff --git a/ruby/red-arrow-format/lib/arrow-format/integration/validate.rb b/ruby/red-arrow-format/lib/arrow-format/integration/validate.rb
new file mode 100644
index 0000000000..b1eec2fc55
--- /dev/null
+++ b/ruby/red-arrow-format/lib/arrow-format/integration/validate.rb
@@ -0,0 +1,656 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require "json"
+require "test-unit"
+
+ENV["TEST_UNIT_MAX_DIFF_TARGET_STRING_SIZE"] ||= "1_000_000"
+
+class ArrowFormatIntegrationTest < Test::Unit::TestCase
+ module ToHashable
+ refine ArrowFormat::Type do
+ def to_h
+ {
+ "name" => normalized_name,
+ }
+ end
+
+ private
+ def normalized_name
+ name.downcase
+ end
+ end
+
+ refine ArrowFormat::BooleanType do
+ private
+ def normalized_name
+ "bool"
+ end
+ end
+
+ refine ArrowFormat::IntType do
+ def to_h
+ super.merge("bitWidth" => @bit_width,
+ "isSigned" => @signed)
+ end
+
+ private
+ def normalized_name
+ "int"
+ end
+ end
+
+ refine ArrowFormat::FloatingPointType do
+ def to_h
+ super.merge("precision" => normalized_precision)
+ end
+
+ private
+ def normalized_name
+ "floatingpoint"
+ end
+
+ def normalized_precision
+ @precision.to_s.upcase
+ end
+ end
+
+ refine ArrowFormat::DateType do
+ def to_h
+ super.merge("unit" => normalized_unit)
+ end
+
+ private
+ def normalized_name
+ "date"
+ end
+
+ def normalized_unit
+ @unit.to_s.upcase
+ end
+ end
+
+ refine ArrowFormat::TimeType do
+ def to_h
+ super.merge("bitWidth" => @bit_width,
+ "unit" => normalized_unit)
+ end
+
+ private
+ def normalized_name
+ "time"
+ end
+
+ def normalized_unit
+ @unit.to_s.upcase
+ end
+ end
+
+ refine ArrowFormat::TimestampType do
+ def to_h
+ hash = super
+ hash["unit"] = normalized_unit
+ hash["timezone"] = @time_zone if @time_zone
+ hash
+ end
+
+ private
+ def normalized_name
+ "timestamp"
+ end
+
+ def normalized_unit
+ @unit.to_s.upcase
+ end
+ end
+
+ refine ArrowFormat::IntervalType do
+ def to_h
+ super.merge("unit" => normalized_unit)
+ end
+
+ private
+ def normalized_name
+ "interval"
+ end
+
+ def normalized_unit
+ @unit.to_s.upcase
+ end
+ end
+
+ refine ArrowFormat::DurationType do
+ def to_h
+ super.merge("unit" => normalized_unit)
+ end
+
+ private
+ def normalized_unit
+ @unit.to_s.upcase
+ end
+ end
+
+ refine ArrowFormat::FixedSizeBinaryType do
+ def to_h
+ super.merge("byteWidth" => @byte_width)
+ end
+ end
+
+ refine ArrowFormat::FixedSizeListType do
+ def to_h
+ super.merge("listSize" => @size)
+ end
+ end
+
+ refine ArrowFormat::DecimalType do
+ def to_h
+ hash = super
+ hash.delete("byteWidth")
+ hash["bitWidth"] = @byte_width * 8
+ hash["precision"] = @precision
+ hash["scale"] = @scale
+ hash
+ end
+
+ private
+ def normalized_name
+ "decimal"
+ end
+ end
+
+ refine ArrowFormat::UnionType do
+ def to_h
+ super.merge("mode" => normalized_mode,
+ "typeIds" => normalized_type_ids)
+ end
+
+ private
+ def normalized_name
+ "union"
+ end
+
+ def normalized_mode
+ @mode.to_s.upcase
+ end
+
+ def normalized_type_ids
+ @type_ids
+ end
+ end
+
+ refine ArrowFormat::MapType do
+ def to_h
+ super.merge("keysSorted" => @keys_sorted)
+ end
+ end
+
+ module MetadataNormalizable
+ private
+ def normalized_metadata
+ metadata_list = @metadata.collect do |key, value|
+ {"key" => key, "value" => value}
+ end
+ metadata_list.sort_by do |metadatum|
+ metadatum["key"]
+ end
+ end
+ end
+
+ refine ArrowFormat::Field do
+ import_methods MetadataNormalizable
+
+ def to_h
+ hash = {
+ "children" => normalized_children,
+ "name" => @name,
+ "nullable" => @nullable,
+ }
+ if @type.is_a?(ArrowFormat::DictionaryType)
+ hash["type"] = @type.value_type.to_h
+ hash["dictionary"] = {
+ "id" => @type.id,
+ "indexType" => @type.index_type.to_h,
+ "isOrdered" => @type.ordered?,
+ }
+ else
+ hash["type"] = @type.to_h
+ end
+ hash["metadata"] = normalized_metadata if @metadata
+ hash
+ end
+
+ private
+ def normalized_children
+ if @type.respond_to?(:children)
+ @type.children.collect(&:to_h)
+ elsif @type.respond_to?(:child)
+ [@type.child.to_h]
+ else
+ []
+ end
+ end
+ end
+
+ refine ArrowFormat::Schema do
+ import_methods MetadataNormalizable
+
+ def to_h
+ hash = {
+ "fields" => @fields.collect(&:to_h),
+ }
+ hash["metadata"] = normalized_metadata if @metadata
+ hash
+ end
+ end
+
+ refine ArrowFormat::Array do
+ def to_h
+ hash = {
+ "count" => size,
+ }
+ to_h_data(hash)
+ hash
+ end
+
+ private
+ def to_h_data(hash)
+ hash["DATA"] = normalized_data
+ hash["VALIDITY"] = normalized_validity
+ end
+
+ def normalized_data
+ to_a
+ end
+
+ def normalized_validity
+ if @validity_buffer
+ validity_bitmap.collect {|valid| valid ? 1 : 0}
+ else
+ [1] * @size
+ end
+ end
+
+ def stringify_data(data)
+ data.collect do |value|
+ if value.nil?
+ nil
+ else
+ value.to_s
+ end
+ end
+ end
+
+ def hexify(value)
+ value.each_byte.collect {|byte| "%02X" % byte}.join("")
+ end
+
+ def normalized_children
+ @children.zip(@type.children).collect do |child, field|
+ normalize_child(child, field)
+ end
+ end
+
+ def normalize_child(child, field)
+ child.to_h.merge("name" => field.name)
+ end
+ end
+
+ refine ArrowFormat::NullArray do
+ private
+ def to_h_data(hash)
+ end
+ end
+
+ refine ArrowFormat::Int64Array do
+ private
+ def normalized_data
+ stringify_data(super)
+ end
+ end
+
+ refine ArrowFormat::UInt64Array do
+ private
+ def normalized_data
+ stringify_data(super)
+ end
+ end
+
+ refine ArrowFormat::Float32Array do
+ private
+ def normalized_data
+ super.collect do |value|
+ if value.nil?
+ nil
+ else
+ value.round(3)
+ end
+ end
+ end
+ end
+
+ refine ArrowFormat::Date64Array do
+ private
+ def normalized_data
+ stringify_data(super)
+ end
+ end
+
+ refine ArrowFormat::Time64Array do
+ private
+ def normalized_data
+ stringify_data(super)
+ end
+ end
+
+ refine ArrowFormat::TimestampArray do
+ private
+ def normalized_data
+ stringify_data(super)
+ end
+ end
+
+ refine ArrowFormat::DayTimeIntervalArray do
+ private
+ def normalized_data
+ super.collect do |value|
+ if value.nil?
+ nil
+ else
+ day, time = value
+ {"days" => day, "milliseconds" => time}
+ end
+ end
+ end
+ end
+
+ refine ArrowFormat::MonthDayNanoIntervalArray do
+ private
+ def normalized_data
+ super.collect do |value|
+ if value.nil?
+ nil
+ else
+ month, day, nano = value
+ {"months" => month, "days" => day, "nanoseconds" => nano}
+ end
+ end
+ end
+ end
+
+ refine ArrowFormat::DurationArray do
+ private
+ def normalized_data
+ stringify_data(super)
+ end
+ end
+
+ refine ArrowFormat::VariableSizeBinaryArray do
+ private
+ def to_h_data(hash)
+ super(hash)
+ hash["OFFSET"] = normalized_offsets
+ end
+
+ def normalized_data
+ super.collect do |value|
+ if value.nil?
+ nil
+ else
+ normalize_value(value)
+ end
+ end
+ end
+
+ def normalize_value(value)
+ hexify(value)
+ end
+
+ def normalized_offsets
+ offsets
+ end
+ end
+
+ refine ArrowFormat::LargeBinaryArray do
+ private
+ def normalized_offsets
+ offsets.collect(&:to_s)
+ end
+ end
+
+ refine ArrowFormat::VariableSizeUTF8Array do
+ private
+ def normalize_value(value)
+ value
+ end
+ end
+
+ refine ArrowFormat::LargeUTF8Array do
+ private
+ def normalized_offsets
+ offsets.collect(&:to_s)
+ end
+ end
+
+ refine ArrowFormat::FixedSizeBinaryArray do
+ private
+ def normalized_data
+ super.collect do |value|
+ if value.nil?
+ nil
+ else
+ normalize_value(value)
+ end
+ end
+ end
+
+ def normalize_value(value)
+ hexify(value)
+ end
+ end
+
+ refine ArrowFormat::DecimalArray do
+ private
+ def normalize_value(value)
+ (value * (10 ** (@type.scale))).to_s("f").delete_suffix(".0")
+ end
+ end
+
+ refine ArrowFormat::VariableSizeListArray do
+ private
+ def to_h_data(hash)
+ hash["OFFSET"] = normalized_offsets
+ hash["VALIDITY"] = normalized_validity
+ hash["children"] = [normalize_child(@child, @type.child)]
+ end
+
+ def normalized_offsets
+ offsets
+ end
+ end
+
+ refine ArrowFormat::FixedSizeListArray do
+ private
+ def to_h_data(hash)
+ hash["VALIDITY"] = normalized_validity
+ hash["children"] = [normalize_child(@child, @type.child)]
+ end
+ end
+
+ refine ArrowFormat::LargeListArray do
+ private
+ def normalized_offsets
+ offsets.collect(&:to_s)
+ end
+ end
+
+ refine ArrowFormat::StructArray do
+ private
+ def to_h_data(hash)
+ hash["VALIDITY"] = normalized_validity
+ hash["children"] = normalized_children
+ end
+ end
+
+ refine ArrowFormat::UnionArray do
+ private
+ def to_h_data(hash)
+ hash["TYPE_ID"] = each_type.to_a
+ hash["children"] = normalized_children
+ end
+ end
+
+ refine ArrowFormat::DenseUnionArray do
+ private
+ def to_h_data(hash)
+ super
+ hash["OFFSET"] = each_offset.to_a
+ end
+ end
+
+ refine ArrowFormat::DictionaryArray do
+ private
+ def normalized_data
+ indices
+ end
+ end
+
+ refine ArrowFormat::RecordBatch do
+ def to_h
+ {
+ "columns" => normalized_columns,
+ "count" => @n_rows,
+ }
+ end
+
+ private
+ def normalized_columns
+ @schema.fields.zip(@columns).collect do |field, column|
+ column.to_h.merge("name" => field.name)
+ end
+ end
+ end
+ end
+
+ using ToHashable
+
+ def setup
+ @options = ArrowFormat::Integration::Options.singleton
+ end
+
+ def normalize_field!(field)
+ metadata = field["metadata"]
+ if metadata
+ field["metadata"] = metadata.sort_by do |metadatum|
+ metadatum["key"]
+ end
+ end
+
+ case field["type"]["name"]
+ when "decimal"
+ field["type"]["bitWidth"] ||= 128 unless @options.validate_decimal?
+ when "map"
+ entries = field["children"][0]
+ entries["name"] = "entries"
+ entries["children"][0]["name"] = "key"
+ entries["children"][1]["name"] = "value"
+ end
+ end
+
+ def normalize_schema!(schema)
+ schema["fields"].each do |field|
+ normalize_field!(field)
+ end
+ end
+
+ def normalize_array!(array, field)
+ case field["type"]["name"]
+ when "map"
+ entries = array["children"][0]
+ entries["name"] = "entries"
+ entries["children"][0]["name"] = "key"
+ entries["children"][1]["name"] = "value"
+ when "union"
+ # V4 union data may have a VALIDITY bitmap; drop it so V4
+ # and V5 data compare equal.
+ array.delete("VALIDITY")
+ end
+
+ data = array["DATA"]
+ validity = array["VALIDITY"]
+ if data and validity
+ array["DATA"] = data.zip(validity).collect do |value, valid_bit|
+ if (valid_bit == 1)
+ value
+ else
+ nil
+ end
+ end
+ end
+
+ child_arrays = array["children"]
+ if child_arrays
+ child_fields = field["children"]
+ child_arrays.zip(child_fields) do |child_array, child_field|
+ normalize_array!(child_array, child_field)
+ end
+ end
+ end
+
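The `DATA`/`VALIDITY` zipping in `normalize_array!` above replaces values whose validity bit is 0 with `nil`; as a standalone sketch (hypothetical helper name):

```ruby
# Null out entries whose validity bit is 0, mirroring the
# DATA/VALIDITY normalization above.
def apply_validity(data, validity)
  data.zip(validity).collect do |value, valid_bit|
    valid_bit == 1 ? value : nil
  end
end

apply_validity([10, 20, 30], [1, 0, 1])  # => [10, nil, 30]
```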
+ def normalize_record_batch!(record_batch, schema)
+ record_batch["columns"].zip(schema["fields"]) do |column, field|
+ normalize_array!(column, field)
+ end
+ end
+
+ def test_validate
+ expected = JSON.parse(File.read(@options.json))
+ expected_schema = expected["schema"]
+ normalize_schema!(expected_schema)
+ File.open(@options.arrow, "rb") do |input|
+ reader = ArrowFormat::FileReader.new(input)
+ expected_record_batches = []
+ actual_record_batches = []
+ reader.each.with_index do |record_batch, i|
+ expected_record_batch = expected["batches"][i]
+ normalize_record_batch!(expected_record_batch, expected_schema)
+ expected_record_batches << expected_record_batch
+ actual_record_batches << record_batch.to_h
+ end
+ assert_equal({
+ schema: expected_schema,
+ record_batches: expected_record_batches,
+ },
+ {
+ schema: reader.schema.to_h,
+ record_batches: actual_record_batches,
+ })
+ end
+ end
+end
diff --git a/ruby/red-arrow-format/lib/arrow-format/readable.rb b/ruby/red-arrow-format/lib/arrow-format/readable.rb
index c5c1d5d2b3..783b494b68 100644
--- a/ruby/red-arrow-format/lib/arrow-format/readable.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/readable.rb
@@ -42,7 +42,10 @@ module ArrowFormat
metadata: read_custom_metadata(fb_schema.custom_metadata))
end
- def read_field(fb_field)
+ def read_field(fb_field,
+ map_entries: false,
+ map_key: false,
+ map_value: false)
fb_type = fb_field.type
case fb_type
when FB::Null
@@ -57,6 +60,8 @@ module ArrowFormat
type = Float32Type.singleton
when FB::Precision::DOUBLE
type = Float64Type.singleton
+ else
+ raise ReadError.new("Unsupported type: #{fb_type.inspect}")
end
when FB::Date
case fb_type.unit
@@ -64,6 +69,8 @@ module ArrowFormat
type = Date32Type.singleton
when FB::DateUnit::MILLISECOND
type = Date64Type.singleton
+ else
+ raise ReadError.new("Unsupported type: #{fb_type.inspect}")
end
when FB::Time
case fb_type.bit_width
@@ -73,6 +80,8 @@ module ArrowFormat
type = Time32Type.new(:second)
when FB::TimeUnit::MILLISECOND
type = Time32Type.new(:millisecond)
+ else
+ raise ReadError.new("Unsupported type: #{fb_type.inspect}")
end
when 64
case fb_type.unit
@@ -80,6 +89,8 @@ module ArrowFormat
type = Time64Type.new(:microsecond)
when FB::TimeUnit::NANOSECOND
type = Time64Type.new(:nanosecond)
+ else
+ raise ReadError.new("Unsupported type: #{fb_type.inspect}")
end
end
when FB::Timestamp
@@ -93,6 +104,8 @@ module ArrowFormat
type = DayTimeIntervalType.singleton
when FB::IntervalUnit::MONTH_DAY_NANO
type = MonthDayNanoIntervalType.singleton
+ else
+ raise ReadError.new("Unsupported type: #{fb_type.inspect}")
end
when FB::Duration
unit = fb_type.unit.name.downcase.to_sym
@@ -105,7 +118,15 @@ module ArrowFormat
type = FixedSizeListType.new(read_field(fb_field.children[0]),
fb_type.list_size)
when FB::Struct
- children = fb_field.children.collect {|child| read_field(child)}
+ if map_entries
+ fb_children = fb_field.children
+ children = [
+ read_field(fb_children[0], map_key: true),
+ read_field(fb_children[1], map_value: true),
+ ]
+ else
+ children = fb_field.children.collect {|child| read_field(child)}
+ end
type = StructType.new(children)
when FB::Union
children = fb_field.children.collect {|child| read_field(child)}
@@ -115,9 +136,12 @@ module ArrowFormat
type = DenseUnionType.new(children, type_ids)
when FB::UnionMode::SPARSE
type = SparseUnionType.new(children, type_ids)
+ else
+ raise ReadError.new("Unsupported type: #{fb_type.inspect}")
end
when FB::Map
- type = MapType.new(read_field(fb_field.children[0]))
+ type = MapType.new(read_field(fb_field.children[0], map_entries: true),
+ fb_type.keys_sorted?)
when FB::Binary
type = BinaryType.singleton
when FB::LargeBinary
@@ -134,21 +158,42 @@ module ArrowFormat
type = Decimal128Type.new(fb_type.precision, fb_type.scale)
when 256
type = Decimal256Type.new(fb_type.precision, fb_type.scale)
+ else
+ raise ReadError.new("Unsupported type: #{fb_type.inspect}")
end
+ else
+ raise ReadError.new("Unsupported type: #{fb_type.inspect}")
end
dictionary = fb_field.dictionary
if dictionary
dictionary_id = dictionary.id
index_type = read_type_int(dictionary.index_type)
- type = DictionaryType.new(index_type, type, dictionary.ordered?)
+ value_type = type
+ type = DictionaryType.new(dictionary_id,
+ index_type,
+ value_type,
+ dictionary.ordered?)
+ end
+
+ # Map type always uses the canonical "entries"/"key"/"value"
+ # field names instead of the field names stored in FlatBuffers.
+ # The specification only recommends these names:
+ #
+ #   The names of the child fields may be respectively "entries",
+ #   "key", and "value", but this is not enforced.
+ if map_entries
+ name = "entries"
+ elsif map_key
+ name = "key"
+ elsif map_value
+ name = "value"
else
- dictionary_id = nil
+ name = fb_field.name
end
- Field.new(fb_field.name,
+ Field.new(name,
type,
nullable: fb_field.nullable?,
- dictionary_id: dictionary_id,
metadata: read_custom_metadata(fb_field.custom_metadata))
end
@@ -181,17 +226,17 @@ module ArrowFormat
end
end
- def read_record_batch(fb_record_batch, schema, body)
+ def read_record_batch(version, fb_record_batch, schema, body)
n_rows = fb_record_batch.length
nodes = fb_record_batch.nodes
buffers = fb_record_batch.buffers
columns = schema.fields.collect do |field|
- read_column(field, nodes, buffers, body)
+ read_column(version, field, nodes, buffers, body)
end
RecordBatch.new(schema, n_rows, columns)
end
- def read_column(field, nodes, buffers, body)
+ def read_column(version, field, nodes, buffers, body)
node = nodes.shift
length = node.length
@@ -209,51 +254,63 @@ module ArrowFormat
NumberType,
TemporalType
values_buffer = buffers.shift
- values = body.slice(values_buffer.offset, values_buffer.length)
+ values = body&.slice(values_buffer.offset, values_buffer.length)
field.type.build_array(length, validity, values)
when VariableSizeBinaryType
offsets_buffer = buffers.shift
values_buffer = buffers.shift
- offsets = body.slice(offsets_buffer.offset, offsets_buffer.length)
- values = body.slice(values_buffer.offset, values_buffer.length)
+ offsets = body&.slice(offsets_buffer.offset, offsets_buffer.length)
+ values = body&.slice(values_buffer.offset, values_buffer.length)
field.type.build_array(length, validity, offsets, values)
when FixedSizeBinaryType
values_buffer = buffers.shift
- values = body.slice(values_buffer.offset, values_buffer.length)
+ values = body&.slice(values_buffer.offset, values_buffer.length)
field.type.build_array(length, validity, values)
when VariableSizeListType
offsets_buffer = buffers.shift
- offsets = body.slice(offsets_buffer.offset, offsets_buffer.length)
- child = read_column(field.type.child, nodes, buffers, body)
+ offsets = body&.slice(offsets_buffer.offset, offsets_buffer.length)
+ child = read_column(version, field.type.child, nodes, buffers, body)
field.type.build_array(length, validity, offsets, child)
when FixedSizeListType
- child = read_column(field.type.child, nodes, buffers, body)
+ child = read_column(version, field.type.child, nodes, buffers, body)
field.type.build_array(length, validity, child)
when StructType
children = field.type.children.collect do |child|
- read_column(child, nodes, buffers, body)
+ read_column(version, child, nodes, buffers, body)
end
field.type.build_array(length, validity, children)
when DenseUnionType
- # dense union type doesn't have validity.
- types = validity
+ if version == FB::MetadataVersion::V4
+ # Dense union type has validity with V4.
+ types_buffer = buffers.shift
+ types = body&.slice(types_buffer.offset, types_buffer.length)
+ else
+ # Dense union type doesn't have validity.
+ types = validity
+ end
offsets_buffer = buffers.shift
- offsets = body.slice(offsets_buffer.offset, offsets_buffer.length)
+ offsets = body&.slice(offsets_buffer.offset, offsets_buffer.length)
children = field.type.children.collect do |child|
- read_column(child, nodes, buffers, body)
+ read_column(version, child, nodes, buffers, body)
end
field.type.build_array(length, types, offsets, children)
when SparseUnionType
- # sparse union type doesn't have validity.
- types = validity
+ if version == FB::MetadataVersion::V4
+ # Sparse union type has validity with V4.
+ types_buffer = buffers.shift
+ types = body&.slice(types_buffer.offset, types_buffer.length)
+ else
+ # Sparse union type doesn't have validity.
+ types = validity
+ end
children = field.type.children.collect do |child|
- read_column(child, nodes, buffers, body)
+ read_column(version, child, nodes, buffers, body)
end
field.type.build_array(length, types, children)
when DictionaryType
indices_buffer = buffers.shift
- indices = body.slice(indices_buffer.offset, indices_buffer.length)
- dictionaries = find_dictionaries(field.dictionary_id)
+ indices = body&.slice(indices_buffer.offset, indices_buffer.length)
+ dictionaries = find_dictionaries(field.type.id)
field.type.build_array(length, validity, indices, dictionaries)
end
end
diff --git a/ruby/red-arrow-format/lib/arrow-format/record-batch.rb b/ruby/red-arrow-format/lib/arrow-format/record-batch.rb
index a641c87da7..938f02d762 100644
--- a/ruby/red-arrow-format/lib/arrow-format/record-batch.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/record-batch.rb
@@ -22,6 +22,8 @@ module ArrowFormat
attr_reader :schema
attr_reader :n_rows
+ alias_method :size, :n_rows
+ alias_method :length, :n_rows
attr_reader :columns
def initialize(schema, n_rows, columns)
@schema = schema
@@ -29,6 +31,10 @@ module ArrowFormat
@columns = columns
end
+ def empty?
+ @n_rows.zero?
+ end
+
def to_h
hash = {}
@schema.fields.zip(@columns) do |field, column|
diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb b/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb
index 5657ca4a1b..13e7ad7243 100644
--- a/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb
@@ -100,11 +100,23 @@ module ArrowFormat
private
def consume_initial(target)
continuation = target.get_value(CONTINUATION_TYPE, 0)
- unless continuation == CONTINUATION_INT32
+ if continuation == CONTINUATION_INT32
+ @state = :metadata_length
+ elsif continuation < 0
raise ReadError.new("Invalid continuation token: " +
continuation.inspect)
+ else
+ # For backward compatibility with data produced before version
+ # 0.15.0, which has no continuation token: treat the value as
+ # the metadata length instead.
+ metadata_length = continuation
+ if metadata_length == 0
+ @state = :eos
+ else
+ @metadata_length = metadata_length
+ @state = :metadata
+ end
end
- @state = :metadata_length
end
def consume_metadata_length(target)
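The backward-compatibility branch added to `consume_initial` above distinguishes the two stream prefixes: post-0.15.0 streams start with the `0xFFFFFFFF` continuation token followed by the metadata length, while older streams start with the metadata length directly. A standalone sketch (hypothetical helper, reading only the first 4 bytes):

```ruby
# Classify the first 4 bytes of a stream message prefix.
def classify_prefix(bytes)
  first = bytes.unpack1("l<")  # little-endian signed int32
  if first == -1               # 0xFFFFFFFF: the continuation token
    :continuation
  elsif first < 0
    raise "Invalid continuation token: #{first}"
  else
    # Pre-0.15.0 stream: this is already the metadata length.
    {legacy_metadata_length: first}
  end
end

classify_prefix([-1].pack("l<"))   # => :continuation
classify_prefix([128].pack("l<"))  # => {legacy_metadata_length: 128}
```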
@@ -204,7 +216,7 @@ module ArrowFormat
@dictionary_fields = {}
@schema.fields.each do |field|
next unless field.type.is_a?(DictionaryType)
- @dictionary_fields[field.dictionary_id] = field
+ @dictionary_fields[field.type.id] = field
end
if @dictionaries.size < @dictionary_fields.size
@state = :initial_dictionaries
@@ -223,7 +235,10 @@ module ArrowFormat
field = @dictionary_fields[header.id]
value_type = field.type.value_type
schema = Schema.new([Field.new("dummy", value_type)])
- record_batch = read_record_batch(header.data, schema, body)
+ record_batch = read_record_batch(message.version,
+ header.data,
+ schema,
+ body)
if header.delta?
@dictionaries[header.id] << record_batch.columns[0]
else
@@ -237,7 +252,7 @@ module ArrowFormat
def process_record_batch_message(message, body)
header = message.header
- @on_read.call(read_record_batch(header, @schema, body))
+ @on_read.call(read_record_batch(message.version, header, @schema, body))
end
end
end
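The continuation-token hunk above falls back to interpreting the first 32-bit value as a metadata length when no continuation token is present (pre-0.15.0 streams). A minimal sketch of that branching, assuming `CONTINUATION_INT32` is the `-1` sentinel (`0xFFFFFFFF` read as a signed 32-bit integer); the helper name is illustrative, not the real reader:

```ruby
# Illustrative stand-in for the continuation sentinel used by the reader.
CONTINUATION_INT32 = -1

# Classify the first 32-bit value of an IPC stream, mirroring the
# consume_initial logic: modern streams start with the continuation
# token, legacy (pre-0.15.0) streams start directly with the metadata
# length, and zero means end-of-stream.
def classify_stream_prefix(first_int32)
  if first_int32 == CONTINUATION_INT32
    :modern            # continuation token; metadata length follows
  elsif first_int32 < 0
    :invalid           # any other negative value is corrupt
  elsif first_int32.zero?
    :end_of_stream     # legacy stream signalling EOS
  else
    :legacy            # pre-0.15.0: this value IS the metadata length
  end
end
```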
diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb b/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
index f11972c67a..f81cfe8913 100644
--- a/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
@@ -21,29 +21,49 @@ module ArrowFormat
class StreamingReader
include Enumerable
- attr_reader :schema
def initialize(input)
@input = input
- @schema = nil
+ @on_read = nil
+ @pull_reader = StreamingPullReader.new do |record_batch|
+ @on_read.call(record_batch) if @on_read
+ end
+ @buffer = "".b
+ ensure_schema
+ end
+
+ def schema
+ @pull_reader.schema
end
- def each
+ def each(&block)
return to_enum(__method__) unless block_given?
- reader = StreamingPullReader.new do |record_batch|
- @schema ||= reader.schema
- yield(record_batch)
+ @on_read = block
+ begin
+ loop do
+ break unless consume
+ end
+ ensure
+ @on_read = nil
end
+ end
- buffer = "".b
- loop do
- next_size = reader.next_required_size
- break if next_size.zero?
+ private
+ def consume
+ next_size = @pull_reader.next_required_size
+ return false if next_size.zero?
+
+ next_chunk = @input.read(next_size, @buffer)
+ return false if next_chunk.nil?
- next_chunk = @input.read(next_size, buffer)
- break if next_chunk.nil?
+ @pull_reader.consume(IO::Buffer.for(next_chunk))
+ true
+ end
- reader.consume(IO::Buffer.for(next_chunk))
+ def ensure_schema
+ loop do
+ break unless consume
+ break if @pull_reader.schema
end
end
end
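The `StreamingReader` change above reads eagerly in `initialize` until the schema is known, so `#schema` works before `#each` is ever called. A toy illustration of that "consume until schema" loop, assuming a pull-reader with `next_required_size`/`consume`/`schema`; `ToyPullReader` is a stand-in, not the real `StreamingPullReader`:

```ruby
require "stringio"

# Stand-in pull reader: pretends the schema becomes available after the
# second chunk has been consumed.
class ToyPullReader
  attr_reader :schema
  def initialize
    @schema = nil
    @chunks = 0
  end

  def next_required_size
    @schema ? 0 : 4
  end

  def consume(chunk)
    @chunks += 1
    @schema = "schema" if @chunks == 2
  end
end

# Pull fixed-size chunks from input until the pull reader reports a
# schema or the input is exhausted, mirroring ensure_schema.
def ensure_schema(pull_reader, input)
  loop do
    size = pull_reader.next_required_size
    break if size.zero?
    chunk = input.read(size)
    break if chunk.nil?
    pull_reader.consume(chunk)
    break if pull_reader.schema
  end
end

reader = ToyPullReader.new
ensure_schema(reader, StringIO.new("x" * 64))
```

With this pattern the reader object exposes its schema immediately after construction instead of only once iteration has started.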
diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb b/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb
index 0621dcbb89..18eb2dda3a 100644
--- a/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb
@@ -40,9 +40,9 @@ module ArrowFormat
def write_record_batch(record_batch)
record_batch.schema.fields.each_with_index do |field, i|
- next if field.dictionary_id.nil?
+ next unless field.type.is_a?(DictionaryType)
dictionary_array = record_batch.columns[i]
- write_dictionary(field.dictionary_id, dictionary_array)
+ write_dictionary(field.type.id, dictionary_array)
end
write_record_batch_based_message(record_batch,
diff --git a/ruby/red-arrow-format/lib/arrow-format/type.rb b/ruby/red-arrow-format/lib/arrow-format/type.rb
index fb153450b0..17674af30c 100644
--- a/ruby/red-arrow-format/lib/arrow-format/type.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/type.rb
@@ -912,7 +912,7 @@ module ArrowFormat
end
class MapType < VariableSizeListType
- def initialize(child)
+ def initialize(child, keys_sorted)
if child.nullable?
raise TypeError.new("Map entry field must not be nullable: " +
child.inspect)
@@ -930,12 +930,17 @@ module ArrowFormat
type.children[0].inspect)
end
super(child)
+ @keys_sorted = keys_sorted
end
def name
"Map"
end
+ def keys_sorted?
+ @keys_sorted
+ end
+
def offset_buffer_type
:s32 # TODO: big endian support
end
@@ -993,6 +998,10 @@ module ArrowFormat
"DenseUnion"
end
+ def offset_buffer_type
+ :s32
+ end
+
def build_array(size, types_buffer, offsets_buffer, children)
DenseUnionArray.new(self, size, types_buffer, offsets_buffer, children)
end
@@ -1013,10 +1022,12 @@ module ArrowFormat
end
class DictionaryType < Type
+ attr_reader :id
attr_reader :index_type
attr_reader :value_type
- def initialize(index_type, value_type, ordered)
+ def initialize(id, index_type, value_type, ordered)
super()
+ @id = id
@index_type = index_type
@value_type = value_type
@ordered = ordered
@@ -1038,22 +1049,21 @@ module ArrowFormat
dictionaries)
end
- def build_fb_field(fb_field, field)
+ def build_fb_field(fb_field)
fb_dictionary_encoding = FB::DictionaryEncoding::Data.new
- fb_dictionary_encoding.id = field.dictionary_id
+ fb_dictionary_encoding.id = @id
fb_int = FB::Int::Data.new
fb_int.bit_width = @index_type.bit_width
fb_int.signed = @index_type.signed?
fb_dictionary_encoding.index_type = fb_int
fb_dictionary_encoding.ordered = @ordered
- fb_dictionary_encoding.dictionary_kind =
- FB::DictionaryKind::DENSE_ARRAY
+ fb_dictionary_encoding.dictionary_kind = FB::DictionaryKind::DENSE_ARRAY
fb_field.type = @value_type.to_flatbuffers
fb_field.dictionary = fb_dictionary_encoding
end
def to_s
- "#{super}<index=#{@index_type}, value=#{@value_type}, " +
+ "#{super}<id=#{@id}, index=#{@index_type}, value=#{@value_type}, " +
"ordered=#{@ordered}>"
end
end
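The `type.rb` hunks above move the dictionary ID onto `DictionaryType` itself, with the constructor order `(id, index_type, value_type, ordered)` and the ID surfacing in `#to_s`. A toy mirror of that shape (illustrative only, not the real `ArrowFormat::DictionaryType`):

```ruby
# Toy mirror of DictionaryType after this change: the ID is a property
# of the type, not of the enclosing Field.
class ToyDictionaryType
  attr_reader :id, :index_type, :value_type

  def initialize(id, index_type, value_type, ordered)
    @id = id
    @index_type = index_type
    @value_type = value_type
    @ordered = ordered
  end

  def ordered?
    @ordered
  end

  # Matches the updated to_s format, which now includes the ID.
  def to_s
    "Dictionary<id=#{@id}, index=#{@index_type}, value=#{@value_type}, " +
      "ordered=#{@ordered}>"
  end
end

type = ToyDictionaryType.new(0, "Int32", "UTF8", false)
```

This is the string shape the updated `test-reader.rb` expectation below checks for.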
diff --git a/ruby/red-arrow-format/red-arrow-format.gemspec b/ruby/red-arrow-format/red-arrow-format.gemspec
index 52340b9748..9148a21f50 100644
--- a/ruby/red-arrow-format/red-arrow-format.gemspec
+++ b/ruby/red-arrow-format/red-arrow-format.gemspec
@@ -44,9 +44,10 @@ Gem::Specification.new do |spec|
spec.files = ["README.md", "Rakefile", "Gemfile", "#{spec.name}.gemspec"]
spec.files += ["LICENSE.txt", "NOTICE.txt"]
spec.files += Dir.glob("lib/**/*.rb")
+ spec.files -= Dir.glob("lib/arrow-format/integration/**/*.rb")
spec.files += Dir.glob("doc/text/*")
- spec.add_runtime_dependency("red-flatbuffers", ">=0.0.6")
+ spec.add_runtime_dependency("red-flatbuffers", ">=0.0.8")
github_url = "https://github.com/apache/arrow"
spec.metadata = {
diff --git a/ruby/red-arrow-format/test/test-reader.rb b/ruby/red-arrow-format/test/test-reader.rb
index 06360f62ac..d59a93ce18 100644
--- a/ruby/red-arrow-format/test/test-reader.rb
+++ b/ruby/red-arrow-format/test/test-reader.rb
@@ -670,7 +670,7 @@ module ReaderTests
array = string_array.dictionary_encode
type, values = roundtrip(array)
assert_equal([
- "Dictionary<index=Int32, value=UTF8, ordered=false>",
+ "Dictionary<id=0, index=Int32, value=UTF8, ordered=false>",
["a", "b", "c", nil, "a"],
],
[type.to_s, values])
diff --git a/ruby/red-arrow-format/test/test-writer.rb b/ruby/red-arrow-format/test/test-writer.rb
index f36a1a252e..72776f01ab 100644
--- a/ruby/red-arrow-format/test/test-writer.rb
+++ b/ruby/red-arrow-format/test/test-writer.rb
@@ -86,7 +86,8 @@ module WriterHelper
when Arrow::FixedSizeBinaryDataType
ArrowFormat::FixedSizeBinaryType.new(red_arrow_type.byte_width)
when Arrow::MapDataType
- ArrowFormat::MapType.new(convert_field(red_arrow_type.field))
+ ArrowFormat::MapType.new(convert_field(red_arrow_type.field),
+ red_arrow_type.keys_sorted?)
when Arrow::ListDataType
ArrowFormat::ListType.new(convert_field(red_arrow_type.field))
when Arrow::LargeListDataType
@@ -110,10 +111,14 @@ module WriterHelper
end
ArrowFormat::SparseUnionType.new(fields, red_arrow_type.type_codes)
when Arrow::DictionaryDataType
+ @dictionary_id ||= 0
+ dictionary_id = @dictionary_id
+ @dictionary_id += 1
index_type = convert_type(red_arrow_type.index_data_type)
- type = convert_type(red_arrow_type.value_data_type)
- ArrowFormat::DictionaryType.new(index_type,
- type,
+ value_type = convert_type(red_arrow_type.value_data_type)
+ ArrowFormat::DictionaryType.new(dictionary_id,
+ index_type,
+ value_type,
red_arrow_type.ordered?)
else
raise "Unsupported type: #{red_arrow_type.inspect}"
@@ -122,17 +127,9 @@ module WriterHelper
def convert_field(red_arrow_field)
type = convert_type(red_arrow_field.data_type)
- if type.is_a?(ArrowFormat::DictionaryType)
- @dictionary_id ||= 0
- dictionary_id = @dictionary_id
- @dictionary_id += 1
- else
- dictionary_id = nil
- end
ArrowFormat::Field.new(red_arrow_field.name,
type,
nullable: red_arrow_field.nullable?,
- dictionary_id: dictionary_id,
metadata: red_arrow_field.metadata)
end
@@ -930,13 +927,13 @@ end
module WriterDictionaryDeltaTests
def build_schema(value_type)
index_type = ArrowFormat::Int32Type.singleton
+ dictionary_id = 1
ordered = false
- type = ArrowFormat::DictionaryType.new(index_type,
+ type = ArrowFormat::DictionaryType.new(dictionary_id,
+ index_type,
value_type,
ordered)
- field = ArrowFormat::Field.new("value",
- type,
- dictionary_id: 1)
+ field = ArrowFormat::Field.new("value", type)
ArrowFormat::Schema.new([field])
end
diff --git a/ruby/red-arrow/lib/arrow/table-formatter.rb b/ruby/red-arrow/lib/arrow/table-formatter.rb
index b93faf09cb..b4e257413c 100644
--- a/ruby/red-arrow/lib/arrow/table-formatter.rb
+++ b/ruby/red-arrow/lib/arrow/table-formatter.rb
@@ -80,7 +80,11 @@ module Arrow
when nil
"%*s" % [width, FORMATTED_NULL]
else
- "%-*s" % [width, value.to_s]
+ value = value.to_s
+ if value.encoding == Encoding::ASCII_8BIT
+ value = value.each_byte.collect {|byte| "%X" % byte}.join
+ end
+ "%-*s" % [width, value]
end
end
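The `table-formatter.rb` hunk above renders raw-binary (`ASCII-8BIT`) values as hexadecimal bytes so table output stays printable. A self-contained sketch of the same `each_byte`/`"%X"` approach; the helper name is illustrative:

```ruby
# Format one table cell, left-justified to the given width. Binary
# strings are hex-encoded byte by byte, mirroring the formatter fix.
def format_cell(value, width)
  value = value.to_s
  if value.encoding == Encoding::ASCII_8BIT
    value = value.each_byte.collect {|byte| "%X" % byte}.join
  end
  "%-*s" % [width, value]
end
```

Note that `"%X"` does not zero-pad each byte, so this is a readability aid rather than a reversible encoding.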