Repository: avro Updated Branches: refs/heads/master 0550d2cce -> 4b3677c32
AVRO-1969: Add schema compatibility checker for Ruby This closes #170 Signed-off-by: Sean Busbey <[email protected]> Signed-off-by: sacharya <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/avro/repo Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/4b3677c3 Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/4b3677c3 Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/4b3677c3 Branch: refs/heads/master Commit: 4b3677c32b879e0e7f717eb95f9135ac654da760 Parents: 0550d2c Author: Tim Perkins <[email protected]> Authored: Thu Dec 15 09:35:21 2016 -0500 Committer: sacharya <[email protected]> Committed: Tue Apr 4 17:20:10 2017 -0500 ---------------------------------------------------------------------- lang/ruby/Manifest | 2 + lang/ruby/lib/avro.rb | 1 + lang/ruby/lib/avro/io.rb | 49 +-- lang/ruby/lib/avro/schema.rb | 28 +- lang/ruby/lib/avro/schema_compatibility.rb | 168 ++++++++ lang/ruby/test/test_io.rb | 34 ++ lang/ruby/test/test_schema.rb | 34 ++ lang/ruby/test/test_schema_compatibility.rb | 463 +++++++++++++++++++++++ 8 files changed, 728 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/Manifest ---------------------------------------------------------------------- diff --git a/lang/ruby/Manifest b/lang/ruby/Manifest index 3edd7cf..87bfd98 100644 --- a/lang/ruby/Manifest +++ b/lang/ruby/Manifest @@ -11,6 +11,7 @@ lib/avro/io.rb lib/avro/ipc.rb lib/avro/protocol.rb lib/avro/schema.rb +lib/avro/schema_compatibility.rb lib/avro/schema_normalization.rb lib/avro/schema_validator.rb test/case_finder.rb @@ -25,6 +26,7 @@ test/test_help.rb test/test_io.rb test/test_protocol.rb test/test_schema.rb +test/test_schema_compatibility.rb test/test_schema_normalization.rb test/test_schema_validator.rb test/test_socket_transport.rb http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/lib/avro.rb ---------------------------------------------------------------------- diff --git a/lang/ruby/lib/avro.rb b/lang/ruby/lib/avro.rb index 1293f0f..81afbda 100644 --- a/lang/ruby/lib/avro.rb +++ b/lang/ruby/lib/avro.rb @@ -41,3 +41,4 @@ require 'avro/protocol' require 'avro/ipc' require 'avro/schema_normalization' require 'avro/schema_validator' +require 'avro/schema_compatibility' http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/lib/avro/io.rb ---------------------------------------------------------------------- diff --git a/lang/ruby/lib/avro/io.rb b/lang/ruby/lib/avro/io.rb index 22beea2..b04a19a 100644 --- a/lang/ruby/lib/avro/io.rb +++ b/lang/ruby/lib/avro/io.rb @@ -221,46 +221,7 @@ module Avro class DatumReader def self.match_schemas(writers_schema, readers_schema) - w_type = writers_schema.type_sym - r_type = readers_schema.type_sym - - # This conditional is begging for some OO love. - if w_type == :union || r_type == :union - return true - end - - if w_type == r_type - return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type) - - case r_type - when :record - return writers_schema.fullname == readers_schema.fullname - when :error - return writers_schema.fullname == readers_schema.fullname - when :request - return true - when :fixed - return writers_schema.fullname == readers_schema.fullname && - writers_schema.size == readers_schema.size - when :enum - return writers_schema.fullname == readers_schema.fullname - when :map - return writers_schema.values.type == readers_schema.values.type - when :array - return writers_schema.items.type == readers_schema.items.type - end - end - - # Handle schema promotion - if w_type == :int && [:long, :float, :double].include?(r_type) - return true - elsif w_type == :long && [:float, :double].include?(r_type) - return true - elsif w_type == :float && r_type == :double - return true - end - - return false + Avro::SchemaCompatibility.match_schemas(writers_schema, readers_schema) end attr_accessor :writers_schema, :readers_schema @@ -393,11 +354,11 @@ module Avro writers_fields_hash = writers_schema.fields_hash readers_fields_hash.each do |field_name, field| unless writers_fields_hash.has_key? field_name - if !field.default.nil? + if field.default? field_val = read_default_value(field.type, field.default) read_record[field.name] = field_val else - # FIXME(jmhodges) another 'unset' here + raise AvroError, "Missing data for #{field.type} with no default" end end end @@ -407,10 +368,6 @@ module Avro end def read_default_value(field_schema, default_value) - if default_value == :no_default - raise AvroError, "Missing data for #{field_schema} with no default" - end - # Basically a JSON Decoder? case field_schema.type_sym when :null http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/lib/avro/schema.rb ---------------------------------------------------------------------- diff --git a/lang/ruby/lib/avro/schema.rb b/lang/ruby/lib/avro/schema.rb index 5038311..024d562 100644 --- a/lang/ruby/lib/avro/schema.rb +++ b/lang/ruby/lib/avro/schema.rb @@ -122,6 +122,18 @@ module Avro Digest::SHA256.hexdigest(parsing_form).to_i(16) end + def read?(writers_schema) + SchemaCompatibility.can_read?(writers_schema, self) + end + + def be_read?(other_schema) + other_schema.read?(self) + end + + def mutual_read?(other_schema) + SchemaCompatibility.mutual_read?(other_schema, self) + end + def ==(other, seen=nil) other.is_a?(Schema) && type_sym == other.type_sym end @@ -210,7 +222,11 @@ module Avro else super(schema_type, name, namespace, names, doc) end - @fields = RecordSchema.make_field_objects(fields, names, self.namespace) + @fields = if fields + RecordSchema.make_field_objects(fields, names, self.namespace) + else + {} + end end def fields_hash @@ -261,8 +277,7 @@ module Avro def initialize(schemas, names=nil, default_namespace=nil) super(:union) - schema_objects = [] - schemas.each_with_index do |schema, i| + @schemas = schemas.each_with_object([]) do |schema, schema_objects| new_schema = subparse(schema, names, default_namespace) ns_type = new_schema.type_sym @@ -275,7 +290,6 @@ module Avro else schema_objects << new_schema end - @schemas = schema_objects end end @@ -348,9 +362,13 @@ module Avro @doc = doc end + def default? + @default != :no_default + end + def to_avro(names=Set.new) {'name' => name, 'type' => type.to_avro(names)}.tap do |avro| - avro['default'] = default unless default == :no_default + avro['default'] = default if default? avro['order'] = order if order avro['doc'] = doc if doc end http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/lib/avro/schema_compatibility.rb ---------------------------------------------------------------------- diff --git a/lang/ruby/lib/avro/schema_compatibility.rb b/lang/ruby/lib/avro/schema_compatibility.rb new file mode 100644 index 0000000..1842b3e --- /dev/null +++ b/lang/ruby/lib/avro/schema_compatibility.rb @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +module Avro + module SchemaCompatibility + # Perform a full, recursive check that a datum written using the writers_schema + # can be read using the readers_schema. + def self.can_read?(writers_schema, readers_schema) + Checker.new.can_read?(writers_schema, readers_schema) + end + + # Perform a full, recursive check that a datum written using either the + # writers_schema or the readers_schema can be read using the other schema. + def self.mutual_read?(writers_schema, readers_schema) + Checker.new.mutual_read?(writers_schema, readers_schema) + end + + # Perform a basic check that a datum written with the writers_schema could + # be read using the readers_schema. This check only includes matching the types, + # including schema promotion, and matching the full name for named types. + # Aliases for named types are not supported here, and the ruby implementation + # of Avro in general does not include support for aliases. + def self.match_schemas(writers_schema, readers_schema) + w_type = writers_schema.type_sym + r_type = readers_schema.type_sym + + # This conditional is begging for some OO love. + if w_type == :union || r_type == :union + return true + end + + if w_type == r_type + return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type) + + case r_type + when :record + return writers_schema.fullname == readers_schema.fullname + when :error + return writers_schema.fullname == readers_schema.fullname + when :request + return true + when :fixed + return writers_schema.fullname == readers_schema.fullname && + writers_schema.size == readers_schema.size + when :enum + return writers_schema.fullname == readers_schema.fullname + when :map + return match_schemas(writers_schema.values, readers_schema.values) + when :array + return match_schemas(writers_schema.items, readers_schema.items) + end + end + + # Handle schema promotion + if w_type == :int && [:long, :float, :double].include?(r_type) + return true + elsif w_type == :long && [:float, :double].include?(r_type) + return true + elsif w_type == :float && r_type == :double + return true + elsif w_type == :string && r_type == :bytes + return true + elsif w_type == :bytes && r_type == :string + return true + end + + return false + end + + class Checker + SIMPLE_CHECKS = Schema::PRIMITIVE_TYPES_SYM.dup.add(:fixed).freeze + + attr_reader :recursion_set + private :recursion_set + + def initialize + @recursion_set = Set.new + end + + def can_read?(writers_schema, readers_schema) + full_match_schemas(writers_schema, readers_schema) + end + + def mutual_read?(writers_schema, readers_schema) + can_read?(writers_schema, readers_schema) && can_read?(readers_schema, writers_schema) + end + + private + + def full_match_schemas(writers_schema, readers_schema) + return true if recursion_in_progress?(writers_schema, readers_schema) + + return false unless Avro::SchemaCompatibility.match_schemas(writers_schema, readers_schema) + + if writers_schema.type_sym != :union && SIMPLE_CHECKS.include?(readers_schema.type_sym) + return true + end + + case readers_schema.type_sym + when :record + match_record_schemas(writers_schema, readers_schema) + when :map + full_match_schemas(writers_schema.values, readers_schema.values) + when :array + full_match_schemas(writers_schema.items, readers_schema.items) + when :union + match_union_schemas(writers_schema, readers_schema) + when :enum + # reader's symbols must contain all writer's symbols + (writers_schema.symbols - readers_schema.symbols).empty? + else + if writers_schema.type_sym == :union && writers_schema.schemas.size == 1 + full_match_schemas(writers_schema.schemas.first, readers_schema) + else + false + end + end + end + + def match_union_schemas(writers_schema, readers_schema) + raise 'readers_schema must be a union' unless readers_schema.type_sym == :union + + case writers_schema.type_sym + when :union + writers_schema.schemas.all? { |writer_type| full_match_schemas(writer_type, readers_schema) } + else + readers_schema.schemas.any? { |reader_type| full_match_schemas(writers_schema, reader_type) } + end + end + + def match_record_schemas(writers_schema, readers_schema) + writer_fields_hash = writers_schema.fields_hash + readers_schema.fields.each do |field| + if writer_fields_hash.key?(field.name) + return false unless full_match_schemas(writer_fields_hash[field.name].type, field.type) + else + return false unless field.default? + end + end + + return true + end + + def recursion_in_progress?(writers_schema, readers_schema) + key = [writers_schema.object_id, readers_schema.object_id] + + if recursion_set.include?(key) + true + else + recursion_set.add(key) + false + end + end + end + end +end http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/test/test_io.rb ---------------------------------------------------------------------- diff --git a/lang/ruby/test/test_io.rb b/lang/ruby/test/test_io.rb index 09d725d..fc0088b 100644 --- a/lang/ruby/test/test_io.rb +++ b/lang/ruby/test/test_io.rb @@ -341,6 +341,40 @@ EOS end end + def test_interchangeable_schemas + interchangeable_schemas = ['"string"', '"bytes"'] + incorrect = 0 + interchangeable_schemas.each_with_index do |ws, i| + writers_schema = Avro::Schema.parse(ws) + datum_to_write = 'foo' + readers_schema = Avro::Schema.parse(interchangeable_schemas[i == 0 ? 1 : 0]) + writer, * = write_datum(datum_to_write, writers_schema) + datum_read = read_datum(writer, writers_schema, readers_schema) + if datum_read != datum_to_write + incorrect += 1 + end + end + assert_equal(incorrect, 0) + end + + def test_array_schema_promotion + writers_schema = Avro::Schema.parse('{"type":"array", "items":"int"}') + readers_schema = Avro::Schema.parse('{"type":"array", "items":"long"}') + datum_to_write = [1, 2] + writer, * = write_datum(datum_to_write, writers_schema) + datum_read = read_datum(writer, writers_schema, readers_schema) + assert_equal(datum_read, datum_to_write) + end + + def test_map_schema_promotion + writers_schema = Avro::Schema.parse('{"type":"map", "values":"int"}') + readers_schema = Avro::Schema.parse('{"type":"map", "values":"long"}') + datum_to_write = { 'foo' => 1, 'bar' => 2 } + writer, * = write_datum(datum_to_write, writers_schema) + datum_read = read_datum(writer, writers_schema, readers_schema) + assert_equal(datum_read, datum_to_write) + end + def test_snappy_backward_compat # a snappy-compressed block payload without the checksum # this has no back-references, just one literal so the last 9 http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/test/test_schema.rb ---------------------------------------------------------------------- diff --git a/lang/ruby/test/test_schema.rb b/lang/ruby/test/test_schema.rb index 417d511..48fe0a5 100644 --- a/lang/ruby/test/test_schema.rb +++ b/lang/ruby/test/test_schema.rb @@ -258,4 +258,38 @@ class TestSchema < Test::Unit::TestCase } assert_equal enum_schema_hash, enum_schema_json.to_avro end + +def test_empty_record + schema = Avro::Schema.parse('{"type":"record", "name":"Empty"}') + assert_empty(schema.fields) + end + + def test_empty_union + schema = Avro::Schema.parse('[]') + assert_equal(schema.to_s, '[]') + end + + def test_read + schema = Avro::Schema.parse('"string"') + writer_schema = Avro::Schema.parse('"int"') + assert_false(schema.read?(writer_schema)) + assert_true(schema.read?(schema)) + end + + def test_be_read + schema = Avro::Schema.parse('"string"') + writer_schema = Avro::Schema.parse('"int"') + assert_false(schema.be_read?(writer_schema)) + assert_true(schema.be_read?(schema)) + end + + def test_mutual_read + schema = Avro::Schema.parse('"string"') + writer_schema = Avro::Schema.parse('"int"') + default1 = Avro::Schema.parse('{"type":"record", "name":"Default", "fields":[{"name":"i", "type":"int", "default": 1}]}') + default2 = Avro::Schema.parse('{"type":"record", "name":"Default", "fields":[{"name:":"s", "type":"string", "default": ""}]}') + assert_false(schema.mutual_read?(writer_schema)) + assert_true(schema.mutual_read?(schema)) + assert_true(default1.mutual_read?(default2)) + end end http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/test/test_schema_compatibility.rb ---------------------------------------------------------------------- diff --git a/lang/ruby/test/test_schema_compatibility.rb b/lang/ruby/test/test_schema_compatibility.rb new file mode 100644 index 0000000..138c895 --- /dev/null +++ b/lang/ruby/test/test_schema_compatibility.rb @@ -0,0 +1,463 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'test_help' + +class TestSchemaCompatibility < Test::Unit::TestCase + + def test_primitive_schema_compatibility + Avro::Schema::PRIMITIVE_TYPES.each do |schema_type| + assert_true(can_read?(send("#{schema_type}_schema"), send("#{schema_type}_schema"))) + end + end + + def test_compatible_reader_writer_pairs + [ + long_schema, int_schema, + float_schema, int_schema, + float_schema, long_schema, + double_schema, long_schema, + double_schema, int_schema, + double_schema, float_schema, + + int_array_schema, int_array_schema, + long_array_schema, int_array_schema, + int_map_schema, int_map_schema, + long_map_schema, int_map_schema, + + enum1_ab_schema, enum1_ab_schema, + enum1_abc_schema, enum1_ab_schema, + + string_schema, bytes_schema, + bytes_schema, string_schema, + + empty_union_schema, empty_union_schema, + int_union_schema, int_union_schema, + int_string_union_schema, string_int_union_schema, + int_union_schema, empty_union_schema, + long_union_schema, int_union_schema, + + int_union_schema, int_schema, + int_schema, int_union_schema, + + empty_record1_schema, empty_record1_schema, + empty_record1_schema, a_int_record1_schema, + + a_int_record1_schema, a_int_record1_schema, + a_dint_record1_schema, a_int_record1_schema, + a_dint_record1_schema, a_dint_record1_schema, + a_int_record1_schema, a_dint_record1_schema, + + a_long_record1_schema, a_int_record1_schema, + + a_int_record1_schema, a_int_b_int_record1_schema, + a_dint_record1_schema, a_int_b_int_record1_schema, + + a_int_b_dint_record1_schema, a_int_record1_schema, + a_dint_b_dint_record1_schema, empty_record1_schema, + a_dint_b_dint_record1_schema, a_int_record1_schema, + a_int_b_int_record1_schema, a_dint_b_dint_record1_schema, + + int_list_record_schema, int_list_record_schema, + long_list_record_schema, long_list_record_schema, + long_list_record_schema, int_list_record_schema, + + null_schema, null_schema + ].each_slice(2) do |(reader, writer)| + assert_true(can_read?(writer, reader), "expecting #{reader} to read #{writer}") + end + end + + def test_broken + assert_false(can_read?(int_string_union_schema, int_union_schema)) + end + + def test_incompatible_reader_writer_pairs + [ + null_schema, int_schema, + null_schema, long_schema, + + boolean_schema, int_schema, + + int_schema, null_schema, + int_schema, boolean_schema, + int_schema, long_schema, + int_schema, float_schema, + int_schema, double_schema, + + long_schema, float_schema, + long_schema, double_schema, + + float_schema, double_schema, + + string_schema, boolean_schema, + string_schema, int_schema, + + bytes_schema, null_schema, + bytes_schema, int_schema, + + int_array_schema, long_array_schema, + int_map_schema, int_array_schema, + int_array_schema, int_map_schema, + int_map_schema, long_map_schema, + + enum1_ab_schema, enum1_abc_schema, + enum1_bc_schema, enum1_abc_schema, + + enum1_ab_schema, enum2_ab_schema, + int_schema, enum2_ab_schema, + enum2_ab_schema, int_schema, + + int_union_schema, int_string_union_schema, + string_union_schema, int_string_union_schema, + + empty_record2_schema, empty_record1_schema, + a_int_record1_schema, empty_record1_schema, + a_int_b_dint_record1_schema, empty_record1_schema, + + int_list_record_schema, long_list_record_schema, + + null_schema, int_schema + ].each_slice(2) do |(reader, writer)| + assert_false(can_read?(writer, reader), "expecting #{reader} not to read #{writer}") + end + end + + def writer_schema + Avro::Schema.parse <<-SCHEMA + {"type":"record", "name":"Record", "fields":[ + {"name":"oldfield1", "type":"int"}, + {"name":"oldfield2", "type":"string"} + ]} + SCHEMA + end + + def test_missing_field + reader_schema = Avro::Schema.parse <<-SCHEMA + {"type":"record", "name":"Record", "fields":[ + {"name":"oldfield1", "type":"int"} + ]} + SCHEMA + assert_true(can_read?(writer_schema, reader_schema)) + assert_false(can_read?(reader_schema, writer_schema)) + end + + def test_missing_second_field + reader_schema = Avro::Schema.parse <<-SCHEMA + {"type":"record", "name":"Record", "fields":[ + {"name":"oldfield2", "type":"string"} + ]} + SCHEMA + assert_true(can_read?(writer_schema, reader_schema)) + assert_false(can_read?(reader_schema, writer_schema)) + end + + def test_all_fields + reader_schema = Avro::Schema.parse <<-SCHEMA + {"type":"record", "name":"Record", "fields":[ + {"name":"oldfield1", "type":"int"}, + {"name":"oldfield2", "type":"string"} + ]} + SCHEMA + assert_true(can_read?(writer_schema, reader_schema)) + assert_true(can_read?(reader_schema, writer_schema)) + end + + def test_new_field_with_default + reader_schema = Avro::Schema.parse <<-SCHEMA + {"type":"record", "name":"Record", "fields":[ + {"name":"oldfield1", "type":"int"}, + {"name":"newfield1", "type":"int", "default":42} + ]} + SCHEMA + assert_true(can_read?(writer_schema, reader_schema)) + assert_false(can_read?(reader_schema, writer_schema)) + end + + def test_new_field + reader_schema = Avro::Schema.parse <<-SCHEMA + {"type":"record", "name":"Record", "fields":[ + {"name":"oldfield1", "type":"int"}, + {"name":"newfield1", "type":"int"} + ]} + SCHEMA + assert_false(can_read?(writer_schema, reader_schema)) + assert_false(can_read?(reader_schema, writer_schema)) + end + + def test_array_writer_schema + valid_reader = string_array_schema + invalid_reader = string_map_schema + + assert_true(can_read?(string_array_schema, valid_reader)) + assert_false(can_read?(string_array_schema, invalid_reader)) + end + + def test_primitive_writer_schema + valid_reader = string_schema + assert_true(can_read?(string_schema, valid_reader)) + assert_false(can_read?(int_schema, string_schema)) + end + + def test_union_reader_writer_subset_incompatiblity + # reader union schema must contain all writer union branches + union_writer = union_schema(int_schema, string_schema) + union_reader = union_schema(string_schema) + + assert_false(can_read?(union_writer, union_reader)) + assert_true(can_read?(union_reader, union_writer)) + end + + def test_incompatible_record_field + string_schema = Avro::Schema.parse <<-SCHEMA + {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [ + {"name":"field1", "type":"string"} + ]} + SCHEMA + int_schema = Avro::Schema.parse <<-SCHEMA2 + {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [ + {"name":"field1", "type":"int"} + ]} + SCHEMA2 + assert_false(can_read?(string_schema, int_schema)) + end + + def test_enum_symbols + enum_schema1 = Avro::Schema.parse <<-SCHEMA + {"type":"enum", "name":"MyEnum", "symbols":["A","B"]} + SCHEMA + enum_schema2 = Avro::Schema.parse <<-SCHEMA + {"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]} + SCHEMA + assert_false(can_read?(enum_schema2, enum_schema1)) + assert_true(can_read?(enum_schema1, enum_schema2)) + end + + # Tests from lang/java/avro/src/test/java/org/apache/avro/io/parsing/TestResolvingGrammarGenerator2.java + + def point_2d_schema + Avro::Schema.parse <<-SCHEMA + {"type":"record", "name":"Point2D", "fields":[ + {"name":"x", "type":"double"}, + {"name":"y", "type":"double"} + ]} + SCHEMA + end + + def point_2d_fullname_schema + Avro::Schema.parse <<-SCHEMA + {"type":"record", "name":"Point", "namespace":"written", "fields":[ + {"name":"x", "type":"double"}, + {"name":"y", "type":"double"} + ]} + SCHEMA + end + + def point_3d_no_default_schema + Avro::Schema.parse <<-SCHEMA + {"type":"record", "name":"Point", "fields":[ + {"name":"x", "type":"double"}, + {"name":"y", "type":"double"}, + {"name":"z", "type":"double"} + ]} + SCHEMA + end + + def point_3d_schema + Avro::Schema.parse <<-SCHEMA + {"type":"record", "name":"Point3D", "fields":[ + {"name":"x", "type":"double"}, + {"name":"y", "type":"double"}, + {"name":"z", "type":"double", "default": 0.0} + ]} + SCHEMA + end + + def point_3d_match_name_schema + Avro::Schema.parse <<-SCHEMA + {"type":"record", "name":"Point", "fields":[ + {"name":"x", "type":"double"}, + {"name":"y", "type":"double"}, + {"name":"z", "type":"double", "default": 0.0} + ]} + SCHEMA + end + + def test_union_resolution_no_structure_match + # short name match, but no structure match + read_schema = union_schema(null_schema, point_3d_no_default_schema) + assert_false(can_read?(point_2d_fullname_schema, read_schema)) + end + + def test_union_resolution_first_structure_match_2d + # multiple structure matches with no name matches + read_schema = union_schema(null_schema, point_3d_no_default_schema, point_2d_schema, point_3d_schema) + assert_false(can_read?(point_2d_fullname_schema, read_schema)) + end + + def test_union_resolution_first_structure_match_3d + # multiple structure matches with no name matches + read_schema = union_schema(null_schema, point_3d_no_default_schema, point_3d_schema, point_2d_schema) + assert_false(can_read?(point_2d_fullname_schema, read_schema)) + end + + def test_union_resolution_named_structure_match + # multiple structure matches with a short name match + read_schema = union_schema(null_schema, point_2d_schema, point_3d_match_name_schema, point_3d_schema) + assert_false(can_read?(point_2d_fullname_schema, read_schema)) + end + + def test_union_resolution_full_name_match + # there is a full name match that should be chosen + read_schema = union_schema(null_schema, point_2d_schema, point_3d_match_name_schema, point_3d_schema, point_2d_fullname_schema) + assert_true(can_read?(point_2d_fullname_schema, read_schema)) + end + + def can_read?(writer, reader) + Avro::SchemaCompatibility.can_read?(writer, reader) + end + + def union_schema(*schemas) + schemas ||= [] + Avro::Schema.parse("[#{schemas.map(&:to_s).join(',')}]") + end + + Avro::Schema::PRIMITIVE_TYPES.each do |schema_type| + define_method("#{schema_type}_schema") do + Avro::Schema.parse("\"#{schema_type}\"") + end + end + + def int_array_schema + Avro::Schema.parse('{"type":"array", "items":"int"}') + end + + def long_array_schema + Avro::Schema.parse('{"type":"array", "items":"long"}') + end + + def string_array_schema + Avro::Schema.parse('{"type":"array", "items":"string"}') + end + + def int_map_schema + Avro::Schema.parse('{"type":"map", "values":"int"}') + end + + def long_map_schema + Avro::Schema.parse('{"type":"map", "values":"long"}') + end + + def string_map_schema + Avro::Schema.parse('{"type":"map", "values":"string"}') + end + + def enum1_ab_schema + Avro::Schema.parse('{"type":"enum", "name":"Enum1", "symbols":["A","B"]}') + end + + def enum1_abc_schema + Avro::Schema.parse('{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}') + end + + def enum1_bc_schema + Avro::Schema.parse('{"type":"enum", "name":"Enum1", "symbols":["B","C"]}') + end + + def enum2_ab_schema + Avro::Schema.parse('{"type":"enum", "name":"Enum2", "symbols":["A","B"]}') + end + + def empty_record1_schema + Avro::Schema.parse('{"type":"record", "name":"Record1"}') + end + + def empty_record2_schema + Avro::Schema.parse('{"type":"record", "name":"Record2"}') + end + + def a_int_record1_schema + Avro::Schema.parse('{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}') + end + + def a_long_record1_schema + Avro::Schema.parse('{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}') + end + + def a_int_b_int_record1_schema + Avro::Schema.parse('{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}') + end + + def a_dint_record1_schema + Avro::Schema.parse('{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}') + end + + def a_int_b_dint_record1_schema + Avro::Schema.parse('{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}') + end + + def a_dint_b_dint_record1_schema + Avro::Schema.parse('{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}') + end + + def int_list_record_schema + Avro::Schema.parse <<-SCHEMA + { + "type":"record", "name":"List", "fields": [ + {"name": "head", "type": "int"}, + {"name": "tail", "type": "List"} + ]} + SCHEMA + end + + def long_list_record_schema + Avro::Schema.parse <<-SCHEMA + { + "type":"record", "name":"List", "fields": [ + {"name": "head", "type": "long"}, + {"name": "tail", "type": "List"} + ]} + SCHEMA + end + + def empty_union_schema + union_schema + end + + def null_union_schema + union_schema(null_schema) + end + + def int_union_schema + union_schema(int_schema) + end + + def long_union_schema + union_schema(long_schema) + end + + def string_union_schema + union_schema(string_schema) + end + + def int_string_union_schema + union_schema(int_schema, string_schema) + end + + def string_int_union_schema + union_schema(string_schema, int_schema) + end +end
