This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 407389ae9a2 [HUDI-6406] Pass in Spark Engine Context Wrapper for DeltaSync instead of spark engine context (#9008)
407389ae9a2 is described below

commit 407389ae9a255e40c558ba14114bc7c3699f24a8
Author: Rajesh Mahindra <76502047+rmahindra...@users.noreply.github.com>
AuthorDate: Fri Jun 23 14:30:23 2023 -0700

    [HUDI-6406] Pass in Spark Engine Context Wrapper for DeltaSync instead of spark engine context (#9008)
    
    Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local>
---
 .../client/common/HoodieFlinkEngineContext.java    | 10 +++++
 .../client/common/HoodieJavaEngineContext.java     | 10 +++++
 .../client/common/HoodieSparkEngineContext.java    | 35 ++++++++++++++++--
 .../apache/hudi/common/engine/EngineProperty.java  |  3 +-
 .../hudi/common/engine/HoodieEngineContext.java    |  4 ++
 .../common/engine/HoodieLocalEngineContext.java    | 10 +++++
 .../deltastreamer/BaseErrorTableWriter.java        |  4 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 43 +++++++++++++---------
 .../utilities/deltastreamer/ErrorTableUtils.java   |  8 ++--
 .../deltastreamer/HoodieDeltaStreamer.java         | 42 ++++++++++++---------
 .../utilities/sources/TestJsonKafkaSource.java     |  5 ++-
 11 files changed, 127 insertions(+), 47 deletions(-)
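
For orientation while reading the patch below, here is a small, illustrative sketch (not part of this commit; the app name, master, and pool-name string are placeholder assumptions) of how the HoodieSparkEngineContext wrapper is built from a JavaSparkContext and exercised through the APIs this change adds or relies on:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.client.common.HoodieSparkEngineContext;
    import org.apache.hudi.common.engine.EngineProperty;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class EngineContextWrapperSketch {
      public static void main(String[] args) {
        // Illustrative local Spark setup; a real deployment supplies its own context.
        JavaSparkContext jssc = new JavaSparkContext(
            new SparkConf().setAppName("delta-sync-sketch").setMaster("local[2]"));

        // Wrapper that DeltaSync/DeltaSyncService now receive instead of the raw JavaSparkContext.
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jssc);

        // Accessors added by this patch so callers no longer need the raw context directly.
        Configuration hadoopConf = engineContext.hadoopConfiguration();
        JavaSparkContext raw = engineContext.jsc();
        System.out.println(hadoopConf.size() + " hadoop conf entries, default parallelism " + raw.defaultParallelism());

        // Scheduler-pool assignment now goes through EngineProperty ("my-deltasync-pool" is a placeholder value).
        engineContext.setProperty(EngineProperty.DELTASYNC_POOL_NAME, "my-deltasync-pool");

        // Cancellation hooks added on HoodieEngineContext; the Spark implementation maps them to job-group cancellation.
        engineContext.cancelAllJobs();

        jssc.stop();
      }
    }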

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index db5c6ebd296..a62ca42d6b3 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -181,6 +181,16 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
     return Collections.emptyList();
   }
 
+  @Override
+  public void cancelJob(String jobId) {
+    // no operation for now
+  }
+
+  @Override
+  public void cancelAllJobs() {
+    // no operation for now
+  }
+
   /**
    * Override the flink context supplier to return constant write token.
    */
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index 6ab8e5ab029..5f6751b9961 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -161,4 +161,14 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
   public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
     return Collections.emptyList();
   }
+
+  @Override
+  public void cancelJob(String jobId) {
+    // no operation for now
+  }
+
+  @Override
+  public void cancelAllJobs() {
+    // no operation for now
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index c23a333f711..f3b87df040d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -37,6 +37,9 @@ import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.data.HoodieSparkLongAccumulator;
 import org.apache.hudi.exception.HoodieException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.sql.SQLContext;
@@ -79,6 +82,10 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
     return javaSparkContext;
   }
 
+  public JavaSparkContext jsc() {
+    return javaSparkContext;
+  }
+
   public SQLContext getSqlContext() {
     return sqlContext;
   }
@@ -165,9 +172,9 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
 
   @Override
   public void setProperty(EngineProperty key, String value) {
-    if (key == EngineProperty.COMPACTION_POOL_NAME) {
-      javaSparkContext.setLocalProperty("spark.scheduler.pool", value);
-    } else if (key == EngineProperty.CLUSTERING_POOL_NAME) {
+    if (key.equals(EngineProperty.COMPACTION_POOL_NAME)
+        || key.equals(EngineProperty.CLUSTERING_POOL_NAME)
+        || key.equals(EngineProperty.DELTASYNC_POOL_NAME)) {
       javaSparkContext.setLocalProperty("spark.scheduler.pool", value);
     } else {
       throw new HoodieException("Unknown engine property :" + key);
@@ -211,4 +218,26 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
       return removed == null ? Collections.emptyList() : removed;
     }
   }
+
+  @Override
+  public void cancelJob(String groupId) {
+    javaSparkContext.cancelJobGroup(groupId);
+  }
+
+  @Override
+  public void cancelAllJobs() {
+    javaSparkContext.cancelAllJobs();
+  }
+
+  public SparkConf getConf() {
+    return javaSparkContext.getConf();
+  }
+
+  public Configuration hadoopConfiguration() {
+    return javaSparkContext.hadoopConfiguration();
+  }
+
+  public <T> JavaRDD<T> emptyRDD() {
+    return javaSparkContext.emptyRDD();
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java
index 36e7594937b..08467a6d193 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java
@@ -31,5 +31,6 @@ public enum EngineProperty {
   // Amount of total memory available to each engine executor
   TOTAL_MEMORY_AVAILABLE,
   // Fraction of that memory, that is already in use by the engine
-  MEMORY_FRACTION_IN_USE
+  MEMORY_FRACTION_IN_USE,
+  DELTASYNC_POOL_NAME
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index c123c279644..79d62d55770 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -101,4 +101,8 @@ public abstract class HoodieEngineContext {
   public abstract List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey);
 
   public abstract List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey);
+
+  public abstract void cancelJob(String jobId);
+
+  public abstract void cancelAllJobs();
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 26190b790ca..5239490816d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -160,4 +160,14 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
   public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
     return Collections.emptyList();
   }
+
+  @Override
+  public void cancelJob(String jobId) {
+    // no operation for now
+  }
+
+  @Override
+  public void cancelAllJobs() {
+    // no operation for now
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
index fea6bdb3cf3..e0bba0600ba 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.util.Option;
@@ -25,7 +26,6 @@ import org.apache.hudi.common.util.VisibleForTesting;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 
 /**
@@ -44,7 +44,7 @@ public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
   public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
 
   public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
-                                   TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+                              TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs) {
   }
 
   /**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 38582f0f8f4..19ca149154c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -209,9 +209,9 @@ public class DeltaSync implements Serializable, Closeable {
   private transient FileSystem fs;
 
   /**
-   * Spark context.
+   * Spark context Wrapper.
    */
-  private transient JavaSparkContext jssc;
+  private final transient HoodieSparkEngineContext hoodieSparkContext;
 
   /**
    * Spark Session.
@@ -278,12 +278,18 @@ public class DeltaSync implements Serializable, Closeable {
 
   private final boolean autoGenerateRecordKeys;
 
+  @Deprecated
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                    TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
+    this(cfg, sparkSession, schemaProvider, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient);
+  }
 
+  public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
+                   TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
+                   Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
     this.cfg = cfg;
-    this.jssc = jssc;
+    this.hoodieSparkContext = hoodieSparkContext;
     this.sparkSession = sparkSession;
     this.fs = fs;
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
@@ -306,11 +312,11 @@ public class DeltaSync implements Serializable, Closeable {
     }
     this.multiwriterIdentifier = StringUtils.isNullOrEmpty(id) ? Option.empty() : Option.of(id);
     if (props.getBoolean(ERROR_TABLE_ENABLED.key(),ERROR_TABLE_ENABLED.defaultValue())) {
-      this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, sparkSession, props, jssc, fs);
+      this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, sparkSession, props, hoodieSparkContext, fs);
       this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(props);
     }
     this.formatAdapter = new SourceFormatAdapter(
-        UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics),
+        UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics),
         this.errorTableWriter, Option.of(props));
 
     this.transformer = UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames),
@@ -399,7 +405,7 @@ public class DeltaSync implements Serializable, Closeable {
             Boolean.parseBoolean(HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())))
         .setUrlEncodePartitioning(props.getBoolean(URL_ENCODE_PARTITIONING.key(),
             Boolean.parseBoolean(URL_ENCODE_PARTITIONING.defaultValue())))
-        .initTable(new Configuration(jssc.hadoopConfiguration()),
+        .initTable(new Configuration(hoodieSparkContext.hadoopConfiguration()),
             cfg.targetBasePath);
   }
 
@@ -539,7 +545,7 @@ public class DeltaSync implements Serializable, Closeable {
           formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
 
       Option<Dataset<Row>> transformed =
-          dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
+          dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(hoodieSparkContext.jsc(), sparkSession, data, props));
 
       transformed = formatAdapter.processErrorEvents(transformed,
           ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
@@ -571,7 +577,7 @@ public class DeltaSync implements Serializable, Closeable {
         }
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
-        Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
+        Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, cfg.targetBasePath);
         // Deduce proper target (writer's) schema for the transformed dataset, reconciling its
         // schema w/ the table's one
         Option<Schema> targetSchemaOpt = transformed.map(df -> {
@@ -587,8 +593,8 @@ public class DeltaSync implements Serializable, Closeable {
         });
         // Override schema provider with the reconciled target schema
         schemaProvider = targetSchemaOpt.map(targetSchema ->
-          (SchemaProvider) new DelegatingSchemaProvider(props, jssc, dataAndCheckpoint.getSchemaProvider(),
-                new SimpleSchemaProvider(jssc, targetSchema, props)))
+          (SchemaProvider) new DelegatingSchemaProvider(props, hoodieSparkContext.jsc(), dataAndCheckpoint.getSchemaProvider(),
+                                                        new SimpleSchemaProvider(hoodieSparkContext.jsc(), targetSchema, props)))
           .orElse(dataAndCheckpoint.getSchemaProvider());
         // Rewrite transformed records into the expected target schema
         avroRDDOptional = transformed.map(t -> getTransformedRDD(t, reconcileSchema, schemaProvider.getTargetSchema()));
@@ -610,10 +616,10 @@ public class DeltaSync implements Serializable, Closeable {
       return null;
     }
 
-    jssc.setJobGroup(this.getClass().getSimpleName(), "Checking if input is empty");
+    hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
     if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
       LOG.info("No new data, perform empty commit.");
-      return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD()));
+      return Pair.of(schemaProvider, Pair.of(checkpointStr, hoodieSparkContext.emptyRDD()));
     }
 
     boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
@@ -786,7 +792,7 @@ public class DeltaSync implements Serializable, Closeable {
     Option<String> scheduledCompactionInstant = Option.empty();
     // filter dupes if needed
     if (cfg.filterDupes) {
-      records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
+      records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(), records, writeClient.getConfig());
     }
 
     boolean isEmpty = records.isEmpty();
@@ -953,7 +959,7 @@ public class DeltaSync implements Serializable, Closeable {
       LOG.info("When set --enable-hive-sync will use HiveSyncTool for backward compatibility");
     }
     if (cfg.enableMetaSync) {
-      FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration());
+      FileSystem fs = FSUtils.getFs(cfg.targetBasePath, hoodieSparkContext.hadoopConfiguration());
 
       TypedProperties metaProps = new TypedProperties();
       metaProps.putAll(props);
@@ -1002,12 +1008,12 @@ public class DeltaSync implements Serializable, Closeable {
     registerAvroSchemas(sourceSchema, targetSchema);
     final HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
     final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
-        .getWriteConfigWithRecordSizeEstimate(jssc, records, initialWriteConfig)
+        .getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), records, initialWriteConfig)
         .orElse(initialWriteConfig);
 
     if (writeConfig.isEmbeddedTimelineServerEnabled()) {
       if (!embeddedTimelineService.isPresent()) {
-        embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(jssc), writeConfig);
+        embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(hoodieSparkContext, writeConfig);
       } else {
         EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(embeddedTimelineService.get(), writeConfig);
       }
@@ -1017,7 +1023,7 @@ public class DeltaSync implements Serializable, Closeable {
       // Close Write client.
       writeClient.close();
     }
-    writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), writeConfig, embeddedTimelineService);
+    writeClient = new SparkRDDWriteClient<>(hoodieSparkContext, writeConfig, embeddedTimelineService);
     onInitializingHoodieWriteClient.apply(writeClient);
   }
 
@@ -1155,7 +1161,8 @@ public class DeltaSync implements Serializable, Closeable {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Registering Schema: " + schemas);
       }
-      jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
+      // Use the underlying spark context in case the java context is changed during runtime
+      hoodieSparkContext.getJavaSparkContext().sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
     }
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
index 76e7b030b6f..95400021ce7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -29,7 +30,6 @@ import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hudi.exception.HoodieValidationException;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -45,19 +45,19 @@ import static org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR
 
 public final class ErrorTableUtils {
   public static Option<BaseErrorTableWriter> getErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
-                                                                 TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+                                                                 TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs) {
     String errorTableWriterClass = props.getString(ERROR_TABLE_WRITE_CLASS.key());
     ValidationUtils.checkState(!StringUtils.isNullOrEmpty(errorTableWriterClass),
         "Missing error table config " + ERROR_TABLE_WRITE_CLASS);
 
     Class<?>[] argClassArr = new Class[]{HoodieDeltaStreamer.Config.class,
-        SparkSession.class, TypedProperties.class, JavaSparkContext.class, FileSystem.class};
+        SparkSession.class, TypedProperties.class, HoodieSparkEngineContext.class, FileSystem.class};
     String errMsg = "Unable to instantiate ErrorTableWriter with arguments type " + Arrays.toString(argClassArr);
     ValidationUtils.checkArgument(ReflectionUtils.hasConstructor(BaseErrorTableWriter.class.getName(), argClassArr, false), errMsg);
 
     try {
       return Option.of((BaseErrorTableWriter) ReflectionUtils.getClass(errorTableWriterClass).getConstructor(argClassArr)
-          .newInstance(cfg, sparkSession, props, jssc, fs));
+          .newInstance(cfg, sparkSession, props, hoodieSparkContext, fs));
     } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
       throw new HoodieException(errMsg, e);
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 0ca939f08dd..f0631e3ef76 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -162,8 +163,10 @@ public class HoodieDeltaStreamer implements Serializable {
     this.cfg = cfg;
     this.bootstrapExecutor = Option.ofNullable(
         cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null);
+
+    HoodieSparkEngineContext sparkEngineContext = new HoodieSparkEngineContext(jssc);
     this.ingestionService = Option.ofNullable(
-        cfg.runBootstrap ? null : new DeltaSyncService(cfg, jssc, fs, conf, Option.ofNullable(this.properties)));
+        cfg.runBootstrap ? null : new DeltaSyncService(cfg, sparkEngineContext, fs, conf, Option.ofNullable(this.properties)));
   }
 
   private static TypedProperties combineProperties(Config cfg, Option<TypedProperties> propsOverride, Configuration hadoopConf) {
@@ -616,9 +619,9 @@ public class HoodieDeltaStreamer implements Serializable {
     private transient SparkSession sparkSession;
 
     /**
-     * Spark context.
+     * Spark context Wrapper.
      */
-    private transient JavaSparkContext jssc;
+    private final transient HoodieSparkEngineContext hoodieSparkContext;
 
     private transient FileSystem fs;
 
@@ -653,16 +656,16 @@ public class HoodieDeltaStreamer implements Serializable {
 
     private final Option<ConfigurationHotUpdateStrategy> configurationHotUpdateStrategyOpt;
 
-    public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
+    public DeltaSyncService(Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
                             Option<TypedProperties> properties) throws IOException {
       super(HoodieIngestionConfig.newBuilder()
           .isContinuous(cfg.continuousMode)
           .withMinSyncInternalSeconds(cfg.minSyncIntervalSeconds).build());
       this.cfg = cfg;
-      this.jssc = jssc;
+      this.hoodieSparkContext = hoodieSparkContext;
       this.fs = fs;
       this.hiveConf = conf;
-      this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
+      this.sparkSession = SparkSession.builder().config(hoodieSparkContext.getConf()).getOrCreate();
       this.asyncCompactService = Option.empty();
       this.asyncClusteringService = Option.empty();
       this.postWriteTerminationStrategy = StringUtils.isNullOrEmpty(cfg.postWriteTerminationStrategyClass) ? Option.empty() :
@@ -706,15 +709,15 @@ public class HoodieDeltaStreamer implements Serializable {
       LOG.info(toSortedTruncatedString(props));
 
       this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
-          UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc, cfg.transformerClassNames);
+          UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, hoodieSparkContext.jsc()),
+          props, hoodieSparkContext.jsc(), cfg.transformerClassNames);
 
-      deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, conf,
-          this::onInitializingWriteClient);
+      deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, conf, this::onInitializingWriteClient);
     }
 
-    public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf)
+    public DeltaSyncService(HoodieDeltaStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf)
         throws IOException {
-      this(cfg, jssc, fs, conf, Option.empty());
+      this(cfg, hoodieSparkContext, fs, conf, Option.empty());
     }
 
     private void initializeTableTypeAndBaseFileFormat() {
@@ -728,7 +731,7 @@ public class HoodieDeltaStreamer implements Serializable {
       if (deltaSync != null) {
         deltaSync.close();
       }
-      deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, hiveConf, this::onInitializingWriteClient);
+      deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient);
     }
 
     @Override
@@ -739,7 +742,7 @@ public class HoodieDeltaStreamer implements Serializable {
         if (cfg.isAsyncCompactionEnabled()) {
           // set Scheduler Pool.
           LOG.info("Setting Spark Pool name for delta-sync to " + DELTASYNC_POOL_NAME);
-          jssc.setLocalProperty("spark.scheduler.pool", DELTASYNC_POOL_NAME);
+          hoodieSparkContext.setProperty(EngineProperty.DELTASYNC_POOL_NAME, DELTASYNC_POOL_NAME);
         }
 
         HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props);
@@ -867,10 +870,10 @@ public class HoodieDeltaStreamer implements Serializable {
           // Update the write client used by Async Compactor.
           asyncCompactService.get().updateWriteClient(writeClient);
         } else {
-          asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient));
+          asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(hoodieSparkContext, writeClient));
           // Enqueue existing pending compactions first
           HoodieTableMetaClient meta =
-              HoodieTableMetaClient.builder().setConf(new Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
+              HoodieTableMetaClient.builder().setConf(new Configuration(hoodieSparkContext.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
           List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
           pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
           asyncCompactService.get().start(error -> true);
@@ -889,9 +892,9 @@ public class HoodieDeltaStreamer implements Serializable {
         if (asyncClusteringService.isPresent()) {
           asyncClusteringService.get().updateWriteClient(writeClient);
         } else {
-          asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(new HoodieSparkEngineContext(jssc), writeClient));
+          asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(hoodieSparkContext, writeClient));
           HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
-              .setConf(new Configuration(jssc.hadoopConfiguration()))
+              .setConf(new Configuration(hoodieSparkContext.hadoopConfiguration()))
               .setBasePath(cfg.targetBasePath)
               .setLoadActiveTimelineOnLoad(true).build();
           List<HoodieInstant> pending = ClusteringUtils.getPendingClusteringInstantTimes(meta);
@@ -942,6 +945,11 @@ public class HoodieDeltaStreamer implements Serializable {
       return props;
     }
 
+    @VisibleForTesting
+    public HoodieSparkEngineContext getHoodieSparkContext() {
+      return hoodieSparkContext;
+    }
+
     @VisibleForTesting
     public DeltaSync getDeltaSync() {
       return deltaSync;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 072e20c4c47..ad71eb51526 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.sources;
 
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
@@ -286,7 +287,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
 
   private BaseErrorTableWriter getAnonymousErrorTableWriter(TypedProperties props) {
     return new BaseErrorTableWriter<ErrorEvent<String>>(new HoodieDeltaStreamer.Config(),
-        spark(), props, jsc(), fs()) {
+        spark(), props, new HoodieSparkEngineContext(jsc()), fs()) {
       List<JavaRDD<HoodieAvroRecord>> errorEvents = new LinkedList();
 
       @Override
@@ -305,7 +306,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
       }
     };
   }
-  
+
   @Test
   public void testAppendKafkaOffset() {
     final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
