afilipchik commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r419848101



##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import 
org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new 
Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);

Review comment:
       let me find it. We had to patch kafka -> avro deser in hudi 0.5 as it 
was breaking in some situations. 
   In general based on spec at https://avro.apache.org/docs/current/spec.html:
   ```Data Serialization and Deserialization
   Binary encoded Avro data does not include type information or field names. 
The benefit is that the serialized data is small, but as a result a schema must 
always be used in order to read Avro data correctly. The best way to ensure 
that the schema is structurally identical to the one used to write the data is 
to use the exact same schema.
   
   Therefore, files or systems that store Avro data should always include the 
writer's schema for that data. Avro-based remote procedure call (RPC) systems 
must also guarantee that remote recipients of data have a copy of the schema 
used to write that data. In general, it is advisable that any reader of Avro 
data should use a schema that is the same (as defined more fully in Parsing 
Canonical Form for Schemas) as the schema that was used to write the data in 
order to deserialize it correctly. Deserializing data into a newer schema is 
accomplished by specifying an additional schema, the results of which are 
described in Schema Resolution.```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to