yihua commented on a change in pull request #3741:
URL: https://github.com/apache/hudi/pull/3741#discussion_r719765696



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
##########
@@ -19,82 +19,36 @@
 package org.apache.hudi.table.action.compact;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieCompactionOperation;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.io.IOUtils;
-import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.util.AccumulatorV2;
-import org.apache.spark.util.LongAccumulator;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-import static java.util.stream.Collectors.toList;
 
 /**
  * Compacts a hoodie table with merge on read storage. Computes all possible compactions,
  * passes it through a CompactionFilter and executes all the compactions and writes a new version of base files and make
  * a normal commit
- *
  */
 @SuppressWarnings("checkstyle:LineLength")
-public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload> implements HoodieCompactor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
+    extends HoodieCompactor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieSparkMergeOnReadTableCompactor.class);
-  // Accumulator to keep track of total log files for a table
-  private AccumulatorV2<Long, Long> totalLogFiles;
-  // Accumulator to keep track of total log file slices for a table
-  private AccumulatorV2<Long, Long> totalFileSlices;
+  @Override
+  public Schema getReaderSchema(HoodieWriteConfig config) {
+    return HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
+  }
 
   @Override
-  public JavaRDD<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
-                                      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    if (compactionPlan == null || (compactionPlan.getOperations() == null)
-        || (compactionPlan.getOperations().isEmpty())) {
-      return jsc.emptyRDD();
-    }
-    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+  public void updateReaderSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);

Review comment:
       @danny0405 @garyli1019 I extracted the divergence between Spark and Flink around the reader schema into these methods. I see that Flink skips the actual `updateReaderSchema()` logic that Spark performs. Is this intentional for Flink?
   
   @nsivabalan raised the same question: https://github.com/apache/hudi/pull/3727#discussion_r719331449
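   
   For context, here is a minimal sketch of how an engine-agnostic base class could consume these two hooks. Only the `getReaderSchema`/`updateReaderSchema` signatures mirror this diff; the class name, the `prepareSchema` driver method, and the call order are illustrative assumptions, not the actual `HoodieCompactor` implementation.
   
   ```java
   import org.apache.avro.Schema;
   import org.apache.hudi.common.table.HoodieTableMetaClient;
   import org.apache.hudi.config.HoodieWriteConfig;
   
   // Hypothetical sketch of an engine-agnostic compactor base class.
   // Only the two abstract hooks below mirror the methods in this diff;
   // everything else is illustrative and not the real Hudi code.
   public abstract class AbstractCompactorSketch {
   
     // Engine-specific: the Spark override adds Hudi metadata fields to the write schema (see diff above).
     public abstract Schema getReaderSchema(HoodieWriteConfig config);
   
     // Engine-specific: the Spark override refreshes the write schema from table metadata;
     // per the comment above, Flink currently appears to skip this step.
     public abstract void updateReaderSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient);
   
     // Hypothetical shared driver logic that would invoke the hooks before compacting file slices.
     protected Schema prepareSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
       updateReaderSchema(config, metaClient);   // may rewrite the config's schema from table metadata
       return getReaderSchema(config);           // schema used to read log records during compaction
     }
   }
   ```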



