Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148977758
  
    --- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
    @@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) 
throws IOException {
                return this.reader.read(reuse, this.decoder);
        }
     
    +   // 
------------------------------------------------------------------------
    +   //  Copying
    +   // 
------------------------------------------------------------------------
    +
        @Override
    -   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
    +   public T copy(T from) {
                checkAvroInitialized();
    +           return avroData.deepCopy(schema, from);
    +   }
     
    -           if (this.deepCopyInstance == null) {
    -                   this.deepCopyInstance = 
InstantiationUtil.instantiate(type, Object.class);
    -           }
    -
    -           this.decoder.setIn(source);
    -           this.encoder.setOut(target);
    +   @Override
    +   public T copy(T from, T reuse) {
    +           return copy(from);
    +   }
     
    -           T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
    -           this.writer.write(tmp, this.encoder);
    +   @Override
    +   public void copy(DataInputView source, DataOutputView target) throws 
IOException {
    +           T value = deserialize(source);
    +           serialize(value, target);
        }
     
    -   private void checkAvroInitialized() {
    -           if (this.reader == null) {
    -                   this.reader = new ReflectDatumReader<T>(type);
    -                   this.writer = new ReflectDatumWriter<T>(type);
    -                   this.encoder = new DataOutputEncoder();
    -                   this.decoder = new DataInputDecoder();
    +   // 
------------------------------------------------------------------------
    +   //  Compatibility and Upgrades
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   public TypeSerializerConfigSnapshot snapshotConfiguration() {
    +           if (configSnapshot == null) {
    +                   checkAvroInitialized();
    +                   configSnapshot = new 
AvroSchemaSerializerConfigSnapshot(schema.toString(false));
                }
    +           return configSnapshot;
        }
     
    -   private void checkKryoInitialized() {
    -           if (this.kryo == null) {
    -                   this.kryo = new Kryo();
    -
    -                   Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
    -                   
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
    -                   kryo.setInstantiatorStrategy(instantiatorStrategy);
    +   @Override
    +   @SuppressWarnings("deprecation")
    +   public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    +           if (configSnapshot instanceof 
AvroSchemaSerializerConfigSnapshot) {
    +                   // proper schema snapshot, can do the sophisticated 
schema-based compatibility check
    +                   final String schemaString = 
((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
    +                   final Schema lastSchema = new 
Schema.Parser().parse(schemaString);
     
    -                   kryo.setAsmEnabled(true);
    +                   final SchemaPairCompatibility compatibility =
    +                                   
SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
     
    -                   KryoUtils.applyRegistrations(kryo, 
kryoRegistrations.values());
    +                   return compatibility.getType() == 
SchemaCompatibilityType.COMPATIBLE ?
    +                                   CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
    +           }
    +           else if (configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
    +                   // old snapshot case, just compare the type
    +                   // we don't need to restore any Kryo stuff, since Kryo 
was never used for persistence,
    +                   // only for object-to-object copies.
    +                   final AvroSerializerConfigSnapshot old = 
(AvroSerializerConfigSnapshot) configSnapshot;
    +                   return type.equals(old.getTypeClass()) ?
    +                                   CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
    +           }
    +           else {
    +                   return CompatibilityResult.requiresMigration();
                }
        }
     
    -   // 
--------------------------------------------------------------------------------------------
    +   // 
------------------------------------------------------------------------
    +   //  Utilities
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   public TypeSerializer<T> duplicate() {
    +           return new AvroSerializer<>(type);
    +   }
     
        @Override
        public int hashCode() {
    -           return 31 * this.type.hashCode() + 
this.typeToInstantiate.hashCode();
    +           return 42 + type.hashCode();
        }
     
        @Override
        public boolean equals(Object obj) {
    -           if (obj instanceof AvroSerializer) {
    -                   @SuppressWarnings("unchecked")
    -                   AvroSerializer<T> avroSerializer = (AvroSerializer<T>) 
obj;
    -
    -                   return avroSerializer.canEqual(this) &&
    -                           type == avroSerializer.type &&
    -                           typeToInstantiate == 
avroSerializer.typeToInstantiate;
    -           } else {
    +           if (obj == this) {
    +                   return true;
    +           }
    +           else if (obj != null && obj.getClass() == AvroSerializer.class) 
{
    +                   final AvroSerializer that = (AvroSerializer) obj;
    +                   return this.type == that.type;
    +           }
    +           else {
                        return false;
                }
        }
     
        @Override
        public boolean canEqual(Object obj) {
    -           return obj instanceof AvroSerializer;
    +           return obj.getClass() == this.getClass();
        }
     
    -   // 
--------------------------------------------------------------------------------------------
    -   // Serializer configuration snapshotting & compatibility
    -   // 
--------------------------------------------------------------------------------------------
    -
        @Override
    -   public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
    -           return new AvroSerializerConfigSnapshot<>(type, 
typeToInstantiate, kryoRegistrations);
    +   public String toString() {
    +           return getClass().getName() + " (" + getType().getName() + ')';
        }
     
    -   @SuppressWarnings("unchecked")
    -   @Override
    -   public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    -           if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
    -                   final AvroSerializerConfigSnapshot<T> config = 
(AvroSerializerConfigSnapshot<T>) configSnapshot;
    +   // 
------------------------------------------------------------------------
    +   //  Initialization
    +   // 
------------------------------------------------------------------------
    +
    +   private void checkAvroInitialized() {
    +           if (writer == null) {
    +                   initializeAvro();
    +           }
    +   }
    +
    +   private void initializeAvro() {
    +           final ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
    --- End diff --
    
    This is the pattern that the Kryo serializer has been using for a while, 
instead of `type.getClass().getClassLoader()`, because `type` could in theory 
be a collection class (loaded by the application class loader) that contains 
the actual type from another class loader.
    
    This might not be possible for Avro, though; I am not totally sure...


---

Reply via email to