[ 
https://issues.apache.org/jira/browse/HUDI-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuan Huy Pham updated HUDI-2374:
--------------------------------
    Fix Version/s: 0.10.0
      Description: 
Hi,

I am not sure whether AvroDFSSource is intended to ignore the source schema from the designated schema provider class, but the current logic always uses the Avro writer schema as the reader schema.

Logic as of release-0.9.0, in class {{org.apache.hudi.utilities.sources.Source}}:
{code:java}
public final InputBatch<T> fetchNext(Option<String> lastCkptStr, long sourceLimit) {
  InputBatch<T> batch = fetchNewData(lastCkptStr, sourceLimit);
  // If overriddenSchemaProvider is passed in CLI, use it
  return overriddenSchemaProvider == null ? batch
      : new InputBatch<>(batch.getBatch(), batch.getCheckpointForNextBatch(), overriddenSchemaProvider);
}
{code}
 

Class: {{org.apache.hudi.utilities.sources.AvroDFSSource}}
{code:java}
public class AvroDFSSource extends AvroSource {

  private final DFSPathSelector pathSelector;

  public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
      SchemaProvider schemaProvider) throws IOException {
    super(props, sparkContext, sparkSession, schemaProvider);
    this.pathSelector = DFSPathSelector
        .createSourceSelector(props, sparkContext.hadoopConfiguration());
  }

  @Override
  protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
    Pair<Option<String>, String> selectPathsWithMaxModificationTime =
        pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
    return selectPathsWithMaxModificationTime.getLeft()
        .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
        .orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
  }

  private JavaRDD<GenericRecord> fromFiles(String pathStr) {
    sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch Avro data from files");
    JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
        AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
    return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
  }
}
{code}
The {{schemaProvider}} parameter is completely ignored in the constructor, so {{AvroKeyInputFormat}} always reads with the writer schema.

As a result, we often see this in the DeltaStreamer logs:
{code:java}
21/08/30 10:17:24 WARN AvroKeyInputFormat: Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.
21/08/30 10:17:24 INFO AvroKeyInputFormat: Using a reader schema equal to the writer schema.
{code}
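The warning refers to the reader schema that {{AvroKeyInputFormat}} resolves from the Hadoop configuration. As a rough illustration (the helper name and its inputs below are hypothetical, not Hudi code), this is how a job normally supplies it:
{code:java}
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Illustration only: set the reader schema that the WARN message mentions.
static Configuration withReaderSchema(Configuration hadoopConf, Schema readerSchema) throws IOException {
  Job job = Job.getInstance(hadoopConf);
  // Stores the schema under "avro.schema.input.key"; AvroKeyInputFormat then
  // resolves records against it instead of defaulting to the writer schema.
  AvroJob.setInputKeySchema(job, readerSchema);
  return job.getConfiguration(); // this is what newAPIHadoopFile(...) would receive
}
{code}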
This blog post [https://hudi.apache.org/blog/2021/08/16/kafka-custom-deserializer] describes how AvroKafkaSource can support BACKWARD_TRANSITIVE schema evolution. For DFS data, this issue is the main blocker: if we pass the source schema from {{schemaProvider}}, we should get the same BACKWARD_TRANSITIVE schema evolution for DFS Avro data.
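To see why the reader schema matters for evolution (a self-contained illustration with made-up schemas, not Hudi code): Avro only fills in fields added by a newer schema, using their declared defaults, when the reader schema is supplied alongside the writer schema.
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;

// Writer schema: the old shape of the data sitting on DFS.
Schema writerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"}]}");
// Reader schema: a newer, backward-compatible version with a defaulted field.
Schema readerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"},"
        + "{\"name\":\"source\",\"type\":\"string\",\"default\":\"unknown\"}]}");
// Schema resolution fills "source" with "unknown" when reading old records,
// but only because both schemas are handed to the reader.
GenericDatumReader<Object> reader = new GenericDatumReader<>(writerSchema, readerSchema);
{code}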

 

Suggested fix: pass the source schema from {{schemaProvider}} to the Hadoop configuration key {{avro.schema.input.key}}, as sketched below.
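A minimal sketch of that change in the {{AvroDFSSource}} constructor (assuming {{schemaProvider.getSourceSchema()}} is the schema we want {{AvroKeyInputFormat}} to read with; an illustration, not a final patch):
{code:java}
public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
    SchemaProvider schemaProvider) throws IOException {
  super(props, sparkContext, sparkSession, schemaProvider);
  // Sketch: expose the source schema as the reader schema ("avro.schema.input.key")
  // so AvroKeyInputFormat stops falling back to the writer schema.
  if (schemaProvider != null && schemaProvider.getSourceSchema() != null) {
    sparkContext.hadoopConfiguration()
        .set("avro.schema.input.key", schemaProvider.getSourceSchema().toString());
  }
  this.pathSelector = DFSPathSelector
      .createSourceSelector(props, sparkContext.hadoopConfiguration());
}
{code}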

 

 


> AvroDFSSource does not use the overridden schema to deserialize Avro binaries.
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-2374
>                 URL: https://issues.apache.org/jira/browse/HUDI-2374
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer
>    Affects Versions: 0.9.0
>            Reporter: Xuan Huy Pham
>            Priority: Major
>              Labels: patch
>             Fix For: 0.10.0
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
