This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 63bf8dcc8cf Centralize batch ingestion job spec constants into SegmentGenerationJobUtils (#18413)
63bf8dcc8cf is described below

commit 63bf8dcc8cfe71cc318673e398b98d938c400833
Author: Akanksha kedia <[email protected]>
AuthorDate: Tue Jun 23 13:07:59 2026 +0530

    Centralize batch ingestion job spec constants into SegmentGenerationJobUtils (#18413)
    
    Move duplicated string constants (SEGMENT_GENERATION_JOB_SPEC,
    DEPENDENCY_JAR_DIR, STAGING_DIR) from HadoopSegmentGenerationJobRunner
    and SparkSegmentGenerationJobRunner into the shared
    SegmentGenerationJobUtils class in pinot-batch-ingestion-common.
    
    This eliminates silent drift between the Hadoop and Spark runners,
    where the same config keys were independently declared as private
    literals. The Hadoop runner's public SEGMENT_GENERATION_JOB_SPEC
    field is retained as a delegating alias for backward compatibility.
    
    Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
 .../ingestion/batch/common/SegmentGenerationJobUtils.java |  7 +++++++
 .../batch/hadoop/HadoopSegmentGenerationJobRunner.java    | 15 +++++++--------
 .../batch/spark3/SparkSegmentGenerationJobRunner.java     | 10 +++++-----
 3 files changed, 19 insertions(+), 13 deletions(-)

diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
index 816bef6232e..1129bcce128 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
@@ -47,6 +47,13 @@ public class SegmentGenerationJobUtils implements 
Serializable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentGenerationJobUtils.class);
 
+  // Key used to pass the serialized SegmentGenerationJobSpec through a 
distributed job framework
+  public static final String SEGMENT_GENERATION_JOB_SPEC = 
"segmentGenerationJobSpec";
+
+  // Field names in the executionFrameworkSpec/extraConfigs section shared 
across ingestion frameworks
+  public static final String DEPENDENCY_JAR_DIR = "dependencyJarDir";
+  public static final String STAGING_DIR = "stagingDir";
+
   /**
    * Always use local directory sequence id unless explicitly config: 
"use.global.directory.sequence.id".
    *
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
index 918ecb99da6..de173ff93d9 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -67,13 +67,10 @@ import static 
org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY
 public class HadoopSegmentGenerationJobRunner extends Configured implements 
IngestionJobRunner, Serializable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopSegmentGenerationJobRunner.class);
 
-  public static final String SEGMENT_GENERATION_JOB_SPEC = 
"segmentGenerationJobSpec";
+  // Kept for backward compatibility; callers should prefer 
SegmentGenerationJobUtils.SEGMENT_GENERATION_JOB_SPEC
+  public static final String SEGMENT_GENERATION_JOB_SPEC = 
SegmentGenerationJobUtils.SEGMENT_GENERATION_JOB_SPEC;
 
-  // Field names in job spec's executionFrameworkSpec/extraConfigs section
-  private static final String DEPS_JAR_DIR_FIELD = "dependencyJarDir";
-  private static final String STAGING_DIR_FIELD = "stagingDir";
-
-  // Sub-dirs under directory specified by STAGING_DIR_FIELD
+  // Sub-dirs under the staging directory
   private static final String SEGMENT_TAR_SUBDIR_NAME = "segmentTar";
   private static final String DEPS_JAR_SUBDIR_NAME = "dependencyJars";
 
@@ -156,7 +153,8 @@ public class HadoopSegmentGenerationJobRunner extends 
Configured implements Inge
     outputDirFS.mkdir(outputDirURI);
 
     //Get staging directory for temporary output pinot segments
-    String stagingDir = 
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR_FIELD);
+    String stagingDir =
+        
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.STAGING_DIR);
     Preconditions.checkNotNull(stagingDir, "Please set config: stagingDir 
under 'executionFrameworkSpec.extraConfigs'");
     URI stagingDirURI = URI.create(stagingDir);
     if (stagingDirURI.getScheme() == null) {
@@ -247,7 +245,8 @@ public class HadoopSegmentGenerationJobRunner extends 
Configured implements Inge
       packPluginsToDistributedCache(job, outputDirFS, stagingDirURI);
 
       // Add dependency jars, if we're provided with a directory containing 
these.
-      String dependencyJarsSrcDir = 
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR_FIELD);
+      String dependencyJarsSrcDir =
+          
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.DEPENDENCY_JAR_DIR);
       if (dependencyJarsSrcDir != null) {
         Path dependencyJarsDestPath = new Path(stagingDirURI.toString(), 
DEPS_JAR_SUBDIR_NAME);
         addJarsToDistributedCache(job, new File(dependencyJarsSrcDir), 
outputDirFS, dependencyJarsDestPath.toUri(),
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
index d32fb861fea..fccf20175ca 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
@@ -67,8 +67,6 @@ import static 
org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY
 public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, 
Serializable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SparkSegmentGenerationJobRunner.class);
-  private static final String DEPS_JAR_DIR = "dependencyJarDir";
-  private static final String STAGING_DIR = "stagingDir";
 
   private SegmentGenerationJobSpec _spec;
 
@@ -155,7 +153,8 @@ public class SparkSegmentGenerationJobRunner implements 
IngestionJobRunner, Seri
     outputDirFS.mkdir(outputDirURI);
 
     //Get staging directory for temporary output pinot segments
-    String stagingDir = 
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR);
+    String stagingDir =
+        
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.STAGING_DIR);
     URI stagingDirURI = null;
     if (stagingDir != null) {
       stagingDirURI = URI.create(stagingDir);
@@ -178,9 +177,10 @@ public class SparkSegmentGenerationJobRunner implements 
IngestionJobRunner, Seri
       packPluginsToDistributedCache(sparkContext);
 
       // Add dependency jars
-      if 
(_spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) 
{
+      if (_spec.getExecutionFrameworkSpec().getExtraConfigs()
+          .containsKey(SegmentGenerationJobUtils.DEPENDENCY_JAR_DIR)) {
         addDepsJarToDistributedCache(sparkContext,
-            
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR));
+            
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.DEPENDENCY_JAR_DIR));
       }
 
       List<String> pathAndIdxList = new ArrayList<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to