the-other-tim-brown commented on code in PR #9743:
URL: https://github.com/apache/hudi/pull/9743#discussion_r1349825399


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -661,6 +652,35 @@ private Pair<SchemaProvider, Pair<String, 
JavaRDD<HoodieRecord>>> fetchFromSourc
     return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
   }
 
+  /**
+   * Apply schema reconcile and schema evolution rules(schema on read) and 
generate new target schema provider.
+   *
+   * @param incomingSchema schema of the source data
+   * @param sourceSchemaProvider Source schema provider.
+   * @return the SchemaProvider that can be used as writer schema.
+   */
+  private SchemaProvider getDeducedSchemaProvider(Schema incomingSchema, 
SchemaProvider sourceSchemaProvider) {

Review Comment:
   There's another method in this class `getSchemaForWriteConfig` that does 
something that looks similar. Is that now obsolete?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -661,6 +652,35 @@ private Pair<SchemaProvider, Pair<String, 
JavaRDD<HoodieRecord>>> fetchFromSourc
     return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
   }
 
+  /**
+   * Apply schema reconcile and schema evolution rules(schema on read) and 
generate new target schema provider.
+   *
+   * @param incomingSchema schema of the source data
+   * @param sourceSchemaProvider Source schema provider.
+   * @return the SchemaProvider that can be used as writer schema.
+   */
+  private SchemaProvider getDeducedSchemaProvider(Schema incomingSchema, 
SchemaProvider sourceSchemaProvider) {
+    Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, 
cfg.targetBasePath);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf()))

Review Comment:
   On each commit, we'll be creating a new metaClient which requires scanning 
the `.hoodie` folder. Is there any way we can avoid that? If not, can we make 
sure we're reusing a single metaclient for a single sync round?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -661,6 +652,35 @@ private Pair<SchemaProvider, Pair<String, 
JavaRDD<HoodieRecord>>> fetchFromSourc
     return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
   }
 
+  /**
+   * Apply schema reconcile and schema evolution rules(schema on read) and 
generate new target schema provider.
+   *
+   * @param incomingSchema schema of the source data
+   * @param sourceSchemaProvider Source schema provider.
+   * @return the SchemaProvider that can be used as writer schema.
+   */
+  private SchemaProvider getDeducedSchemaProvider(Schema incomingSchema, 
SchemaProvider sourceSchemaProvider) {
+    Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, 
cfg.targetBasePath);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf()))
+        .setBasePath(cfg.targetBasePath)
+        .setPayloadClassName(cfg.payloadClassName)
+        .build();
+    Option<InternalSchema> internalSchemaOpt = 
HoodieConversionUtils.toJavaOption(
+        HoodieSchemaUtils.getLatestTableInternalSchema(
+            new HoodieConfig(HoodieStreamer.Config.getProps(fs, cfg)), 
metaClient));
+    // Deduce proper target (writer's) schema for the input dataset, 
reconciling its
+    // schema w/ the table's one
+    Schema targetSchema = HoodieSparkSqlWriter.deduceWriterSchema(
+          incomingSchema,
+          HoodieConversionUtils.toScalaOption(latestTableSchemaOpt),
+          HoodieConversionUtils.toScalaOption(internalSchemaOpt),
+          HoodieConversionUtils.fromProperties(props));
+
+    // Override schema provider with the reconciled target schema
+    return new DelegatingSchemaProvider(props, hoodieSparkContext.jsc(), 
sourceSchemaProvider,

Review Comment:
   if there is no change to schema, should we just return the original schema 
provider?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to