This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new ed37b14f40 GH-49422: [CI][Integration][Ruby] Add the Ruby implementation (#49423)
ed37b14f40 is described below

commit ed37b14f40e2dc62da067db20ede9061d400ecd1
Author: Sutou Kouhei <[email protected]>
AuthorDate: Tue Mar 10 13:14:11 2026 +0900

    GH-49422: [CI][Integration][Ruby] Add the Ruby implementation (#49423)
    
    ### Rationale for this change
    
    The Ruby implementation is still missing some features, but the integration tests pass once we skip the tests that depend on them.
    
    ### What changes are included in this PR?
    
    Archery:
    
    * Add `--with-ruby` to `archery integration`
    * Add `archery.integration.tester_ruby.RubyTester`
    * Add `no_map_field_names_validate` quirk for GH-49415
    * Also show environment variables on external command failure, because the Ruby tester passes information to the integration test driver through environment variables, not command line arguments
    * Use `ARCHERY_INTEGRATION_WITH_CPP=1` instead of `ARROW_INTEGRATION_CPP=ON`, matching other implementations such as `ARCHERY_INTEGRATION_WITH_GO`
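
The failure message for environment-driven testers can be sketched roughly as follows. This is a minimal illustration, not the Archery code: `describe_command` is a hypothetical helper, and the sorting and `shlex.quote` calls are additions made here for readable, deterministic output. Only the `env.keys() - os.environ.keys()` idea comes from the change itself.

```python
import os
import shlex


def describe_command(cmd, env=None):
    """Render `KEY=value ... cmd args` for a failure message.

    Only variables that are absent from the current process
    environment are shown, so the inherited environment does not
    flood the log.
    """
    env = env or {}
    # Key-set difference: keys passed to the child but not inherited.
    extra_keys = sorted(env.keys() - os.environ.keys())
    parts = [f"{key}={shlex.quote(str(env[key]))}" for key in extra_keys]
    parts.extend(shlex.quote(arg) for arg in cmd)
    return " ".join(parts)
```

Because the difference is computed on keys only, a variable that overrides an inherited one with a new value would not be shown by this approach.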
    
    Ruby:
    
    * Add `red-arrow-format-integration-test` as the test driver
      * This is not included in `.gem` because this is only for development
    * Add `ruby/red-arrow-format/lib/arrow-format/integration/` as helpers for the test driver
      * This is not included in `.gem` because this is only for development
    * Add `ArrowFormat::Array#empty?`
    * Add `ArrowFormat::RecordBatch#empty?`
    * Add `ArrowFormat::NullArray#n_nulls`
    * `ArrowFormat::*Array#to_a`: Add support for empty case
    * Fix Apache Arrow decimal <-> `BigDecimal` conversion
    * `ArrowFormat::Bitmap#each`: Fix a bug where the last bit of each byte was ignored
    * Move dictionary ID to `ArrowFormat::DictionaryType` from 
`ArrowFormat::Field`
    * Add support for V4 union that has validity bitmap
    * Add support for messages without a continuation token, for backward compatibility
    * `ArrowFormat::StreamingReader`: Add support for reading schema without 
calling `#each`
    * `ArrowFormat::MapType`: Add support for keys sorted
    * `ArrowFormat::MapType`: Always use "key"/"value"/"entries" for field names
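
The decimal fix concerns the boundary where the number of digits equals the scale. The sketch below shows that boundary in Python rather than Ruby. It is an illustration under stated assumptions, not the library code: `format_unscaled` is a hypothetical name, and only non-negative scales are handled.

```python
def format_unscaled(value: int, scale: int) -> str:
    """Render an unscaled integer with `scale` fractional digits."""
    if scale < 0:
        raise ValueError("this sketch only covers non-negative scales")
    sign = "-" if value < 0 else ""
    digits = str(abs(value))
    if scale == 0:
        return sign + digits
    if len(digits) <= scale:
        # Not enough digits for an integer part: pad with leading zeros.
        # Using `<` here would send len(digits) == scale to the branch
        # below and produce ".123" instead of "0.123".
        return sign + "0." + "0" * (scale - len(digits)) + digits
    # Split the digits into integer and fractional parts.
    return sign + digits[:-scale] + "." + digits[-scale:]
```

For example, `format_unscaled(5, 3)` gives `"0.005"` and `format_unscaled(123, 3)` gives `"0.123"`.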
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    Yes.
    * GitHub Issue: #49422
    
    Authored-by: Sutou Kouhei <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 .github/workflows/integration.yml                  |   2 +
 ci/docker/conda-integration.dockerfile             |   1 +
 ci/scripts/integration_arrow.sh                    |  10 +-
 ci/scripts/integration_arrow_build.sh              |  11 +-
 dev/archery/archery/cli.py                         |   6 +-
 dev/archery/archery/integration/datagen.py         |  11 +-
 dev/archery/archery/integration/runner.py          |  12 +-
 dev/archery/archery/integration/tester_ruby.py     |  78 +++
 dev/archery/archery/integration/util.py            |   9 +-
 .../bin/red-arrow-format-integration-test          |  64 ++
 ruby/red-arrow-format/lib/arrow-format/array.rb    | 126 +++-
 ruby/red-arrow-format/lib/arrow-format/bitmap.rb   |   2 +-
 ruby/red-arrow-format/lib/arrow-format/error.rb    |   2 +-
 ruby/red-arrow-format/lib/arrow-format/field.rb    |   5 +-
 .../lib/arrow-format/file-reader.rb                |  26 +-
 .../lib/arrow-format/integration/json-reader.rb    | 408 +++++++++++++
 .../options.rb}                                    |  50 +-
 .../lib/arrow-format/integration/validate.rb       | 656 +++++++++++++++++++++
 ruby/red-arrow-format/lib/arrow-format/readable.rb | 111 +++-
 .../lib/arrow-format/record-batch.rb               |   6 +
 .../lib/arrow-format/streaming-pull-reader.rb      |  25 +-
 .../lib/arrow-format/streaming-reader.rb           |  46 +-
 .../lib/arrow-format/streaming-writer.rb           |   4 +-
 ruby/red-arrow-format/lib/arrow-format/type.rb     |  24 +-
 ruby/red-arrow-format/red-arrow-format.gemspec     |   3 +-
 ruby/red-arrow-format/test/test-reader.rb          |   2 +-
 ruby/red-arrow-format/test/test-writer.rb          |  29 +-
 ruby/red-arrow/lib/arrow/table-formatter.rb        |   6 +-
 28 files changed, 1585 insertions(+), 150 deletions(-)

diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index 2ef7f1ca78..583a7009ad 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -33,6 +33,7 @@ on:
       - 'integration/**'
       - 'cpp/**'
       - 'format/**'
+      - 'ruby/red-arrow-format/**'
   pull_request:
     paths:
       - '.dockerignore'
@@ -43,6 +44,7 @@ on:
       - 'integration/**'
       - 'cpp/**'
       - 'format/**'
+      - 'ruby/red-arrow-format/**'
 
 concurrency:
   group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
diff --git a/ci/docker/conda-integration.dockerfile b/ci/docker/conda-integration.dockerfile
index b0e5ec966d..ac7ca57156 100644
--- a/ci/docker/conda-integration.dockerfile
+++ b/ci/docker/conda-integration.dockerfile
@@ -42,6 +42,7 @@ RUN mamba install -q -y \
         nodejs=${node} \
         yarn=${yarn} \
         openjdk=${jdk} \
+        ruby \
         zstd && \
     mamba clean --yes --all --force-pkgs-dirs
 
diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh
index 2ee047c50e..7e315a60a6 100755
--- a/ci/scripts/integration_arrow.sh
+++ b/ci/scripts/integration_arrow.sh
@@ -24,9 +24,14 @@ build_dir=${2}
 
 gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration
 
+# For backward compatibility.
 : "${ARROW_INTEGRATION_CPP:=ON}"
+: "${ARCHERY_INTEGRATION_WITH_CPP:=$([ "${ARROW_INTEGRATION_CPP}" = "ON" ] && echo "1" || echo "0")}"
+export ARCHERY_INTEGRATION_WITH_CPP
+: "${ARCHERY_INTEGRATION_WITH_RUBY:=1}"
+export ARCHERY_INTEGRATION_WITH_RUBY
 
-: "${ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS:=cpp}"
+: "${ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS:=cpp,ruby}"
 export ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS
 
 . "${arrow_dir}/ci/scripts/util_log.sh"
@@ -57,14 +62,11 @@ export PYTHONFAULTHANDLER=1
 export GOMEMLIMIT=200MiB
 export GODEBUG=gctrace=1,clobberfree=1
 
-ARCHERY_WITH_CPP=$([ "$ARROW_INTEGRATION_CPP" == "ON" ] && echo "1" || echo "0")
-
 # Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1
 time archery integration \
     --run-c-data \
     --run-ipc \
     --run-flight \
-    --with-cpp="${ARCHERY_WITH_CPP}" \
     --gold-dirs="$gold_dir/0.14.1" \
     --gold-dirs="$gold_dir/0.17.1" \
     --gold-dirs="$gold_dir/1.0.0-bigendian" \
diff --git a/ci/scripts/integration_arrow_build.sh b/ci/scripts/integration_arrow_build.sh
index 61ad0ea59e..691faf7c2d 100755
--- a/ci/scripts/integration_arrow_build.sh
+++ b/ci/scripts/integration_arrow_build.sh
@@ -22,7 +22,10 @@ set -e
 arrow_dir=${1}
 build_dir=${2}
 
+# For backward compatibility.
 : "${ARROW_INTEGRATION_CPP:=ON}"
+: "${ARCHERY_INTEGRATION_WITH_CPP:=$([ "${ARROW_INTEGRATION_CPP}" = "ON" ] && echo "1" || echo "0")}"
+: "${ARCHERY_INTEGRATION_WITH_RUBY:=1}"
 
 . "${arrow_dir}/ci/scripts/util_log.sh"
 
@@ -41,7 +44,7 @@ fi
 github_actions_group_end
 
 github_actions_group_begin "Integration: Build: C++"
-if [ "${ARROW_INTEGRATION_CPP}" == "ON" ]; then
+if [ "${ARCHERY_INTEGRATION_WITH_CPP}" -gt "0" ]; then
     "${arrow_dir}/ci/scripts/cpp_build.sh" "${arrow_dir}" "${build_dir}"
 fi
 github_actions_group_end
@@ -69,3 +72,9 @@ if [ "${ARCHERY_INTEGRATION_WITH_JS}" -gt "0" ]; then
     cp -a "${arrow_dir}/js" "${build_dir}/js"
 fi
 github_actions_group_end
+
+github_actions_group_begin "Integration: Build: Ruby"
+if [ "${ARCHERY_INTEGRATION_WITH_RUBY}" -gt "0" ]; then
+    rake -C "${arrow_dir}/ruby/red-arrow-format" install
+fi
+github_actions_group_end
diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py
index f08656fa94..e70cc3874f 100644
--- a/dev/archery/archery/cli.py
+++ b/dev/archery/archery/cli.py
@@ -667,7 +667,8 @@ def _set_default(opt, default):
 @click.option('--random-seed', type=int, default=12345,
               help="Seed for PRNG when generating test data")
 @click.option('--with-cpp', type=bool, default=False,
-              help='Include C++ in integration tests')
+              help='Include C++ in integration tests',
+              envvar="ARCHERY_INTEGRATION_WITH_CPP")
 @click.option('--with-dotnet', type=bool, default=False,
               help='Include .NET in integration tests',
               envvar="ARCHERY_INTEGRATION_WITH_DOTNET")
@@ -683,6 +684,9 @@ def _set_default(opt, default):
 @click.option('--with-nanoarrow', type=bool, default=False,
               help='Include nanoarrow in integration tests',
               envvar="ARCHERY_INTEGRATION_WITH_NANOARROW")
+@click.option('--with-ruby', type=bool, default=False,
+              help='Include Ruby in integration tests',
+              envvar="ARCHERY_INTEGRATION_WITH_RUBY")
 @click.option('--with-rust', type=bool, default=False,
               help='Include Rust in integration tests',
               envvar="ARCHERY_INTEGRATION_WITH_RUST")
diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py
index 83913dc379..6b3b13f51d 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -1937,6 +1937,7 @@ def get_generated_json_files(tempdir=None):
         .skip_tester('Java')
         .skip_tester('JS')
         .skip_tester('nanoarrow')
+        .skip_tester('Ruby')
         .skip_tester('Rust')
         .skip_tester('Go'),
 
@@ -1944,6 +1945,7 @@ def get_generated_json_files(tempdir=None):
         .skip_tester('Java')
         .skip_tester('JS')
         .skip_tester('nanoarrow')
+        .skip_tester('Ruby')
         .skip_tester('Rust')
         .skip_tester('Go'),
 
@@ -1993,19 +1995,22 @@ def get_generated_json_files(tempdir=None):
         .skip_tester('nanoarrow')
         .skip_tester('Java')  # TODO(ARROW-7779)
         # TODO(https://github.com/apache/arrow/issues/38045)
-        .skip_format(SKIP_FLIGHT, '.NET'),
+        .skip_format(SKIP_FLIGHT, '.NET')
+        .skip_tester('Ruby'),
 
         generate_run_end_encoded_case()
         .skip_tester('.NET')
         .skip_tester('JS')
         # TODO(https://github.com/apache/arrow-nanoarrow/issues/618)
         .skip_tester('nanoarrow')
+        .skip_tester('Ruby')
         .skip_tester('Rust'),
 
         generate_binary_view_case()
         .skip_tester('JS')
         # TODO(https://github.com/apache/arrow-nanoarrow/issues/618)
         .skip_tester('nanoarrow')
+        .skip_tester('Ruby')
         .skip_tester('Rust'),
 
         generate_list_view_case()
@@ -2013,12 +2018,14 @@ def get_generated_json_files(tempdir=None):
         .skip_tester('JS')
         # TODO(https://github.com/apache/arrow-nanoarrow/issues/618)
         .skip_tester('nanoarrow')
+        .skip_tester('Ruby')
         .skip_tester('Rust'),
 
         generate_extension_case()
         .skip_tester('nanoarrow')
         # TODO(https://github.com/apache/arrow/issues/38045)
-        .skip_format(SKIP_FLIGHT, '.NET'),
+        .skip_format(SKIP_FLIGHT, '.NET')
+        .skip_tester('Ruby'),
     ]
 
     generated_paths = []
diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py
index 29e3364224..9c0fda371e 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -196,9 +196,11 @@ class IntegrationRunner(object):
                 skip_testers.add(".NET")
                 skip_testers.add("Java")
                 skip_testers.add("JS")
+                skip_testers.add("Ruby")
                 skip_testers.add("Rust")
             if prefix == '2.0.0-compression':
                 skip_testers.add("JS")
+                skip_testers.add("Ruby")
             if prefix == '2.0.0-compression' and 'lz4' in name:
                 # https://github.com/apache/arrow-nanoarrow/issues/621
                 skip_testers.add("nanoarrow")
@@ -590,9 +592,9 @@ def get_static_json_files():
 
 
 def select_testers(with_cpp=True, with_java=True, with_js=True,
-                   with_dotnet=True, with_go=True, with_rust=False,
-                   with_nanoarrow=False, target_implementations="",
-                   **kwargs):
+                   with_dotnet=True, with_go=True, with_ruby=False,
+                   with_rust=False, with_nanoarrow=False,
+                   target_implementations="", **kwargs):
     target_implementations = (target_implementations.split(",")
                               if target_implementations else [])
 
@@ -629,6 +631,10 @@ def select_testers(with_cpp=True, with_java=True, with_js=True,
         from .tester_nanoarrow import NanoarrowTester
         append_tester("nanoarrow", NanoarrowTester(**kwargs))
 
+    if with_ruby:
+        from .tester_ruby import RubyTester
+        append_tester("ruby", RubyTester(**kwargs))
+
     if with_rust:
         from .tester_rust import RustTester
         append_tester("rust", RustTester(**kwargs))
diff --git a/dev/archery/archery/integration/tester_ruby.py b/dev/archery/archery/integration/tester_ruby.py
new file mode 100644
index 0000000000..a84ba825cb
--- /dev/null
+++ b/dev/archery/archery/integration/tester_ruby.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from .tester import Tester
+from .util import run_cmd, log
+from ..utils.source import ARROW_ROOT_DEFAULT
+
+
+_EXE_PATH = os.path.join(
+    ARROW_ROOT_DEFAULT, "ruby/red-arrow-format/bin/red-arrow-format-integration-test")
+
+
+class RubyTester(Tester):
+    PRODUCER = True
+    CONSUMER = True
+
+    name = "Ruby"
+
+    def _run(self, env):
+        command_line = [_EXE_PATH]
+        if self.debug:
+            command_line_string = ""
+            for key, value in env.items():
+                command_line_string += f"{key}={value} "
+            command_line_string += " ".join(command_line)
+            log(command_line_string)
+        run_cmd(command_line, env=os.environ | env)
+
+    def validate(self, json_path, arrow_path, quirks=None):
+        env = {
+            "ARROW": arrow_path,
+            "COMMAND": "validate",
+            "JSON": json_path,
+        }
+        if quirks:
+            for quirk in quirks:
+                env[f"QUIRK_{quirk.upper()}"] = "true"
+        self._run(env)
+
+    def json_to_file(self, json_path, arrow_path):
+        env = {
+            "ARROW": arrow_path,
+            "COMMAND": "json-to-file",
+            "JSON": json_path,
+        }
+        self._run(env)
+
+    def stream_to_file(self, stream_path, file_path):
+        env = {
+            "ARROW": file_path,
+            "ARROWS": stream_path,
+            "COMMAND": "stream-to-file",
+        }
+        self._run(env)
+
+    def file_to_stream(self, file_path, stream_path):
+        env = {
+            "ARROW": file_path,
+            "ARROWS": stream_path,
+            "COMMAND": "file-to-stream",
+        }
+        self._run(env)
diff --git a/dev/archery/archery/integration/util.py b/dev/archery/archery/integration/util.py
index 1b1eb95a1d..f2fb5ede20 100644
--- a/dev/archery/archery/integration/util.py
+++ b/dev/archery/archery/integration/util.py
@@ -17,6 +17,7 @@
 
 import contextlib
 import io
+import os
 import random
 import socket
 import subprocess
@@ -137,7 +138,13 @@ def run_cmd(cmd, **kwargs):
     except subprocess.CalledProcessError as e:
         # this avoids hiding the stdout / stderr of failed processes
         sio = io.StringIO()
-        print('Command failed:', " ".join(cmd), file=sio)
+        command_line_string = ''
+        env = kwargs.get('env', {})
+        for key in env.keys() - os.environ.keys():
+            value = env[key]
+            command_line_string += f'{key}={value} '
+        command_line_string += ' '.join(cmd)
+        print(f'Command failed: {command_line_string}', file=sio)
         print('With output:', file=sio)
         print('--------------', file=sio)
         print(frombytes(e.output), file=sio)
diff --git a/ruby/red-arrow-format/bin/red-arrow-format-integration-test b/ruby/red-arrow-format/bin/red-arrow-format-integration-test
new file mode 100755
index 0000000000..8571621f3d
--- /dev/null
+++ b/ruby/red-arrow-format/bin/red-arrow-format-integration-test
@@ -0,0 +1,64 @@
+#!/usr/bin/env ruby
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require_relative "../lib/arrow-format"
+require_relative "../lib/arrow-format/integration/options"
+
+options = ArrowFormat::Integration::Options.singleton
+case options.command
+when "validate"
+  require_relative "../lib/arrow-format/integration/validate"
+when "json-to-file"
+  require_relative "../lib/arrow-format/integration/json-reader"
+  File.open(options.json, "r") do |input|
+    reader = ArrowFormat::Integration::JSONReader.new(input)
+    File.open(options.arrow, "wb") do |output|
+      writer = ArrowFormat::FileWriter.new(output)
+      writer.start(reader.schema)
+      reader.each do |record_batch|
+        writer.write_record_batch(record_batch)
+      end
+      writer.finish
+    end
+  end
+when "stream-to-file"
+  File.open(options.arrows, "rb") do |input|
+    reader = ArrowFormat::StreamingReader.new(input)
+    File.open(options.arrow, "wb") do |output|
+      writer = ArrowFormat::FileWriter.new(output)
+      writer.start(reader.schema)
+      reader.each do |record_batch|
+        writer.write_record_batch(record_batch)
+      end
+      writer.finish
+    end
+  end
+when "file-to-stream"
+  File.open(options.arrow, "rb") do |input|
+    reader = ArrowFormat::FileReader.new(input)
+    File.open(options.arrows, "wb") do |output|
+      writer = ArrowFormat::StreamingWriter.new(output)
+      writer.start(reader.schema)
+      reader.each do |record_batch|
+        writer.write_record_batch(record_batch)
+      end
+      writer.finish
+    end
+  end
+end
diff --git a/ruby/red-arrow-format/lib/arrow-format/array.rb b/ruby/red-arrow-format/lib/arrow-format/array.rb
index 951de74475..10be36d530 100644
--- a/ruby/red-arrow-format/lib/arrow-format/array.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/array.rb
@@ -56,6 +56,10 @@ module ArrowFormat
       end
     end
 
+    def empty?
+      @size.zero?
+    end
+
     protected
     def slice!(offset, size)
       @offset = offset
@@ -144,6 +148,10 @@ module ArrowFormat
       return to_enum(__method__) unless block_given?
     end
 
+    def n_nulls
+      @size
+    end
+
     def to_a
       [nil] * @size
     end
@@ -156,6 +164,8 @@ module ArrowFormat
     end
 
     def to_a
+      return [] if empty?
+
       offset = element_size * @offset
       apply_validity(@values_buffer.values(@type.buffer_type, offset, @size))
     end
@@ -177,6 +187,8 @@ module ArrowFormat
 
   class BooleanArray < PrimitiveArray
     def to_a
+      return [] if empty?
+
       @values_bitmap ||= Bitmap.new(@values_buffer, @offset, @size)
       values = @values_bitmap.to_a
       apply_validity(values)
@@ -264,6 +276,8 @@ module ArrowFormat
 
   class DayTimeIntervalArray < IntervalArray
     def to_a
+      return [] if empty?
+
       offset = element_size * @offset
       values = @values_buffer.
                  each(@type.buffer_type, offset, @size * 2).
@@ -282,6 +296,8 @@ module ArrowFormat
 
   class MonthDayNanoIntervalArray < IntervalArray
     def to_a
+      return [] if empty?
+
       buffer_types = @type.buffer_types
       value_size = IO::Buffer.size_of(buffer_types)
       base_offset = value_size * @offset
@@ -301,7 +317,7 @@ module ArrowFormat
   class DurationArray < TemporalArray
   end
 
-  class VariableSizeBinaryLayoutArray < Array
+  class VariableSizeBinaryArray < Array
     def initialize(type, size, validity_buffer, offsets_buffer, values_buffer)
       super(type, size, validity_buffer)
       @offsets_buffer = offsets_buffer
@@ -323,11 +339,18 @@ module ArrowFormat
       yield(sliced_values_buffer)
     end
 
-    def to_a
-      values = @offsets_buffer.
+    def offsets
+      return [0] if empty?
+
+      @offsets_buffer.
         each(@type.offset_buffer_type, offset_size * @offset, @size + 1).
-        each_cons(2).
-        collect do |(_, offset), (_, next_offset)|
+        collect {|_, offset| offset}
+    end
+
+    def to_a
+      return [] if empty?
+
+      values = offsets.each_cons(2).collect do |offset, next_offset|
         length = next_offset - offset
         @values_buffer.get_string(offset, length, @type.encoding)
       end
@@ -340,16 +363,19 @@ module ArrowFormat
     end
   end
 
-  class BinaryArray < VariableSizeBinaryLayoutArray
+  class BinaryArray < VariableSizeBinaryArray
+  end
+
+  class LargeBinaryArray < VariableSizeBinaryArray
   end
 
-  class LargeBinaryArray < VariableSizeBinaryLayoutArray
+  class VariableSizeUTF8Array < VariableSizeBinaryArray
   end
 
-  class UTF8Array < VariableSizeBinaryLayoutArray
+  class UTF8Array < VariableSizeUTF8Array
   end
 
-  class LargeUTF8Array < VariableSizeBinaryLayoutArray
+  class LargeUTF8Array < VariableSizeUTF8Array
   end
 
   class FixedSizeBinaryArray < Array
@@ -368,6 +394,8 @@ module ArrowFormat
     end
 
     def to_a
+      return [] if empty?
+
       byte_width = @type.byte_width
       values = 0.step(@size * byte_width - 1, byte_width).collect do |offset|
         @values_buffer.get_string(offset, byte_width)
@@ -378,6 +406,8 @@ module ArrowFormat
 
   class DecimalArray < FixedSizeBinaryArray
     def to_a
+      return [] if empty?
+
       byte_width = @type.byte_width
       buffer_types = [:u64] * (byte_width / 8 - 1) + [:s64]
       base_offset = byte_width * @offset
@@ -408,8 +438,8 @@ module ArrowFormat
       elsif @type.scale > 0
         n_digits = string.bytesize
         n_digits -= 1 if value < 0
-        if n_digits < @type.scale
-          prefix = "0." + ("0" * (@type.scale - n_digits - 1))
+        if n_digits <= @type.scale
+          prefix = "0." + ("0" * (@type.scale - n_digits))
           if value < 0
             string[1, 0] = prefix
           else
@@ -446,12 +476,19 @@ module ArrowFormat
                                  @type.offset_buffer_type))
     end
 
+    def offsets
+      return [0] if empty?
+
+      @offsets_buffer.
+        each(@type.offset_buffer_type, offset_size * @offset, @size + 1).
+        collect {|_, offset| offset}
+    end
+
     def to_a
+      return [] if empty?
+
       child_values = @child.to_a
-      values = @offsets_buffer.
-        each(@type.offset_buffer_type, offset_size * @offset, @size + 1).
-        each_cons(2).
-        collect do |(_, offset), (_, next_offset)|
+      values = offsets.each_cons(2).collect do |offset, next_offset|
         child_values[offset...next_offset]
       end
       apply_validity(values)
@@ -494,6 +531,8 @@ module ArrowFormat
     end
 
     def to_a
+      return [] if empty?
+
       values = @child.to_a.each_slice(@type.size).to_a
       apply_validity(values)
     end
@@ -520,6 +559,8 @@ module ArrowFormat
     end
 
     def to_a
+      return [] if empty?
+
       if @children.empty?
         values = [[]] * @size
       else
@@ -540,6 +581,8 @@ module ArrowFormat
 
   class MapArray < VariableSizeListArray
     def to_a
+      return [] if empty?
+
       super.collect do |entries|
         if entries.nil?
           entries
@@ -555,6 +598,7 @@ module ArrowFormat
   end
 
   class UnionArray < Array
+    attr_reader :types_buffer
     attr_reader :children
     def initialize(type, size, types_buffer, children)
       super(type, size, nil)
@@ -562,6 +606,18 @@ module ArrowFormat
       @children = children
     end
 
+    def each_type(&block)
+      return [].each(&block) if empty?
+
+      return to_enum(__method__) unless block_given?
+
+      @types_buffer.each(type_buffer_type,
+                         type_element_size * @offset,
+                         @size) do |_, type|
+        yield(type)
+      end
+    end
+
     private
     def type_buffer_type
       :S8
@@ -590,15 +646,23 @@ module ArrowFormat
       yield(@offsets_buffer)
     end
 
+    def each_offset(&block)
+      return [].each(&block) if empty?
+
+      return to_enum(__method__) unless block_given?
+
+      @offsets_buffer.each(@type.offset_buffer_type,
+                           offset_element_size * @offset,
+                           @size) do |_, offset|
+        yield(offset)
+      end
+    end
+
     def to_a
+      return [] if empty?
+
       children_values = @children.collect(&:to_a)
-      types = @types_buffer.each(type_buffer_type,
-                                 type_element_size * @offset,
-                                 @size)
-      offsets = @offsets_buffer.each(:s32,
-                                     offset_element_size * @offset,
-                                     @size)
-      types.zip(offsets).collect do |(_, type), (_, offset)|
+      each_type.zip(each_offset).collect do |type, offset|
         index = @type.resolve_type_index(type)
         children_values[index][offset]
       end
@@ -624,10 +688,10 @@ module ArrowFormat
     end
 
     def to_a
+      return [] if empty?
+
       children_values = @children.collect(&:to_a)
-      @types_buffer.each(type_buffer_type,
-                         type_element_size * @offset,
-                         @size).with_index.collect do |(_, type), i|
+      each_type.with_index.collect do |type, i|
         index = @type.resolve_type_index(type)
         children_values[index][i]
       end
@@ -663,15 +727,19 @@ module ArrowFormat
       yield(@indices_buffer)
     end
 
+    def indices
+      buffer_type = @type.index_type.buffer_type
+      offset = IO::Buffer.size_of(buffer_type) * @offset
+      apply_validity(@indices_buffer.values(buffer_type, offset, @size))
+    end
+
     def to_a
+      return [] if empty?
+
       values = []
       @dictionaries.each do |dictionary|
         values.concat(dictionary.to_a)
       end
-      buffer_type = @type.index_type.buffer_type
-      offset = IO::Buffer.size_of(buffer_type) * @offset
-      indices =
-        apply_validity(@indices_buffer.values(buffer_type, offset, @size))
       indices.collect do |index|
         if index.nil?
           nil
diff --git a/ruby/red-arrow-format/lib/arrow-format/bitmap.rb b/ruby/red-arrow-format/lib/arrow-format/bitmap.rb
index e4a0dc76d3..183c3b28f3 100644
--- a/ruby/red-arrow-format/lib/arrow-format/bitmap.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/bitmap.rb
@@ -36,7 +36,7 @@ module ArrowFormat
       current = -1
       n_bytes = (@offset + @n_values) / 8
       @buffer.each(:U8, 0, n_bytes) do |offset, value|
-        7.times do |i|
+        8.times do |i|
           current += 1
           next if current < @offset
           yield((value & (1 << (i % 8))) > 0)
diff --git a/ruby/red-arrow-format/lib/arrow-format/error.rb b/ruby/red-arrow-format/lib/arrow-format/error.rb
index d73c4082be..f6aebb3645 100644
--- a/ruby/red-arrow-format/lib/arrow-format/error.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/error.rb
@@ -25,7 +25,7 @@ module ArrowFormat
     attr_reader :buffer
     def initialize(buffer, message)
       @buffer = buffer
-      super("#{message}: #{@buffer}")
+      super("#{message}: #{@buffer.inspect}")
     end
   end
 
diff --git a/ruby/red-arrow-format/lib/arrow-format/field.rb b/ruby/red-arrow-format/lib/arrow-format/field.rb
index 022091f651..28a4a90b09 100644
--- a/ruby/red-arrow-format/lib/arrow-format/field.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/field.rb
@@ -18,17 +18,14 @@ module ArrowFormat
   class Field
     attr_reader :name
     attr_reader :type
-    attr_reader :dictionary_id
     attr_reader :metadata
     def initialize(name,
                    type,
                    nullable: true,
-                   dictionary_id: nil,
                    metadata: nil)
       @name = name
       @type = type
       @nullable = nullable
-      @dictionary_id = dictionary_id
       @metadata = metadata
     end
 
@@ -41,7 +38,7 @@ module ArrowFormat
       fb_field.name = @name
       fb_field.nullable = @nullable
       if @type.respond_to?(:build_fb_field)
-        @type.build_fb_field(fb_field, self)
+        @type.build_fb_field(fb_field)
       else
         fb_field.type = @type.to_flatbuffers
       end
diff --git a/ruby/red-arrow-format/lib/arrow-format/file-reader.rb b/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
index ed510ff09c..7c749e5fbf 100644
--- a/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
@@ -64,7 +64,7 @@ module ArrowFormat
                                 "Not a record batch message: #{i}: " +
                                 fb_header.class.name)
       end
-      read_record_batch(fb_header, @schema, body)
+      read_record_batch(fb_message.version, fb_header, @schema, body)
     end
 
     def each
@@ -108,11 +108,14 @@ module ArrowFormat
     def read_block(block, type, i)
       offset = block.offset
 
-      # If we can report property error information, we can use
+      # If we can report error information correctly, we can use
       # MessagePullReader here.
       #
       # message_pull_reader = MessagePullReader.new do |message, body|
-      #   return read_record_batch(message.header, @schema, body)
+      #   return read_record_batch(message.version,
+      #                            message.header,
+      #                            @schema,
+      #                            body)
       # end
       # chunk = @buffer.slice(offset,
       #                       MessagePullReader::CONTINUATION_SIZE +
@@ -123,12 +126,18 @@ module ArrowFormat
 
       continuation_size = CONTINUATION_BUFFER.size
       continuation = @buffer.slice(offset, continuation_size)
-      unless continuation == CONTINUATION_BUFFER
+      if continuation == CONTINUATION_BUFFER
+        offset += continuation_size
+      elsif continuation.get_value(MessagePullReader::CONTINUATION_TYPE, 0) < 0
         raise FileReadError.new(@buffer,
                                 "Invalid continuation: #{type}: #{i}: " +
                                 continuation.inspect)
+      else
+        # For backward compatibility with data produced prior to
+        # version 0.15.0, which has no continuation token. Re-read
+        # these bytes as the metadata length instead.
+        continuation_size = 0
       end
-      offset += continuation_size
 
       metadata_length_type = MessagePullReader::METADATA_LENGTH_TYPE
       metadata_length_size = MessagePullReader::METADATA_LENGTH_SIZE
@@ -161,7 +170,7 @@ module ArrowFormat
       dictionary_fields = {}
       @schema.fields.each do |field|
         next unless field.type.is_a?(DictionaryType)
-        dictionary_fields[field.dictionary_id] = field
+        dictionary_fields[field.type.id] = field
       end
 
       dictionaries = {}
@@ -194,7 +203,10 @@ module ArrowFormat
 
         value_type = dictionary_fields[id].type.value_type
         schema = Schema.new([Field.new("dummy", value_type)])
-        record_batch = read_record_batch(fb_header.data, schema, body)
+        record_batch = read_record_batch(fb_message.version,
+                                         fb_header.data,
+                                         schema,
+                                         body)
         if fb_header.delta?
           dictionaries[id] << record_batch.columns[0]
         else
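Illustrative note (not part of the commit): the continuation handling in read_block above can be sketched on plain binary strings. This is a minimal sketch, assuming the standard Arrow IPC framing in which an optional 4-byte 0xFFFFFFFF continuation token precedes a little-endian 32-bit metadata length. The names CONTINUATION_TOKEN and read_metadata_length are hypothetical and do not exist in red-arrow-format.

```ruby
# Hypothetical helper for illustration; not part of red-arrow-format.
CONTINUATION_TOKEN = [-1].pack("l<") # 0xFFFFFFFF as 4 bytes

# Returns [metadata_length, offset_after_length_prefix].
# `bytes` is expected to be a binary (ASCII-8BIT) string.
def read_metadata_length(bytes, offset)
  head = bytes.byteslice(offset, 4)
  if head == CONTINUATION_TOKEN
    # Current framing: skip the token, then read the length.
    offset += 4
    length = bytes.byteslice(offset, 4).unpack1("l<")
  else
    # Pre-0.15.0 framing: the first 4 bytes are the length itself.
    length = head.unpack1("l<")
    raise ArgumentError, "Invalid continuation: #{head.inspect}" if length < 0
  end
  [length, offset + 4]
end
```

As in the diff, a negative value that is not the token is rejected, and any non-negative value is taken to be a legacy length prefix.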
diff --git a/ruby/red-arrow-format/lib/arrow-format/integration/json-reader.rb b/ruby/red-arrow-format/lib/arrow-format/integration/json-reader.rb
new file mode 100644
index 0000000000..3660e8af34
--- /dev/null
+++ b/ruby/red-arrow-format/lib/arrow-format/integration/json-reader.rb
@@ -0,0 +1,408 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require "json"
+
+module ArrowFormat
+  module Integration
+    class JSONReader
+      attr_reader :schema
+      def initialize(input)
+        @json = JSON.load(input.read)
+        @dictionaries = {}
+        @schema = read_schema
+      end
+
+      def each
+        @json["batches"].each do |record_batch|
+          yield(read_record_batch(record_batch))
+        end
+      end
+
+      private
+      def read_metadata(metadata)
+        return nil if metadata.nil?
+
+        metadata.inject({}) do |result, metadatum|
+          result[metadatum["key"]] = metadatum["value"]
+          result
+        end
+      end
+
+      def read_type(type, children)
+        case type["name"]
+        when "null"
+          NullType.singleton
+        when "bool"
+          BooleanType.singleton
+        when "int"
+          is_signed = type["isSigned"]
+          case type["bitWidth"]
+          when 8
+            if is_signed
+              Int8Type.singleton
+            else
+              UInt8Type.singleton
+            end
+          when 16
+            if is_signed
+              Int16Type.singleton
+            else
+              UInt16Type.singleton
+            end
+          when 32
+            if is_signed
+              Int32Type.singleton
+            else
+              UInt32Type.singleton
+            end
+          when 64
+            if is_signed
+              Int64Type.singleton
+            else
+              UInt64Type.singleton
+            end
+          else
+            raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+          end
+        when "floatingpoint"
+          case type["precision"]
+          when "SINGLE"
+            Float32Type.singleton
+          when "DOUBLE"
+            Float64Type.singleton
+          else
+            raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+          end
+        when "date"
+          case type["unit"]
+          when "DAY"
+            Date32Type.singleton
+          when "MILLISECOND"
+            Date64Type.singleton
+          else
+            raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+          end
+        when "time"
+          unit = type["unit"].downcase.to_sym
+          case type["bitWidth"]
+          when 32
+            Time32Type.new(unit)
+          when 64
+            Time64Type.new(unit)
+          else
+            raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+          end
+        when "timestamp"
+          unit = type["unit"].downcase.to_sym
+          TimestampType.new(unit, type["timezone"])
+        when "interval"
+          case type["unit"]
+          when "YEAR_MONTH"
+            YearMonthIntervalType.singleton
+          when "DAY_TIME"
+            DayTimeIntervalType.singleton
+          when "MONTH_DAY_NANO"
+            MonthDayNanoIntervalType.singleton
+          else
+            raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+          end
+        when "duration"
+          DurationType.new(type["unit"].downcase.to_sym)
+        when "binary"
+          BinaryType.singleton
+        when "largebinary"
+          LargeBinaryType.singleton
+        when "utf8"
+          UTF8Type.singleton
+        when "largeutf8"
+          LargeUTF8Type.singleton
+        when "fixedsizebinary"
+          FixedSizeBinaryType.new(type["byteWidth"])
+        when "decimal"
+          precision = type["precision"]
+          scale = type["scale"]
+          case type["bitWidth"]
+          when 128
+            Decimal128Type.new(precision, scale)
+          when 256
+            Decimal256Type.new(precision, scale)
+          else
+            raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+          end
+        when "list"
+          ListType.new(read_field(children[0]))
+        when "largelist"
+          LargeListType.new(read_field(children[0]))
+        when "fixedsizelist"
+          FixedSizeListType.new(read_field(children[0]), type["listSize"])
+        when "struct"
+          StructType.new(children.collect {|child| read_field(child)})
+        when "map"
+          MapType.new(read_field(children[0]), type["keysSorted"])
+        when "union"
+          children = children.collect {|child| read_field(child)}
+          type_ids = type["typeIds"]
+          case type["mode"]
+          when "DENSE"
+            DenseUnionType.new(children, type_ids)
+          when "SPARSE"
+            SparseUnionType.new(children, type_ids)
+          else
+            raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+          end
+        else
+          raise "Unsupported type: #{type.inspect}: #{children.inspect}"
+        end
+      end
+
+      def read_field(field)
+        type = read_type(field["type"], field["children"])
+        dictionary = field["dictionary"]
+        if dictionary
+          index_type = read_type(dictionary["indexType"], [])
+          value_type = type
+          type = DictionaryType.new(dictionary["id"],
+                                    index_type,
+                                    value_type,
+                                    dictionary["isOrdered"])
+        end
+        metadata = read_metadata(field["metadata"])
+        Field.new(field["name"],
+                  type,
+                  nullable: field["nullable"],
+                  metadata: metadata)
+      end
+
+      def read_dictionary(id, type)
+        @json["dictionaries"].each do |dictionary|
+          next unless dictionary["id"] == id
+          return read_array(dictionary["data"]["columns"][0], type)
+        end
+      end
+
+      def read_schema
+        fields = []
+        @json["schema"]["fields"].each do |field|
+          fields << read_field(field)
+        end
+        metadata = read_metadata(@json["schema"]["metadata"])
+        Schema.new(fields, metadata: metadata)
+      end
+
+      def read_bitmap(bitmap)
+        buffer = +"".b
+        bitmap.each_slice(8) do |bits|
+          byte = 0
+          while bits.size < 8
+            bits << 0
+          end
+          bits.reverse_each do |bit|
+            byte = (byte << 1) + bit
+          end
+          buffer << [byte].pack("C")
+        end
+        IO::Buffer.for(buffer)
+      end
+
+      def read_types(types)
+        buffer_type = :S8
+        size = IO::Buffer.size_of(buffer_type)
+        buffer = IO::Buffer.new(size * types.size)
+        types.each_with_index do |type, i|
+          offset = size * i
+          buffer.set_value(buffer_type, offset, type)
+        end
+        buffer
+      end
+
+      def read_offsets(offsets, type)
+        return nil if offsets.nil?
+
+        case type
+        when LargeListType, LargeBinaryType, LargeUTF8Type
+          offsets = offsets.collect {|offset| Integer(offset, 10)}
+        end
+        size = IO::Buffer.size_of(type.offset_buffer_type)
+        buffer = IO::Buffer.new(size * offsets.size)
+        offsets.each_with_index do |offset, i|
+          value_offset = size * i
+          buffer.set_value(type.offset_buffer_type, value_offset, offset)
+        end
+        buffer
+      end
+
+      def read_hex_value(value)
+        values = value.scan(/.{2}/).collect do |hex|
+          Integer(hex, 16)
+        end
+        values.pack("C*")
+      end
+
+      def read_values(data, type)
+        case type
+        when BooleanType
+          read_bitmap(data.collect {|boolean| boolean ? 1 : 0})
+        when DayTimeIntervalType
+          buffer_types = [type.buffer_type] * 2
+          size = IO::Buffer.size_of(buffer_types)
+          buffer = IO::Buffer.new(size * data.size)
+          data.each_with_index do |value, i|
+            offset = size * i
+            components = value.fetch_values("days", "milliseconds")
+            buffer.set_values(buffer_types, offset, components)
+          end
+          buffer
+        when MonthDayNanoIntervalType
+          size = IO::Buffer.size_of(type.buffer_types)
+          buffer = IO::Buffer.new(size * data.size)
+          data.each_with_index do |value, i|
+            offset = size * i
+            components = value.fetch_values("months", "days", "nanoseconds")
+            buffer.set_values(type.buffer_types, offset, components)
+          end
+          buffer
+        when NumberType,
+             TemporalType
+          size = IO::Buffer.size_of(type.buffer_type)
+          buffer = IO::Buffer.new(size * data.size)
+          data.each_with_index do |value, i|
+            offset = size * i
+            # If the type is 64-bit, such as `Int64Type`, `value` is a
+            # string, not an integer, so that it round-trips through JSON.
+            value = Integer(value, 10) if value.is_a?(String)
+            buffer.set_value(type.buffer_type, offset, value)
+          end
+          buffer
+        when DecimalType
+          byte_width = type.byte_width
+          bit_width = byte_width * 8
+          buffer = IO::Buffer.new(byte_width * data.size)
+          data.each_with_index do |value, i|
+            offset = byte_width * i
+            components = []
+            value = BigDecimal(value)
+            value *= 10 ** value.scale if value.scale > 0
+            bits = "%0#{bit_width}b" % value.to_i
+            if value.negative?
+              # `bits` starts with "..1".
+              #
+              # If `value` is the minimum negative value, `bits` may
+              # be longer than `bit_width` because of the leading `..`
+              # (2 characters).
+              bits = bits.delete_prefix("..").rjust(bit_width, "1")
+            end
+            bits.scan(/[01]{64}/).reverse_each do |chunk|
+              buffer.set_value(:u64, offset, Integer(chunk, 2))
+              offset += 8
+            end
+          end
+          buffer
+        when UTF8Type, LargeUTF8Type
+          IO::Buffer.for(data.join)
+        when VariableSizeBinaryType, FixedSizeBinaryType
+          IO::Buffer.for(data.collect {|value| read_hex_value(value)}.join)
+        else
+          raise "Unsupported values: #{data.inspect}: #{type.inspect}"
+        end
+      end
+
+      def read_array(column, type)
+        length = column["count"]
+        case type
+        when NullType
+          type.build_array(length)
+        when BooleanType,
+             NumberType,
+             TemporalType,
+             FixedSizeBinaryType
+          validity_buffer = read_bitmap(column["VALIDITY"])
+          values_buffer = read_values(column["DATA"], type)
+          type.build_array(length, validity_buffer, values_buffer)
+        when VariableSizeBinaryType
+          validity_buffer = read_bitmap(column["VALIDITY"])
+          offsets_buffer = read_offsets(column["OFFSET"], type)
+          values_buffer = read_values(column["DATA"], type)
+          type.build_array(length,
+                           validity_buffer,
+                           offsets_buffer,
+                           values_buffer)
+        when VariableSizeListType
+          validity_buffer = read_bitmap(column["VALIDITY"])
+          offsets_buffer = read_offsets(column["OFFSET"], type)
+          child = read_array(column["children"][0], type.child.type)
+          type.build_array(length,
+                           validity_buffer,
+                           offsets_buffer,
+                           child)
+        when FixedSizeListType
+          validity_buffer = read_bitmap(column["VALIDITY"])
+          child = read_array(column["children"][0], type.child.type)
+          type.build_array(length, validity_buffer, child)
+        when StructType
+          validity_buffer = read_bitmap(column["VALIDITY"])
+          children = column["children"]
+                       .zip(type.children)
+                       .collect do |child_column, child_field|
+            read_array(child_column, child_field.type)
+          end
+          type.build_array(length, validity_buffer, children)
+        when DenseUnionType
+          types_buffer = read_types(column["TYPE_ID"])
+          offsets_buffer = read_offsets(column["OFFSET"], type)
+          children = column["children"]
+                       .zip(type.children)
+                       .collect do |child_column, child_field|
+            read_array(child_column, child_field.type)
+          end
+          type.build_array(length,
+                           types_buffer,
+                           offsets_buffer,
+                           children)
+        when SparseUnionType
+          types_buffer = read_types(column["TYPE_ID"])
+          children = column["children"]
+                       .zip(type.children)
+                       .collect do |child_column, child_field|
+            read_array(child_column, child_field.type)
+          end
+          type.build_array(length, types_buffer, children)
+        when DictionaryType
+          validity_buffer = read_bitmap(column["VALIDITY"])
+          indices_buffer = read_values(column["DATA"], type.index_type)
+          dictionary = read_dictionary(type.id, type.value_type)
+          type.build_array(length,
+                           validity_buffer,
+                           indices_buffer,
+                           [dictionary])
+        else
+          raise "Unsupported array: #{column.inspect}: #{type.inspect}"
+        end
+      end
+
+      def read_record_batch(record_batch)
+        n_rows = record_batch["count"]
+        columns = record_batch["columns"]
+                    .zip(@schema.fields)
+                    .collect do |column, field|
+          read_array(column, field.type)
+        end
+        RecordBatch.new(@schema, n_rows, columns)
+      end
+    end
+  end
+end
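Illustrative note (not part of the commit): two of the buffer encodings that json-reader.rb produces can be sketched without IO::Buffer. This is a minimal sketch, assuming validity bitmaps are packed least significant bit first and decimals are stored as little-endian two's complement integers. pack_bitmap and pack_decimal are hypothetical names, and pack_decimal uses integer masking rather than the binary-string formatting used in read_values.

```ruby
# Hypothetical helpers for illustration; not part of red-arrow-format.

# Pack an array of 0/1 flags into bytes, least significant bit first.
# A short final slice is implicitly padded with zero bits.
def pack_bitmap(bits)
  bytes = bits.each_slice(8).map do |slice|
    slice.each_with_index.sum {|bit, i| bit << i}
  end
  bytes.pack("C*")
end

# Encode an unscaled decimal integer as little-endian two's complement.
# byte_width is 16 for decimal128 and 32 for decimal256.
def pack_decimal(unscaled, byte_width)
  mask = (1 << (byte_width * 8)) - 1
  value = unscaled & mask # maps negative values to two's complement
  words = Array.new(byte_width / 8) do |i|
    (value >> (64 * i)) & 0xFFFF_FFFF_FFFF_FFFF
  end
  words.pack("Q<*") # lowest 64-bit word first
end
```

Masking avoids the special case for the `..` prefix that Ruby's `%b` formatting emits for negative integers, at the cost of diverging from the approach taken in the diff.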
diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb b/ruby/red-arrow-format/lib/arrow-format/integration/options.rb
similarity index 52%
copy from ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
copy to ruby/red-arrow-format/lib/arrow-format/integration/options.rb
index f11972c67a..a12e8a10ca 100644
--- a/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/integration/options.rb
@@ -15,35 +15,39 @@
 # specific language governing permissions and limitations
 # under the License.
 
-require_relative "streaming-pull-reader"
-
 module ArrowFormat
-  class StreamingReader
-    include Enumerable
-
-    attr_reader :schema
-    def initialize(input)
-      @input = input
-      @schema = nil
-    end
-
-    def each
-      return to_enum(__method__) unless block_given?
+  module Integration
+    class Options
+      class << self
+        def singleton
+          @singleton ||= new
+        end
+      end
 
-      reader = StreamingPullReader.new do |record_batch|
-        @schema ||= reader.schema
-        yield(record_batch)
+      attr_reader :command
+      attr_reader :arrow
+      attr_reader :arrows
+      attr_reader :json
+      def initialize
+        @command = ENV["COMMAND"]
+        @arrow = ENV["ARROW"]
+        @arrows = ENV["ARROWS"]
+        @json = ENV["JSON"]
+        @validate_date64 = ENV["QUIRK_NO_DATE64_VALIDATE"] != "true"
+        @validate_decimal = ENV["QUIRK_NO_DECIMAL_VALIDATE"] != "true"
+        @validate_time = ENV["QUIRK_NO_TIMES_VALIDATE"] != "true"
       end
 
-      buffer = "".b
-      loop do
-        next_size = reader.next_required_size
-        break if next_size.zero?
+      def validate_date64?
+        @validate_date64
+      end
 
-        next_chunk = @input.read(next_size, buffer)
-        break if next_chunk.nil?
+      def validate_decimal?
+        @validate_decimal
+      end
 
-        reader.consume(IO::Buffer.for(next_chunk))
+      def validate_time?
+        @validate_time
       end
     end
   end
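Illustrative note (not part of the commit): the quirk flags in Options are opt-out switches, so validation stays enabled unless the variable is exactly the string "true". A minimal sketch of that rule, assuming a hash-like environment is passed in for testability; QuirkFlags is a hypothetical stand-in, while the real class reads ENV directly.

```ruby
# Hypothetical stand-in for illustration; the real class is
# ArrowFormat::Integration::Options and reads ENV directly.
class QuirkFlags
  def initialize(env)
    # Validation stays on unless the quirk is exactly "true".
    @validate_decimal = env["QUIRK_NO_DECIMAL_VALIDATE"] != "true"
  end

  def validate_decimal?
    @validate_decimal
  end
end
```

Under this rule an unset variable, or a value such as "1", leaves validation enabled.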
diff --git a/ruby/red-arrow-format/lib/arrow-format/integration/validate.rb b/ruby/red-arrow-format/lib/arrow-format/integration/validate.rb
new file mode 100644
index 0000000000..b1eec2fc55
--- /dev/null
+++ b/ruby/red-arrow-format/lib/arrow-format/integration/validate.rb
@@ -0,0 +1,656 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require "json"
+require "test-unit"
+
+ENV["TEST_UNIT_MAX_DIFF_TARGET_STRING_SIZE"] ||= "1_000_000"
+
+class ArrowFormatIntegrationTest < Test::Unit::TestCase
+  module ToHashable
+    refine ArrowFormat::Type do
+      def to_h
+        {
+          "name" => normalized_name,
+        }
+      end
+
+      private
+      def normalized_name
+        name.downcase
+      end
+    end
+
+    refine ArrowFormat::BooleanType do
+      private
+      def normalized_name
+        "bool"
+      end
+    end
+
+    refine ArrowFormat::IntType do
+      def to_h
+        super.merge("bitWidth" => @bit_width,
+                    "isSigned" => @signed)
+      end
+
+      private
+      def normalized_name
+        "int"
+      end
+    end
+
+    refine ArrowFormat::FloatingPointType do
+      def to_h
+        super.merge("precision" => normalized_precision)
+      end
+
+      private
+      def normalized_name
+        "floatingpoint"
+      end
+
+      def normalized_precision
+        @precision.to_s.upcase
+      end
+    end
+
+    refine ArrowFormat::DateType do
+      def to_h
+        super.merge("unit" => normalized_unit)
+      end
+
+      private
+      def normalized_name
+        "date"
+      end
+
+      def normalized_unit
+        @unit.to_s.upcase
+      end
+    end
+
+    refine ArrowFormat::TimeType do
+      def to_h
+        super.merge("bitWidth" => @bit_width,
+                    "unit" => normalized_unit)
+      end
+
+      private
+      def normalized_name
+        "time"
+      end
+
+      def normalized_unit
+        @unit.to_s.upcase
+      end
+    end
+
+    refine ArrowFormat::TimestampType do
+      def to_h
+        hash = super
+        hash["unit"] = normalized_unit
+        hash["timezone"] = @time_zone if @time_zone
+        hash
+      end
+
+      private
+      def normalized_name
+        "timestamp"
+      end
+
+      def normalized_unit
+        @unit.to_s.upcase
+      end
+    end
+
+    refine ArrowFormat::IntervalType do
+      def to_h
+        super.merge("unit" => normalized_unit)
+      end
+
+      private
+      def normalized_name
+        "interval"
+      end
+
+      def normalized_unit
+        @unit.to_s.upcase
+      end
+    end
+
+    refine ArrowFormat::DurationType do
+      def to_h
+        super.merge("unit" => normalized_unit)
+      end
+
+      private
+      def normalized_unit
+        @unit.to_s.upcase
+      end
+    end
+
+    refine ArrowFormat::FixedSizeBinaryType do
+      def to_h
+        super.merge("byteWidth" => @byte_width)
+      end
+    end
+
+    refine ArrowFormat::FixedSizeListType do
+      def to_h
+        super.merge("listSize" => @size)
+      end
+    end
+
+    refine ArrowFormat::DecimalType do
+      def to_h
+        hash = super
+        hash.delete("byteWidth")
+        hash["bitWidth"] = @byte_width * 8
+        hash["precision"] = @precision
+        hash["scale"] = @scale
+        hash
+      end
+
+      private
+      def normalized_name
+        "decimal"
+      end
+    end
+
+    refine ArrowFormat::UnionType do
+      def to_h
+        super.merge("mode" => normalized_mode,
+                    "typeIds" => normalized_type_ids)
+      end
+
+      private
+      def normalized_name
+        "union"
+      end
+
+      def normalized_mode
+        @mode.to_s.upcase
+      end
+
+      def normalized_type_ids
+        @type_ids
+      end
+    end
+
+    refine ArrowFormat::MapType do
+      def to_h
+        super.merge("keysSorted" => @keys_sorted)
+      end
+    end
+
+    module MetadataNormalizable
+      private
+      def normalized_metadata
+        metadata_list = @metadata.collect do |key, value|
+          {"key" => key, "value" => value}
+        end
+        metadata_list.sort_by do |metadatum|
+          metadatum["key"]
+        end
+      end
+    end
+
+    refine ArrowFormat::Field do
+      import_methods MetadataNormalizable
+
+      def to_h
+        hash = {
+          "children" => normalized_children,
+          "name" => @name,
+          "nullable" => @nullable,
+        }
+        if @type.is_a?(ArrowFormat::DictionaryType)
+          hash["type"] = @type.value_type.to_h
+          hash["dictionary"] = {
+            "id" => @type.id,
+            "indexType" => @type.index_type.to_h,
+            "isOrdered" => @type.ordered?,
+          }
+        else
+          hash["type"] = @type.to_h
+        end
+        hash["metadata"] = normalized_metadata if @metadata
+        hash
+      end
+
+      private
+      def normalized_children
+        if @type.respond_to?(:children)
+          @type.children.collect(&:to_h)
+        elsif @type.respond_to?(:child)
+          [@type.child.to_h]
+        else
+          []
+        end
+      end
+    end
+
+    refine ArrowFormat::Schema do
+      import_methods MetadataNormalizable
+
+      def to_h
+        hash = {
+          "fields" => @fields.collect(&:to_h),
+        }
+        hash["metadata"] = normalized_metadata if @metadata
+        hash
+      end
+    end
+
+    refine ArrowFormat::Array do
+      def to_h
+        hash = {
+          "count" => size,
+        }
+        to_h_data(hash)
+        hash
+      end
+
+      private
+      def to_h_data(hash)
+        hash["DATA"] = normalized_data
+        hash["VALIDITY"] = normalized_validity
+      end
+
+      def normalized_data
+        to_a
+      end
+
+      def normalized_validity
+        if @validity_buffer
+          validity_bitmap.collect {|valid| valid ? 1 : 0}
+        else
+          [1] * @size
+        end
+      end
+
+      def stringify_data(data)
+        data.collect do |value|
+          if value.nil?
+            nil
+          else
+            value.to_s
+          end
+        end
+      end
+
+      def hexify(value)
+        value.each_byte.collect {|byte| "%02X" % byte}.join("")
+      end
+
+      def normalized_children
+        @children.zip(@type.children).collect do |child, field|
+          normalize_child(child, field)
+        end
+      end
+
+      def normalize_child(child, field)
+        child.to_h.merge("name" => field.name)
+      end
+    end
+
+    refine ArrowFormat::NullArray do
+      private
+      def to_h_data(hash)
+      end
+    end
+
+    refine ArrowFormat::Int64Array do
+      private
+      def normalized_data
+        stringify_data(super)
+      end
+    end
+
+    refine ArrowFormat::UInt64Array do
+      private
+      def normalized_data
+        stringify_data(super)
+      end
+    end
+
+    refine ArrowFormat::Float32Array do
+      private
+      def normalized_data
+        super.collect do |value|
+          if value.nil?
+            nil
+          else
+            value.round(3)
+          end
+        end
+      end
+    end
+
+    refine ArrowFormat::Date64Array do
+      private
+      def normalized_data
+        stringify_data(super)
+      end
+    end
+
+    refine ArrowFormat::Time64Array do
+      private
+      def normalized_data
+        stringify_data(super)
+      end
+    end
+
+    refine ArrowFormat::TimestampArray do
+      private
+      def normalized_data
+        stringify_data(super)
+      end
+    end
+
+    refine ArrowFormat::DayTimeIntervalArray do
+      private
+      def normalized_data
+        super.collect do |value|
+          if value.nil?
+            nil
+          else
+            day, time = value
+            {"days" => day, "milliseconds" => time}
+          end
+        end
+      end
+    end
+
+    refine ArrowFormat::MonthDayNanoIntervalArray do
+      private
+      def normalized_data
+        super.collect do |value|
+          if value.nil?
+            nil
+          else
+            month, day, nano = value
+            {"months" => month, "days" => day, "nanoseconds" => nano}
+          end
+        end
+      end
+    end
+
+    refine ArrowFormat::MonthDayNanoIntervalArray do
+      private
+      def normalized_data
+        super.collect do |value|
+          if value.nil?
+            nil
+          else
+            month, day, nano = value
+            {"months" => month, "days" => day, "nanoseconds" => nano}
+          end
+        end
+      end
+    end
+
+    refine ArrowFormat::DurationArray do
+      private
+      def normalized_data
+        stringify_data(super)
+      end
+    end
+
+    refine ArrowFormat::VariableSizeBinaryArray do
+      private
+      def to_h_data(hash)
+        super(hash)
+        hash["OFFSET"] = normalized_offsets
+      end
+
+      def normalized_data
+        super.collect do |value|
+          if value.nil?
+            nil
+          else
+            normalize_value(value)
+          end
+        end
+      end
+
+      def normalize_value(value)
+        hexify(value)
+      end
+
+      def normalized_offsets
+        offsets
+      end
+    end
+
+    refine ArrowFormat::LargeBinaryArray do
+      private
+      def normalized_offsets
+        offsets.collect(&:to_s)
+      end
+    end
+
+    refine ArrowFormat::VariableSizeUTF8Array do
+      private
+      def normalize_value(value)
+        value
+      end
+    end
+
+    refine ArrowFormat::LargeUTF8Array do
+      private
+      def normalized_offsets
+        offsets.collect(&:to_s)
+      end
+    end
+
+    refine ArrowFormat::FixedSizeBinaryArray do
+      private
+      def normalized_data
+        super.collect do |value|
+          if value.nil?
+            nil
+          else
+            normalize_value(value)
+          end
+        end
+      end
+
+      def normalize_value(value)
+        hexify(value)
+      end
+    end
+
+    refine ArrowFormat::DecimalArray do
+      private
+      def normalize_value(value)
+        (value * (10 ** (@type.scale))).to_s("f").delete_suffix(".0")
+      end
+    end
+
+    refine ArrowFormat::VariableSizeListArray do
+      private
+      def to_h_data(hash)
+        hash["OFFSET"] = normalized_offsets
+        hash["VALIDITY"] = normalized_validity
+        hash["children"] = [normalize_child(@child, @type.child)]
+      end
+
+      def normalized_offsets
+        offsets
+      end
+    end
+
+    refine ArrowFormat::FixedSizeListArray do
+      private
+      def to_h_data(hash)
+        hash["VALIDITY"] = normalized_validity
+        hash["children"] = [normalize_child(@child, @type.child)]
+      end
+    end
+
+    refine ArrowFormat::LargeListArray do
+      private
+      def normalized_offsets
+        offsets.collect(&:to_s)
+      end
+    end
+
+    refine ArrowFormat::StructArray do
+      private
+      def to_h_data(hash)
+        hash["VALIDITY"] = normalized_validity
+        hash["children"] = normalized_children
+      end
+    end
+
+    refine ArrowFormat::UnionArray do
+      private
+      def to_h_data(hash)
+        hash["TYPE_ID"] = each_type.to_a
+        hash["children"] = normalized_children
+      end
+    end
+
+    refine ArrowFormat::DenseUnionArray do
+      private
+      def to_h_data(hash)
+        super
+        hash["OFFSET"] = each_offset.to_a
+      end
+    end
+
+    refine ArrowFormat::DictionaryArray do
+      private
+      def normalized_data
+        indices
+      end
+    end
+
+    refine ArrowFormat::RecordBatch do
+      def to_h
+        {
+          "columns" => normalized_columns,
+          "count" => @n_rows,
+        }
+      end
+
+      private
+      def normalized_columns
+        @schema.fields.zip(@columns).collect do |field, column|
+          column.to_h.merge("name" => field.name)
+        end
+      end
+    end
+  end
+
+  using ToHashable
+
+  def setup
+    @options = ArrowFormat::Integration::Options.singleton
+  end
+
+  def normalize_field!(field)
+    metadata = field["metadata"]
+    if metadata
+      field["metadata"] = metadata.sort_by do |metadatum|
+        metadatum["key"]
+      end
+    end
+
+    case field["type"]["name"]
+    when "decimal"
+      field["type"]["bitWidth"] ||= 128 unless @options.validate_decimal?
+    when "map"
+      entries = field["children"][0]
+      entries["name"] = "entries"
+      entries["children"][0]["name"] = "key"
+      entries["children"][1]["name"] = "value"
+    end
+  end
+
+  def normalize_schema!(schema)
+    schema["fields"].each do |field|
+      normalize_field!(field)
+    end
+  end
+
+  def normalize_array!(array, field)
+    case field["type"]["name"]
+    when "map"
+      entries = array["children"][0]
+      entries["name"] = "entries"
+      entries["children"][0]["name"] = "key"
+      entries["children"][1]["name"] = "value"
+    when "union"
+      # V4 data has VALIDITY.
+      array.delete("VALIDITY")
+    end
+
+    data = array["DATA"]
+    validity = array["VALIDITY"]
+    if data and validity
+      array["DATA"] = data.zip(validity).collect do |value, valid_bit|
+        if (valid_bit == 1)
+          value
+        else
+          nil
+        end
+      end
+    end
+
+    child_arrays = array["children"]
+    if child_arrays
+      child_fields = field["children"]
+      child_arrays.zip(child_fields) do |child_array, child_field|
+        normalize_array!(child_array, child_field)
+      end
+    end
+  end
+
+  def normalize_record_batch!(record_batch, schema)
+    record_batch["columns"].zip(schema["fields"]) do |column, field|
+      normalize_array!(column, field)
+    end
+  end
+
+  def test_validate
+    expected = JSON.parse(File.read(@options.json))
+    expected_schema = expected["schema"]
+    normalize_schema!(expected_schema)
+    File.open(@options.arrow, "rb") do |input|
+      reader = ArrowFormat::FileReader.new(input)
+      expected_record_batches = []
+      actual_record_batches = []
+      reader.each.with_index do |record_batch, i|
+        expected_record_batch = expected["batches"][i]
+        normalize_record_batch!(expected_record_batch, expected_schema)
+        expected_record_batches << expected_record_batch
+        actual_record_batches << record_batch.to_h
+      end
+      assert_equal({
+                     schema: expected_schema,
+                     record_batches: expected_record_batches,
+                   },
+                   {
+                     schema: reader.schema.to_h,
+                     record_batches: actual_record_batches,
+                   })
+    end
+  end
+end
diff --git a/ruby/red-arrow-format/lib/arrow-format/readable.rb b/ruby/red-arrow-format/lib/arrow-format/readable.rb
index c5c1d5d2b3..783b494b68 100644
--- a/ruby/red-arrow-format/lib/arrow-format/readable.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/readable.rb
@@ -42,7 +42,10 @@ module ArrowFormat
                  metadata: read_custom_metadata(fb_schema.custom_metadata))
     end
 
-    def read_field(fb_field)
+    def read_field(fb_field,
+                   map_entries: false,
+                   map_key: false,
+                   map_value: false)
       fb_type = fb_field.type
       case fb_type
       when FB::Null
@@ -57,6 +60,8 @@ module ArrowFormat
           type = Float32Type.singleton
         when FB::Precision::DOUBLE
           type = Float64Type.singleton
+        else
+          raise ReadError.new("Unsupported type: #{fb_type.inspect}")
         end
       when FB::Date
         case fb_type.unit
@@ -64,6 +69,8 @@ module ArrowFormat
           type = Date32Type.singleton
         when FB::DateUnit::MILLISECOND
           type = Date64Type.singleton
+        else
+          raise ReadError.new("Unsupported type: #{fb_type.inspect}")
         end
       when FB::Time
         case fb_type.bit_width
@@ -73,6 +80,8 @@ module ArrowFormat
             type = Time32Type.new(:second)
           when FB::TimeUnit::MILLISECOND
             type = Time32Type.new(:millisecond)
+          else
+            raise ReadError.new("Unsupported type: #{fb_type.inspect}")
           end
         when 64
           case fb_type.unit
@@ -80,6 +89,8 @@ module ArrowFormat
             type = Time64Type.new(:microsecond)
           when FB::TimeUnit::NANOSECOND
             type = Time64Type.new(:nanosecond)
+          else
+            raise ReadError.new("Unsupported type: #{fb_type.inspect}")
           end
         end
       when FB::Timestamp
@@ -93,6 +104,8 @@ module ArrowFormat
           type = DayTimeIntervalType.singleton
         when FB::IntervalUnit::MONTH_DAY_NANO
           type = MonthDayNanoIntervalType.singleton
+        else
+          raise ReadError.new("Unsupported type: #{fb_type.inspect}")
         end
       when FB::Duration
         unit = fb_type.unit.name.downcase.to_sym
@@ -105,7 +118,15 @@ module ArrowFormat
         type = FixedSizeListType.new(read_field(fb_field.children[0]),
                                      fb_type.list_size)
       when FB::Struct
-        children = fb_field.children.collect {|child| read_field(child)}
+        if map_entries
+          fb_children = fb_field.children
+          children = [
+            read_field(fb_children[0], map_key: true),
+            read_field(fb_children[1], map_value: true),
+          ]
+        else
+          children = fb_field.children.collect {|child| read_field(child)}
+        end
         type = StructType.new(children)
       when FB::Union
         children = fb_field.children.collect {|child| read_field(child)}
@@ -115,9 +136,12 @@ module ArrowFormat
           type = DenseUnionType.new(children, type_ids)
         when FB::UnionMode::SPARSE
           type = SparseUnionType.new(children, type_ids)
+        else
+          raise ReadError.new("Unsupported type: #{fb_type.inspect}")
         end
       when FB::Map
-        type = MapType.new(read_field(fb_field.children[0]))
+        type = MapType.new(read_field(fb_field.children[0], map_entries: true),
+                           fb_type.keys_sorted?)
       when FB::Binary
         type = BinaryType.singleton
       when FB::LargeBinary
@@ -134,21 +158,42 @@ module ArrowFormat
           type = Decimal128Type.new(fb_type.precision, fb_type.scale)
         when 256
           type = Decimal256Type.new(fb_type.precision, fb_type.scale)
+        else
+          raise ReadError.new("Unsupported type: #{fb_type.inspect}")
         end
+      else
+        raise ReadError.new("Unsupported type: #{fb_type.inspect}")
       end
 
       dictionary = fb_field.dictionary
       if dictionary
         dictionary_id = dictionary.id
         index_type = read_type_int(dictionary.index_type)
-        type = DictionaryType.new(index_type, type, dictionary.ordered?)
+        value_type = type
+        type = DictionaryType.new(dictionary_id,
+                                  index_type,
+                                  value_type,
+                                  dictionary.ordered?)
+      end
+
+      # Map type uses static "entries"/"key"/"value" as field names
+      # instead of field names in FlatBuffers. It's based on the
+      # specification:
+      #
+      #   The names of the child fields may be respectively "entries",
+      #   "key", and "value", but this is not enforced.
+      if map_entries
+        name = "entries"
+      elsif map_key
+        name = "key"
+      elsif map_value
+        name = "value"
       else
-        dictionary_id = nil
+        name = fb_field.name
       end
-      Field.new(fb_field.name,
+      Field.new(name,
                 type,
                 nullable: fb_field.nullable?,
-                dictionary_id: dictionary_id,
                 metadata: read_custom_metadata(fb_field.custom_metadata))
     end
 
@@ -181,17 +226,17 @@ module ArrowFormat
       end
     end
 
-    def read_record_batch(fb_record_batch, schema, body)
+    def read_record_batch(version, fb_record_batch, schema, body)
       n_rows = fb_record_batch.length
       nodes = fb_record_batch.nodes
       buffers = fb_record_batch.buffers
       columns = schema.fields.collect do |field|
-        read_column(field, nodes, buffers, body)
+        read_column(version, field, nodes, buffers, body)
       end
       RecordBatch.new(schema, n_rows, columns)
     end
 
-    def read_column(field, nodes, buffers, body)
+    def read_column(version, field, nodes, buffers, body)
       node = nodes.shift
       length = node.length
 
@@ -209,51 +254,63 @@ module ArrowFormat
            NumberType,
            TemporalType
         values_buffer = buffers.shift
-        values = body.slice(values_buffer.offset, values_buffer.length)
+        values = body&.slice(values_buffer.offset, values_buffer.length)
         field.type.build_array(length, validity, values)
       when VariableSizeBinaryType
         offsets_buffer = buffers.shift
         values_buffer = buffers.shift
-        offsets = body.slice(offsets_buffer.offset, offsets_buffer.length)
-        values = body.slice(values_buffer.offset, values_buffer.length)
+        offsets = body&.slice(offsets_buffer.offset, offsets_buffer.length)
+        values = body&.slice(values_buffer.offset, values_buffer.length)
         field.type.build_array(length, validity, offsets, values)
       when FixedSizeBinaryType
         values_buffer = buffers.shift
-        values = body.slice(values_buffer.offset, values_buffer.length)
+        values = body&.slice(values_buffer.offset, values_buffer.length)
         field.type.build_array(length, validity, values)
       when VariableSizeListType
         offsets_buffer = buffers.shift
-        offsets = body.slice(offsets_buffer.offset, offsets_buffer.length)
-        child = read_column(field.type.child, nodes, buffers, body)
+        offsets = body&.slice(offsets_buffer.offset, offsets_buffer.length)
+        child = read_column(version, field.type.child, nodes, buffers, body)
         field.type.build_array(length, validity, offsets, child)
       when FixedSizeListType
-        child = read_column(field.type.child, nodes, buffers, body)
+        child = read_column(version, field.type.child, nodes, buffers, body)
         field.type.build_array(length, validity, child)
       when StructType
         children = field.type.children.collect do |child|
-          read_column(child, nodes, buffers, body)
+          read_column(version, child, nodes, buffers, body)
         end
         field.type.build_array(length, validity, children)
       when DenseUnionType
-        # dense union type doesn't have validity.
-        types = validity
+        if version == FB::MetadataVersion::V4
+          # Dense union type has validity with V4.
+          types_buffer = buffers.shift
+          types = body&.slice(types_buffer.offset, types_buffer.length)
+        else
+          # Dense union type doesn't have validity.
+          types = validity
+        end
         offsets_buffer = buffers.shift
-        offsets = body.slice(offsets_buffer.offset, offsets_buffer.length)
+        offsets = body&.slice(offsets_buffer.offset, offsets_buffer.length)
         children = field.type.children.collect do |child|
-          read_column(child, nodes, buffers, body)
+          read_column(version, child, nodes, buffers, body)
         end
         field.type.build_array(length, types, offsets, children)
       when SparseUnionType
-        # sparse union type doesn't have validity.
-        types = validity
+        if version == FB::MetadataVersion::V4
+          # Sparse union type has validity with V4.
+          types_buffer = buffers.shift
+          types = body&.slice(types_buffer.offset, types_buffer.length)
+        else
+          # Sparse union type doesn't have validity.
+          types = validity
+        end
         children = field.type.children.collect do |child|
-          read_column(child, nodes, buffers, body)
+          read_column(version, child, nodes, buffers, body)
         end
         field.type.build_array(length, types, children)
       when DictionaryType
         indices_buffer = buffers.shift
-        indices = body.slice(indices_buffer.offset, indices_buffer.length)
-        dictionaries = find_dictionaries(field.dictionary_id)
+        indices = body&.slice(indices_buffer.offset, indices_buffer.length)
+        dictionaries = find_dictionaries(field.type.id)
         field.type.build_array(length, validity, indices, dictionaries)
       end
     end
diff --git a/ruby/red-arrow-format/lib/arrow-format/record-batch.rb b/ruby/red-arrow-format/lib/arrow-format/record-batch.rb
index a641c87da7..938f02d762 100644
--- a/ruby/red-arrow-format/lib/arrow-format/record-batch.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/record-batch.rb
@@ -22,6 +22,8 @@ module ArrowFormat
 
     attr_reader :schema
     attr_reader :n_rows
+    alias_method :size, :n_rows
+    alias_method :length, :n_rows
     attr_reader :columns
     def initialize(schema, n_rows, columns)
       @schema = schema
@@ -29,6 +31,10 @@ module ArrowFormat
       @columns = columns
     end
 
+    def empty?
+      @n_rows.zero?
+    end
+
     def to_h
       hash = {}
       @schema.fields.zip(@columns) do |field, column|
diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb b/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb
index 5657ca4a1b..13e7ad7243 100644
--- a/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb
@@ -100,11 +100,23 @@ module ArrowFormat
     private
     def consume_initial(target)
       continuation = target.get_value(CONTINUATION_TYPE, 0)
-      unless continuation == CONTINUATION_INT32
+      if continuation == CONTINUATION_INT32
+        @state = :metadata_length
+      elsif continuation < 0
         raise ReadError.new("Invalid continuation token: " +
                             continuation.inspect)
+      else
+        # For backward compatibility with data produced prior to version
+        # 0.15.0. Such data doesn't have a continuation token, so treat
+        # the value we just read as the metadata length.
+        metadata_length = continuation
+        if metadata_length == 0
+          @state = :eos
+        else
+          @metadata_length = metadata_length
+          @state = :metadata
+        end
       end
-      @state = :metadata_length
     end
 
     def consume_metadata_length(target)
@@ -204,7 +216,7 @@ module ArrowFormat
       @dictionary_fields = {}
       @schema.fields.each do |field|
         next unless field.type.is_a?(DictionaryType)
-        @dictionary_fields[field.dictionary_id] = field
+        @dictionary_fields[field.type.id] = field
       end
       if @dictionaries.size < @dictionary_fields.size
         @state = :initial_dictionaries
@@ -223,7 +235,10 @@ module ArrowFormat
       field = @dictionary_fields[header.id]
       value_type = field.type.value_type
       schema = Schema.new([Field.new("dummy", value_type)])
-      record_batch = read_record_batch(header.data, schema, body)
+      record_batch = read_record_batch(message.version,
+                                       header.data,
+                                       schema,
+                                       body)
       if header.delta?
         @dictionaries[header.id] << record_batch.columns[0]
       else
@@ -237,7 +252,7 @@ module ArrowFormat
 
     def process_record_batch_message(message, body)
       header = message.header
-      @on_read.call(read_record_batch(header, @schema, body))
+      @on_read.call(read_record_batch(message.version, header, @schema, body))
     end
   end
 end
diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb b/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
index f11972c67a..f81cfe8913 100644
--- a/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
@@ -21,29 +21,49 @@ module ArrowFormat
   class StreamingReader
     include Enumerable
 
-    attr_reader :schema
     def initialize(input)
       @input = input
-      @schema = nil
+      @on_read = nil
+      @pull_reader = StreamingPullReader.new do |record_batch|
+        @on_read.call(record_batch) if @on_read
+      end
+      @buffer = "".b
+      ensure_schema
+    end
+
+    def schema
+      @pull_reader.schema
     end
 
-    def each
+    def each(&block)
       return to_enum(__method__) unless block_given?
 
-      reader = StreamingPullReader.new do |record_batch|
-        @schema ||= reader.schema
-        yield(record_batch)
+      @on_read = block
+      begin
+        loop do
+          break unless consume
+        end
+      ensure
+        @on_read = nil
       end
+    end
 
-      buffer = "".b
-      loop do
-        next_size = reader.next_required_size
-        break if next_size.zero?
+    private
+    def consume
+      next_size = @pull_reader.next_required_size
+      return false if next_size.zero?
+
+      next_chunk = @input.read(next_size, @buffer)
+      return false if next_chunk.nil?
 
-        next_chunk = @input.read(next_size, buffer)
-        break if next_chunk.nil?
+      @pull_reader.consume(IO::Buffer.for(next_chunk))
+      true
+    end
 
-        reader.consume(IO::Buffer.for(next_chunk))
+    def ensure_schema
+      loop do
+        break unless consume
+        break if @pull_reader.schema
       end
     end
   end
diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb b/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb
index 0621dcbb89..18eb2dda3a 100644
--- a/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb
@@ -40,9 +40,9 @@ module ArrowFormat
 
     def write_record_batch(record_batch)
       record_batch.schema.fields.each_with_index do |field, i|
-        next if field.dictionary_id.nil?
+        next unless field.type.is_a?(DictionaryType)
         dictionary_array = record_batch.columns[i]
-        write_dictionary(field.dictionary_id, dictionary_array)
+        write_dictionary(field.type.id, dictionary_array)
       end
 
       write_record_batch_based_message(record_batch,
diff --git a/ruby/red-arrow-format/lib/arrow-format/type.rb b/ruby/red-arrow-format/lib/arrow-format/type.rb
index fb153450b0..17674af30c 100644
--- a/ruby/red-arrow-format/lib/arrow-format/type.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/type.rb
@@ -912,7 +912,7 @@ module ArrowFormat
   end
 
   class MapType < VariableSizeListType
-    def initialize(child)
+    def initialize(child, keys_sorted)
       if child.nullable?
         raise TypeError.new("Map entry field must not be nullable: " +
                             child.inspect)
@@ -930,12 +930,17 @@ module ArrowFormat
                             type.children[0].inspect)
       end
       super(child)
+      @keys_sorted = keys_sorted
     end
 
     def name
       "Map"
     end
 
+    def keys_sorted?
+      @keys_sorted
+    end
+
     def offset_buffer_type
       :s32 # TODO: big endian support
     end
@@ -993,6 +998,10 @@ module ArrowFormat
       "DenseUnion"
     end
 
+    def offset_buffer_type
+      :s32
+    end
+
     def build_array(size, types_buffer, offsets_buffer, children)
       DenseUnionArray.new(self, size, types_buffer, offsets_buffer, children)
     end
@@ -1013,10 +1022,12 @@ module ArrowFormat
   end
 
   class DictionaryType < Type
+    attr_reader :id
     attr_reader :index_type
     attr_reader :value_type
-    def initialize(index_type, value_type, ordered)
+    def initialize(id, index_type, value_type, ordered)
       super()
+      @id = id
       @index_type = index_type
       @value_type = value_type
       @ordered = ordered
@@ -1038,22 +1049,21 @@ module ArrowFormat
                           dictionaries)
     end
 
-    def build_fb_field(fb_field, field)
+    def build_fb_field(fb_field)
       fb_dictionary_encoding = FB::DictionaryEncoding::Data.new
-      fb_dictionary_encoding.id = field.dictionary_id
+      fb_dictionary_encoding.id = @id
       fb_int = FB::Int::Data.new
       fb_int.bit_width = @index_type.bit_width
       fb_int.signed = @index_type.signed?
       fb_dictionary_encoding.index_type = fb_int
       fb_dictionary_encoding.ordered = @ordered
-      fb_dictionary_encoding.dictionary_kind =
-        FB::DictionaryKind::DENSE_ARRAY
+      fb_dictionary_encoding.dictionary_kind = FB::DictionaryKind::DENSE_ARRAY
       fb_field.type = @value_type.to_flatbuffers
       fb_field.dictionary = fb_dictionary_encoding
     end
 
     def to_s
-      "#{super}<index=#{@index_type}, value=#{@value_type}, " +
+      "#{super}<id=#{@id}, index=#{@index_type}, value=#{@value_type}, " +
         "ordered=#{@ordered}>"
     end
   end
diff --git a/ruby/red-arrow-format/red-arrow-format.gemspec b/ruby/red-arrow-format/red-arrow-format.gemspec
index 52340b9748..9148a21f50 100644
--- a/ruby/red-arrow-format/red-arrow-format.gemspec
+++ b/ruby/red-arrow-format/red-arrow-format.gemspec
@@ -44,9 +44,10 @@ Gem::Specification.new do |spec|
   spec.files = ["README.md", "Rakefile", "Gemfile", "#{spec.name}.gemspec"]
   spec.files += ["LICENSE.txt", "NOTICE.txt"]
   spec.files += Dir.glob("lib/**/*.rb")
+  spec.files -= Dir.glob("lib/arrow-format/integration/**/*.rb")
   spec.files += Dir.glob("doc/text/*")
 
-  spec.add_runtime_dependency("red-flatbuffers", ">=0.0.6")
+  spec.add_runtime_dependency("red-flatbuffers", ">=0.0.8")
 
   github_url = "https://github.com/apache/arrow"
   spec.metadata = {
diff --git a/ruby/red-arrow-format/test/test-reader.rb b/ruby/red-arrow-format/test/test-reader.rb
index 06360f62ac..d59a93ce18 100644
--- a/ruby/red-arrow-format/test/test-reader.rb
+++ b/ruby/red-arrow-format/test/test-reader.rb
@@ -670,7 +670,7 @@ module ReaderTests
     array = string_array.dictionary_encode
     type, values = roundtrip(array)
     assert_equal([
-                   "Dictionary<index=Int32, value=UTF8, ordered=false>",
+                   "Dictionary<id=0, index=Int32, value=UTF8, ordered=false>",
                    ["a", "b", "c", nil, "a"],
                  ],
                  [type.to_s, values])
diff --git a/ruby/red-arrow-format/test/test-writer.rb b/ruby/red-arrow-format/test/test-writer.rb
index f36a1a252e..72776f01ab 100644
--- a/ruby/red-arrow-format/test/test-writer.rb
+++ b/ruby/red-arrow-format/test/test-writer.rb
@@ -86,7 +86,8 @@ module WriterHelper
     when Arrow::FixedSizeBinaryDataType
       ArrowFormat::FixedSizeBinaryType.new(red_arrow_type.byte_width)
     when Arrow::MapDataType
-      ArrowFormat::MapType.new(convert_field(red_arrow_type.field))
+      ArrowFormat::MapType.new(convert_field(red_arrow_type.field),
+                               red_arrow_type.keys_sorted?)
     when Arrow::ListDataType
       ArrowFormat::ListType.new(convert_field(red_arrow_type.field))
     when Arrow::LargeListDataType
@@ -110,10 +111,14 @@ module WriterHelper
       end
       ArrowFormat::SparseUnionType.new(fields, red_arrow_type.type_codes)
     when Arrow::DictionaryDataType
+      @dictionary_id ||= 0
+      dictionary_id = @dictionary_id
+      @dictionary_id += 1
       index_type = convert_type(red_arrow_type.index_data_type)
-      type = convert_type(red_arrow_type.value_data_type)
-      ArrowFormat::DictionaryType.new(index_type,
-                                      type,
+      value_type = convert_type(red_arrow_type.value_data_type)
+      ArrowFormat::DictionaryType.new(dictionary_id,
+                                      index_type,
+                                      value_type,
                                       red_arrow_type.ordered?)
     else
       raise "Unsupported type: #{red_arrow_type.inspect}"
@@ -122,17 +127,9 @@ module WriterHelper
 
   def convert_field(red_arrow_field)
     type = convert_type(red_arrow_field.data_type)
-    if type.is_a?(ArrowFormat::DictionaryType)
-      @dictionary_id ||= 0
-      dictionary_id = @dictionary_id
-      @dictionary_id += 1
-    else
-      dictionary_id = nil
-    end
     ArrowFormat::Field.new(red_arrow_field.name,
                            type,
                            nullable: red_arrow_field.nullable?,
-                           dictionary_id: dictionary_id,
                            metadata: red_arrow_field.metadata)
   end
 
@@ -930,13 +927,13 @@ end
 module WriterDictionaryDeltaTests
   def build_schema(value_type)
     index_type = ArrowFormat::Int32Type.singleton
+    dictionary_id = 1
     ordered = false
-    type = ArrowFormat::DictionaryType.new(index_type,
+    type = ArrowFormat::DictionaryType.new(dictionary_id,
+                                           index_type,
                                            value_type,
                                            ordered)
-    field = ArrowFormat::Field.new("value",
-                                   type,
-                                   dictionary_id: 1)
+    field = ArrowFormat::Field.new("value", type)
     ArrowFormat::Schema.new([field])
   end
 
diff --git a/ruby/red-arrow/lib/arrow/table-formatter.rb b/ruby/red-arrow/lib/arrow/table-formatter.rb
index b93faf09cb..b4e257413c 100644
--- a/ruby/red-arrow/lib/arrow/table-formatter.rb
+++ b/ruby/red-arrow/lib/arrow/table-formatter.rb
@@ -80,7 +80,11 @@ module Arrow
         when nil
           "%*s" % [width, FORMATTED_NULL]
         else
-          "%-*s" % [width, value.to_s]
+          value = value.to_s
+          if value.encoding == Encoding::ASCII_8BIT
+            value = value.each_byte.collect {|byte| "%X" % byte}.join
+          end
+          "%-*s" % [width, value]
         end
       end
 
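Note on the table-formatter hunk above: the new branch renders binary
(ASCII-8BIT) cell values as hexadecimal text before left-justifying them.
The following is a minimal standalone sketch of that behavior, not code from
the commit. The method names `format_cell` and `format_cell_padded` are
hypothetical, and the zero-padded variant is only an illustration of an
alternative, in case a fixed two digits per byte is preferred.

```ruby
# Hypothetical helper mirroring the branch added in the hunk above:
# binary strings are converted to upper-case hex, then left-justified.
def format_cell(value, width)
  value = value.to_s
  if value.encoding == Encoding::ASCII_8BIT
    # "%X" emits one digit for bytes below 0x10 (e.g. 0x0A -> "A").
    value = value.each_byte.collect {|byte| "%X" % byte}.join
  end
  "%-*s" % [width, value]
end

# Hypothetical variant (an assumption, not part of the commit):
# "%02X" always emits two digits per byte, so byte boundaries stay
# unambiguous in the rendered text.
def format_cell_padded(value, width)
  value = value.to_s
  if value.encoding == Encoding::ASCII_8BIT
    value = value.each_byte.collect {|byte| "%02X" % byte}.join
  end
  "%-*s" % [width, value]
end

format_cell("\x0A\xFF".b, 4)        # => "AFF "
format_cell_padded("\x0A\xFF".b, 4) # => "0AFF"
format_cell("abc", 5)               # => "abc  " (non-binary text is untouched)
```

With the unpadded form, the bytes 0x0A 0xFF and 0xAF 0x0F both render as
"AFF", so the padded form may be easier to read back when cell contents
need to be compared byte by byte.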

