Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148924463
  
    --- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
    @@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) 
throws IOException {
                return this.reader.read(reuse, this.decoder);
        }
     
    +   // 
------------------------------------------------------------------------
    +   //  Copying
    +   // 
------------------------------------------------------------------------
    +
        @Override
    -   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
    +   public T copy(T from) {
                checkAvroInitialized();
    +           return avroData.deepCopy(schema, from);
    +   }
     
    -           if (this.deepCopyInstance == null) {
    -                   this.deepCopyInstance = 
InstantiationUtil.instantiate(type, Object.class);
    -           }
    -
    -           this.decoder.setIn(source);
    -           this.encoder.setOut(target);
    +   @Override
    +   public T copy(T from, T reuse) {
    +           return copy(from);
    +   }
     
    -           T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
    -           this.writer.write(tmp, this.encoder);
    +   @Override
    +   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
    +           T value = deserialize(source);
    +           serialize(value, target);
        }
     
    -   private void checkAvroInitialized() {
    -           if (this.reader == null) {
    -                   this.reader = new ReflectDatumReader<T>(type);
    -                   this.writer = new ReflectDatumWriter<T>(type);
    -                   this.encoder = new DataOutputEncoder();
    -                   this.decoder = new DataInputDecoder();
    +   // 
------------------------------------------------------------------------
    +   //  Compatibility and Upgrades
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   public TypeSerializerConfigSnapshot snapshotConfiguration() {
    +           if (configSnapshot == null) {
    +                   checkAvroInitialized();
    +                   configSnapshot = new 
AvroSchemaSerializerConfigSnapshot(schema.toString(false));
                }
    +           return configSnapshot;
        }
     
    -   private void checkKryoInitialized() {
    -           if (this.kryo == null) {
    -                   this.kryo = new Kryo();
    -
    -                   Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
    -                   
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
    -                   kryo.setInstantiatorStrategy(instantiatorStrategy);
    +   @Override
    +   @SuppressWarnings("deprecation")
    +   public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    +           if (configSnapshot instanceof 
AvroSchemaSerializerConfigSnapshot) {
    +                   // proper schema snapshot, can do the sophisticated 
schema-based compatibility check
    +                   final String schemaString = 
((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
    +                   final Schema lastSchema = new 
Schema.Parser().parse(schemaString);
     
    -                   kryo.setAsmEnabled(true);
    +                   final SchemaPairCompatibility compatibility =
    +                                   
SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
     
    -                   KryoUtils.applyRegistrations(kryo, 
kryoRegistrations.values());
    +                   return compatibility.getType() == 
SchemaCompatibilityType.COMPATIBLE ?
    +                                   CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
    +           }
    +           else if (configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
    +                   // old snapshot case, just compare the type
    +                   // we don't need to restore any Kryo stuff, since Kryo 
was never used for persistence,
    +                   // only for object-to-object copies.
    +                   final AvroSerializerConfigSnapshot old = 
(AvroSerializerConfigSnapshot) configSnapshot;
    +                   return type.equals(old.getTypeClass()) ?
    +                                   CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
    +           }
    +           else {
    +                   return CompatibilityResult.requiresMigration();
                }
        }
     
    -   // 
--------------------------------------------------------------------------------------------
    +   // 
------------------------------------------------------------------------
    +   //  Utilities
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   public TypeSerializer<T> duplicate() {
    +           return new AvroSerializer<>(type);
    +   }
     
        @Override
        public int hashCode() {
    -           return 31 * this.type.hashCode() + 
this.typeToInstantiate.hashCode();
    +           return 42 + type.hashCode();
        }
     
        @Override
        public boolean equals(Object obj) {
    -           if (obj instanceof AvroSerializer) {
    -                   @SuppressWarnings("unchecked")
    -                   AvroSerializer<T> avroSerializer = (AvroSerializer<T>) 
obj;
    -
    -                   return avroSerializer.canEqual(this) &&
    -                           type == avroSerializer.type &&
    -                           typeToInstantiate == 
avroSerializer.typeToInstantiate;
    -           } else {
    +           if (obj == this) {
    +                   return true;
    +           }
    +           else if (obj != null && obj.getClass() == AvroSerializer.class) 
{
    +                   final AvroSerializer that = (AvroSerializer) obj;
    +                   return this.type == that.type;
    +           }
    +           else {
                        return false;
                }
        }
     
        @Override
        public boolean canEqual(Object obj) {
    -           return obj instanceof AvroSerializer;
    +           return obj.getClass() == this.getClass();
        }
     
    -   // 
--------------------------------------------------------------------------------------------
    -   // Serializer configuration snapshotting & compatibility
    -   // 
--------------------------------------------------------------------------------------------
    -
        @Override
    -   public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
    -           return new AvroSerializerConfigSnapshot<>(type, 
typeToInstantiate, kryoRegistrations);
    +   public String toString() {
    +           return getClass().getName() + " (" + getType().getName() + ')';
        }
     
    -   @SuppressWarnings("unchecked")
    -   @Override
    -   public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    -           if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
    -                   final AvroSerializerConfigSnapshot<T> config = 
(AvroSerializerConfigSnapshot<T>) configSnapshot;
    +   // 
------------------------------------------------------------------------
    +   //  Initialization
    +   // 
------------------------------------------------------------------------
    +
    +   private void checkAvroInitialized() {
    +           if (writer == null) {
    +                   initializeAvro();
    +           }
    +   }
    +
    +   private void initializeAvro() {
    +           final ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
    +
    +           if (SpecificRecord.class.isAssignableFrom(type)) {
    +                   this.avroData = new SpecificData(cl);
    +                   this.schema = this.avroData.getSchema(type);
    +                   this.reader = new SpecificDatumReader<>(schema, schema, 
avroData);
    +                   this.writer = new SpecificDatumWriter<>(schema, 
avroData);
    +           }
    +           else {
    +                   final ReflectData reflectData = new ReflectData(cl);
    +                   this.avroData = reflectData;
    +                   this.schema = this.avroData.getSchema(type);
    +                   this.reader = new ReflectDatumReader<>(schema, schema, 
reflectData);
    +                   this.writer = new ReflectDatumWriter<>(schema, 
reflectData);
    +           }
    +
    +           this.encoder = new DataOutputEncoder();
    +           this.decoder = new DataInputDecoder();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Serializer Snapshots
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * A config snapshot for the Avro Serializer that stores the Avro 
Schema to check compatibility.
    +    */
    +   public static final class AvroSchemaSerializerConfigSnapshot extends 
TypeSerializerConfigSnapshot {
    +
    +           private String schemaString;
    +
    +           /**
    +            * Default constructor for instantiation via reflection.
    +            */
    +           @SuppressWarnings("unused")
    +           public AvroSchemaSerializerConfigSnapshot() {}
    +
    +           public AvroSchemaSerializerConfigSnapshot(String schemaString) {
    +                   this.schemaString = checkNotNull(schemaString);
    +           }
    +
    +           public String getSchemaString() {
    +                   return schemaString;
    +           }
    +
    +           // --- Serialization ---
    +
    +           @Override
    +           public void read(DataInputView in) throws IOException {
    +                   super.read(in);
    +                   this.schemaString = in.readUTF();
    +           }
    +
    +           @Override
    +           public void write(DataOutputView out) throws IOException {
    +                   super.write(out);
    +                   out.writeUTF(schemaString);
    +           }
     
    -                   if (type.equals(config.getTypeClass()) && 
typeToInstantiate.equals(config.getTypeToInstantiate())) {
    -                           // resolve Kryo registrations; currently, since 
the Kryo registrations in Avro
    -                           // are fixed, there shouldn't be a problem with 
the resolution here.
    +           // --- Version ---
     
    -                           LinkedHashMap<String, KryoRegistration> 
oldRegistrations = config.getKryoRegistrations();
    -                           oldRegistrations.putAll(kryoRegistrations);
    +           @Override
    +           public int getVersion() {
    +                   return 1;
    +           }
     
    -                           for (Map.Entry<String, KryoRegistration> 
reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
    -                                   if 
(reconfiguredRegistrationEntry.getValue().isDummy()) {
    -                                           return 
CompatibilityResult.requiresMigration();
    -                                   }
    -                           }
    +           // --- Utils ---
     
    -                           this.kryoRegistrations = oldRegistrations;
    -                           return CompatibilityResult.compatible();
    +           @Override
    +           public boolean equals(Object obj) {
    +                   if (obj == this) {
    +                           return true;
    +                   }
    +                   else if (obj != null && obj.getClass() == 
AvroSchemaSerializerConfigSnapshot.class) {
    +                           final AvroSchemaSerializerConfigSnapshot that = 
(AvroSchemaSerializerConfigSnapshot) obj;
    +                           return 
this.schemaString.equals(that.schemaString);
    --- End diff --
    
    Is the schema string guaranteed to be stable? Or can two different Avro 
versions generate schema strings that Avro considers compatible but that 
differ slightly as strings?


---

Reply via email to