This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new 407389ae9a2  [HUDI-6406] Pass in Spark Engine Context Wrapper for DeltaSync instead of spark engine context (#9008)
407389ae9a2 is described below

commit 407389ae9a255e40c558ba14114bc7c3699f24a8
Author: Rajesh Mahindra <76502047+rmahindra...@users.noreply.github.com>
AuthorDate: Fri Jun 23 14:30:23 2023 -0700

    [HUDI-6406] Pass in Spark Engine Context Wrapper for DeltaSync instead of spark engine context (#9008)

    Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local>
---
 .../client/common/HoodieFlinkEngineContext.java    | 10 +++++
 .../client/common/HoodieJavaEngineContext.java     | 10 +++++
 .../client/common/HoodieSparkEngineContext.java    | 35 ++++++++++++++++--
 .../apache/hudi/common/engine/EngineProperty.java  |  3 +-
 .../hudi/common/engine/HoodieEngineContext.java    |  4 ++
 .../common/engine/HoodieLocalEngineContext.java    | 10 +++++
 .../deltastreamer/BaseErrorTableWriter.java        |  4 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 43 +++++++++++++---------
 .../utilities/deltastreamer/ErrorTableUtils.java   |  8 ++--
 .../deltastreamer/HoodieDeltaStreamer.java         | 42 ++++++++++++---------
 .../utilities/sources/TestJsonKafkaSource.java     |  5 ++-
 11 files changed, 127 insertions(+), 47 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index db5c6ebd296..a62ca42d6b3 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -181,6 +181,16 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
     return Collections.emptyList();
   }
 
+  @Override
+  public void cancelJob(String jobId) {
+    // no operation for now
+  }
+
+  @Override
+  public void cancelAllJobs() {
+    // no operation for now
+  }
+
   /**
    * Override the flink context supplier to return constant write token.
   */
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index 6ab8e5ab029..5f6751b9961 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -161,4 +161,14 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
   public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
     return Collections.emptyList();
   }
+
+  @Override
+  public void cancelJob(String jobId) {
+    // no operation for now
+  }
+
+  @Override
+  public void cancelAllJobs() {
+    // no operation for now
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index c23a333f711..f3b87df040d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -37,6 +37,9 @@ import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.data.HoodieSparkLongAccumulator;
 import org.apache.hudi.exception.HoodieException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.sql.SQLContext;
@@ -79,6 +82,10 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
     return javaSparkContext;
   }
 
+  public JavaSparkContext jsc() {
+    return javaSparkContext;
+  }
+
   public SQLContext getSqlContext() {
     return sqlContext;
   }
@@ -165,9 +172,9 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
 
   @Override
   public void setProperty(EngineProperty key, String value) {
-    if (key == EngineProperty.COMPACTION_POOL_NAME) {
-      javaSparkContext.setLocalProperty("spark.scheduler.pool", value);
-    } else if (key == EngineProperty.CLUSTERING_POOL_NAME) {
+    if (key.equals(EngineProperty.COMPACTION_POOL_NAME)
+        || key.equals(EngineProperty.CLUSTERING_POOL_NAME)
+        || key.equals(EngineProperty.DELTASYNC_POOL_NAME)) {
       javaSparkContext.setLocalProperty("spark.scheduler.pool", value);
     } else {
       throw new HoodieException("Unknown engine property :" + key);
@@ -211,4 +218,26 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
       return removed == null ? Collections.emptyList() : removed;
     }
   }
+
+  @Override
+  public void cancelJob(String groupId) {
+    javaSparkContext.cancelJobGroup(groupId);
+  }
+
+  @Override
+  public void cancelAllJobs() {
+    javaSparkContext.cancelAllJobs();
+  }
+
+  public SparkConf getConf() {
+    return javaSparkContext.getConf();
+  }
+
+  public Configuration hadoopConfiguration() {
+    return javaSparkContext.hadoopConfiguration();
+  }
+
+  public <T> JavaRDD<T> emptyRDD() {
+    return javaSparkContext.emptyRDD();
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java
index 36e7594937b..08467a6d193 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java
@@ -31,5 +31,6 @@ public enum EngineProperty {
   // Amount of total memory available to each engine executor
   TOTAL_MEMORY_AVAILABLE,
   // Fraction of that memory, that is already in use by the engine
-  MEMORY_FRACTION_IN_USE
+  MEMORY_FRACTION_IN_USE,
+  DELTASYNC_POOL_NAME
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index c123c279644..79d62d55770 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -101,4 +101,8 @@ public abstract class HoodieEngineContext {
   public abstract List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey);
 
   public abstract List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey);
+
+  public abstract void cancelJob(String jobId);
+
+  public abstract void cancelAllJobs();
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 26190b790ca..5239490816d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -160,4 +160,14 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
   public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
     return Collections.emptyList();
   }
+
+  @Override
+  public void cancelJob(String jobId) {
+    // no operation for now
+  }
+
+  @Override
+  public void cancelAllJobs() {
+    // no operation for now
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
index fea6bdb3cf3..e0bba0600ba 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.util.Option;
@@ -25,7 +26,6 @@ import org.apache.hudi.common.util.VisibleForTesting;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 
 /**
@@ -44,7 +44,7 @@ public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
   public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
 
   public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
-                              TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+                              TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs) {
   }
 
   /**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 38582f0f8f4..19ca149154c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -209,9 +209,9 @@ public class DeltaSync implements Serializable, Closeable {
   private transient FileSystem fs;
 
   /**
-   * Spark context.
+   * Spark context Wrapper.
    */
-  private transient JavaSparkContext jssc;
+  private final transient HoodieSparkEngineContext hoodieSparkContext;
 
   /**
    * Spark Session.
@@ -278,12 +278,18 @@ public class DeltaSync implements Serializable, Closeable {
 
   private final boolean autoGenerateRecordKeys;
 
+  @Deprecated
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                    TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
+    this(cfg, sparkSession, schemaProvider, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient);
+  }
+
+  public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
+                   TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
+                   Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
     this.cfg = cfg;
-    this.jssc = jssc;
+    this.hoodieSparkContext = hoodieSparkContext;
     this.sparkSession = sparkSession;
     this.fs = fs;
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
@@ -306,11 +312,11 @@ public class DeltaSync implements Serializable, Closeable {
     }
     this.multiwriterIdentifier = StringUtils.isNullOrEmpty(id) ? Option.empty() : Option.of(id);
     if (props.getBoolean(ERROR_TABLE_ENABLED.key(),ERROR_TABLE_ENABLED.defaultValue())) {
-      this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, sparkSession, props, jssc, fs);
+      this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, sparkSession, props, hoodieSparkContext, fs);
       this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(props);
     }
     this.formatAdapter = new SourceFormatAdapter(
-        UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics),
+        UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics),
         this.errorTableWriter, Option.of(props));
 
     this.transformer = UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames),
@@ -399,7 +405,7 @@ public class DeltaSync implements Serializable, Closeable {
             Boolean.parseBoolean(HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())))
         .setUrlEncodePartitioning(props.getBoolean(URL_ENCODE_PARTITIONING.key(),
             Boolean.parseBoolean(URL_ENCODE_PARTITIONING.defaultValue())))
-        .initTable(new Configuration(jssc.hadoopConfiguration()),
+        .initTable(new Configuration(hoodieSparkContext.hadoopConfiguration()),
             cfg.targetBasePath);
   }
 
@@ -539,7 +545,7 @@ public class DeltaSync implements Serializable, Closeable {
           formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
 
       Option<Dataset<Row>> transformed =
-          dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
+          dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(hoodieSparkContext.jsc(), sparkSession, data, props));
 
       transformed = formatAdapter.processErrorEvents(transformed,
           ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
@@ -571,7 +577,7 @@ public class DeltaSync implements Serializable, Closeable {
         }
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
-        Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
+        Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, cfg.targetBasePath);
         // Deduce proper target (writer's) schema for the transformed dataset, reconciling its
         // schema w/ the table's one
         Option<Schema> targetSchemaOpt = transformed.map(df -> {
@@ -587,8 +593,8 @@ public class DeltaSync implements Serializable, Closeable {
         });
         // Override schema provider with the reconciled target schema
         schemaProvider = targetSchemaOpt.map(targetSchema ->
-            (SchemaProvider) new DelegatingSchemaProvider(props, jssc, dataAndCheckpoint.getSchemaProvider(),
-                new SimpleSchemaProvider(jssc, targetSchema, props)))
+            (SchemaProvider) new DelegatingSchemaProvider(props, hoodieSparkContext.jsc(), dataAndCheckpoint.getSchemaProvider(),
+                new SimpleSchemaProvider(hoodieSparkContext.jsc(), targetSchema, props)))
             .orElse(dataAndCheckpoint.getSchemaProvider());
         // Rewrite transformed records into the expected target schema
         avroRDDOptional = transformed.map(t -> getTransformedRDD(t, reconcileSchema, schemaProvider.getTargetSchema()));
@@ -610,10 +616,10 @@ public class DeltaSync implements Serializable, Closeable {
       return null;
     }
 
-    jssc.setJobGroup(this.getClass().getSimpleName(), "Checking if input is empty");
+    hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
     if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
       LOG.info("No new data, perform empty commit.");
-      return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD()));
+      return Pair.of(schemaProvider, Pair.of(checkpointStr, hoodieSparkContext.emptyRDD()));
     }
 
     boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
@@ -786,7 +792,7 @@ public class DeltaSync implements Serializable, Closeable {
     Option<String> scheduledCompactionInstant = Option.empty();
     // filter dupes if needed
     if (cfg.filterDupes) {
-      records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
+      records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(), records, writeClient.getConfig());
     }
 
     boolean isEmpty = records.isEmpty();
@@ -953,7 +959,7 @@ public class DeltaSync implements Serializable, Closeable {
      LOG.info("When set --enable-hive-sync will use HiveSyncTool for backward compatibility");
    }
    if (cfg.enableMetaSync) {
-      FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration());
+      FileSystem fs = FSUtils.getFs(cfg.targetBasePath, hoodieSparkContext.hadoopConfiguration());
 
       TypedProperties metaProps = new TypedProperties();
       metaProps.putAll(props);
@@ -1002,12 +1008,12 @@ public class DeltaSync implements Serializable, Closeable {
     registerAvroSchemas(sourceSchema, targetSchema);
     final HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
     final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
-        .getWriteConfigWithRecordSizeEstimate(jssc, records, initialWriteConfig)
+        .getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), records, initialWriteConfig)
        .orElse(initialWriteConfig);
 
     if (writeConfig.isEmbeddedTimelineServerEnabled()) {
       if (!embeddedTimelineService.isPresent()) {
-        embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(jssc), writeConfig);
+        embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(hoodieSparkContext, writeConfig);
       } else {
         EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(embeddedTimelineService.get(), writeConfig);
       }
@@ -1017,7 +1023,7 @@ public class DeltaSync implements Serializable, Closeable {
       // Close Write client.
       writeClient.close();
     }
-    writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), writeConfig, embeddedTimelineService);
+    writeClient = new SparkRDDWriteClient<>(hoodieSparkContext, writeConfig, embeddedTimelineService);
     onInitializingHoodieWriteClient.apply(writeClient);
   }
 
@@ -1155,7 +1161,8 @@ public class DeltaSync implements Serializable, Closeable {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Registering Schema: " + schemas);
       }
-      jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
+      // Use the underlying spark context in case the java context is changed during runtime
+      hoodieSparkContext.getJavaSparkContext().sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
     }
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
index 76e7b030b6f..95400021ce7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -29,7 +30,6 @@ import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hudi.exception.HoodieValidationException;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -45,19 +45,19 @@ import static org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR
 public final class ErrorTableUtils {
 
   public static Option<BaseErrorTableWriter> getErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
-                                                                 TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+                                                                 TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs) {
     String errorTableWriterClass = props.getString(ERROR_TABLE_WRITE_CLASS.key());
     ValidationUtils.checkState(!StringUtils.isNullOrEmpty(errorTableWriterClass),
         "Missing error table config " + ERROR_TABLE_WRITE_CLASS);
     Class<?>[] argClassArr = new Class[]{HoodieDeltaStreamer.Config.class,
-        SparkSession.class, TypedProperties.class, JavaSparkContext.class, FileSystem.class};
+        SparkSession.class, TypedProperties.class, HoodieSparkEngineContext.class, FileSystem.class};
     String errMsg = "Unable to instantiate ErrorTableWriter with arguments type " + Arrays.toString(argClassArr);
     ValidationUtils.checkArgument(ReflectionUtils.hasConstructor(BaseErrorTableWriter.class.getName(),
         argClassArr, false), errMsg);
 
     try {
       return Option.of((BaseErrorTableWriter) ReflectionUtils.getClass(errorTableWriterClass).getConstructor(argClassArr)
-          .newInstance(cfg, sparkSession, props, jssc, fs));
+          .newInstance(cfg, sparkSession, props, hoodieSparkContext, fs));
     } catch (NoSuchMethodException | InvocationTargetException
         | InstantiationException | IllegalAccessException e) {
       throw new HoodieException(errMsg, e);
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 0ca939f08dd..f0631e3ef76 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -162,8 +163,10 @@ public class HoodieDeltaStreamer implements Serializable {
     this.cfg = cfg;
     this.bootstrapExecutor = Option.ofNullable(
         cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null);
+
+    HoodieSparkEngineContext sparkEngineContext = new HoodieSparkEngineContext(jssc);
     this.ingestionService = Option.ofNullable(
-        cfg.runBootstrap ? null : new DeltaSyncService(cfg, jssc, fs, conf, Option.ofNullable(this.properties)));
+        cfg.runBootstrap ? null : new DeltaSyncService(cfg, sparkEngineContext, fs, conf, Option.ofNullable(this.properties)));
   }
 
   private static TypedProperties combineProperties(Config cfg, Option<TypedProperties> propsOverride, Configuration hadoopConf) {
@@ -616,9 +619,9 @@ public class HoodieDeltaStreamer implements Serializable {
 
     private transient SparkSession sparkSession;
 
     /**
-     * Spark context.
+     * Spark context Wrapper.
      */
-    private transient JavaSparkContext jssc;
+    private final transient HoodieSparkEngineContext hoodieSparkContext;
 
     private transient FileSystem fs;
 
@@ -653,16 +656,16 @@ public class HoodieDeltaStreamer implements Serializable {
 
     private final Option<ConfigurationHotUpdateStrategy> configurationHotUpdateStrategyOpt;
 
-    public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
+    public DeltaSyncService(Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
                             Option<TypedProperties> properties) throws IOException {
       super(HoodieIngestionConfig.newBuilder()
           .isContinuous(cfg.continuousMode)
          .withMinSyncInternalSeconds(cfg.minSyncIntervalSeconds).build());
       this.cfg = cfg;
-      this.jssc = jssc;
+      this.hoodieSparkContext = hoodieSparkContext;
      this.fs = fs;
       this.hiveConf = conf;
-      this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
+      this.sparkSession = SparkSession.builder().config(hoodieSparkContext.getConf()).getOrCreate();
       this.asyncCompactService = Option.empty();
       this.asyncClusteringService = Option.empty();
       this.postWriteTerminationStrategy = StringUtils.isNullOrEmpty(cfg.postWriteTerminationStrategyClass) ? Option.empty() :
@@ -706,15 +709,15 @@ public class HoodieDeltaStreamer implements Serializable {
       LOG.info(toSortedTruncatedString(props));
 
       this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
-          UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc, cfg.transformerClassNames);
+          UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, hoodieSparkContext.jsc()),
+          props, hoodieSparkContext.jsc(), cfg.transformerClassNames);
 
-      deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, conf,
-          this::onInitializingWriteClient);
+      deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, conf, this::onInitializingWriteClient);
     }
 
-    public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf)
+    public DeltaSyncService(HoodieDeltaStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf)
         throws IOException {
-      this(cfg, jssc, fs, conf, Option.empty());
+      this(cfg, hoodieSparkContext, fs, conf, Option.empty());
     }
 
     private void initializeTableTypeAndBaseFileFormat() {
@@ -728,7 +731,7 @@ public class HoodieDeltaStreamer implements Serializable {
       if (deltaSync != null) {
         deltaSync.close();
       }
-      deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, hiveConf, this::onInitializingWriteClient);
+      deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient);
     }
 
     @Override
@@ -739,7 +742,7 @@ public class HoodieDeltaStreamer implements Serializable {
       if (cfg.isAsyncCompactionEnabled()) {
         // set Scheduler Pool.
         LOG.info("Setting Spark Pool name for delta-sync to " + DELTASYNC_POOL_NAME);
-        jssc.setLocalProperty("spark.scheduler.pool", DELTASYNC_POOL_NAME);
+        hoodieSparkContext.setProperty(EngineProperty.DELTASYNC_POOL_NAME, DELTASYNC_POOL_NAME);
       }
 
       HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props);
@@ -867,10 +870,10 @@ public class HoodieDeltaStreamer implements Serializable {
           // Update the write client used by Async Compactor.
          asyncCompactService.get().updateWriteClient(writeClient);
         } else {
-          asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient));
+          asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(hoodieSparkContext, writeClient));
           // Enqueue existing pending compactions first
           HoodieTableMetaClient meta =
-              HoodieTableMetaClient.builder().setConf(new Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
+              HoodieTableMetaClient.builder().setConf(new Configuration(hoodieSparkContext.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
           List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
           pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
           asyncCompactService.get().start(error -> true);
@@ -889,9 +892,9 @@ public class HoodieDeltaStreamer implements Serializable {
         if (asyncClusteringService.isPresent()) {
           asyncClusteringService.get().updateWriteClient(writeClient);
         } else {
-          asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(new HoodieSparkEngineContext(jssc), writeClient));
+          asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(hoodieSparkContext, writeClient));
           HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
-              .setConf(new Configuration(jssc.hadoopConfiguration()))
+              .setConf(new Configuration(hoodieSparkContext.hadoopConfiguration()))
               .setBasePath(cfg.targetBasePath)
               .setLoadActiveTimelineOnLoad(true).build();
           List<HoodieInstant> pending = ClusteringUtils.getPendingClusteringInstantTimes(meta);
@@ -942,6 +945,11 @@ public class HoodieDeltaStreamer implements Serializable {
       return props;
     }
 
+    @VisibleForTesting
+    public HoodieSparkEngineContext getHoodieSparkContext() {
+      return hoodieSparkContext;
+    }
+
     @VisibleForTesting
     public DeltaSync getDeltaSync() {
       return deltaSync;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 072e20c4c47..ad71eb51526 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.sources;
 
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
@@ -286,7 +287,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
 
   private BaseErrorTableWriter getAnonymousErrorTableWriter(TypedProperties props) {
     return new BaseErrorTableWriter<ErrorEvent<String>>(new HoodieDeltaStreamer.Config(),
-        spark(), props, jsc(), fs()) {
+        spark(), props, new HoodieSparkEngineContext(jsc()), fs()) {
       List<JavaRDD<HoodieAvroRecord>> errorEvents = new LinkedList();
 
       @Override
@@ -305,7 +306,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
       }
     };
   }
-
+
   @Test
   public void testAppendKafkaOffset() {
     final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
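
As a quick illustration of the API surface this commit introduces (the snippet below is not part of the commit — a minimal sketch assuming a local Spark setup, the Hudi Spark client on the classpath, and a hypothetical demo class name):

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class EngineContextWrapperDemo {
  public static void main(String[] args) {
    // Local Spark setup, assumed for illustration only; any JavaSparkContext works.
    SparkConf conf = new SparkConf().setAppName("engine-context-demo").setMaster("local[1]");
    JavaSparkContext jssc = new JavaSparkContext(conf);

    // Wrap the JavaSparkContext once. Per this commit, DeltaSync and
    // DeltaSyncService accept this wrapper; the JavaSparkContext-based
    // DeltaSync constructor remains but is @Deprecated and delegates to it.
    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jssc);

    // Pass-through accessors added by this commit keep existing call sites working.
    System.out.println(engineContext.getConf().get("spark.app.name"));          // engine-context-demo
    System.out.println(engineContext.hadoopConfiguration().get("fs.defaultFS"));
    System.out.println(engineContext.emptyRDD().isEmpty());                     // true

    // New engine-level cancellation hooks; on Spark they route to
    // cancelJobGroup()/cancelAllJobs(), and are no-ops on the Flink,
    // Java, and local engine contexts for now.
    engineContext.cancelAllJobs();
    jssc.stop();
  }
}

The same abstraction is what lets DeltaSyncService set the delta-sync scheduler pool via setProperty(EngineProperty.DELTASYNC_POOL_NAME, ...) instead of writing spark.scheduler.pool on a raw Spark context.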