This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 657e7f075f7 Support metadata push mode in 
BaseSingleSegmentConversionExecutor (#17632)
657e7f075f7 is described below

commit 657e7f075f7a5d41c45565f4bc5247ca3b4f7293
Author: Shounak kulkarni <[email protected]>
AuthorDate: Mon Feb 16 11:15:18 2026 +0530

    Support metadata push mode in BaseSingleSegmentConversionExecutor (#17632)
    
    * Support metadata push mode in BaseSingleSegmentConversionExecutor
    
    * remove URI support
    
    * bypass local fs specific logic for metadata push mode testing
    
    * inject relevant push configs for tasks extending ssce
    
    * extend existing tests to validate metadata push mode
    
    * cleanup
    
    Note: There are unrelated test failures. We should address the flaky tests 
separately.
    
KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.setUp:292->BaseRealtimeClusterIntegrationTest.setUp:67->startKafka:88->startSchemaRegistry:99
 » IllegalState Could not find a valid Docker environment. Please see logs and 
check configuration
---
 ...geMetadataPushMinionClusterIntegrationTest.java |  76 ++++++++++++
 .../tests/PurgeMinionClusterIntegrationTest.java   |   2 +-
 ...ntMetadataPushMinionClusterIntegrationTest.java |  71 ++++++++++++
 ...RefreshSegmentMinionClusterIntegrationTest.java |   2 +-
 .../BaseMultipleSegmentsConversionExecutor.java    | 115 ++----------------
 .../tasks/BaseSingleSegmentConversionExecutor.java |  73 +++++++++---
 .../plugin/minion/tasks/BaseTaskExecutor.java      | 129 +++++++++++++++++++++
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java |  23 +++-
 .../minion/tasks/purge/PurgeTaskGenerator.java     |   2 +
 .../RefreshSegmentTaskGenerator.java               |   1 +
 .../SegmentGenerationAndPushTaskExecutor.java      |  15 +--
 .../UpsertCompactionTaskGenerator.java             |   1 +
 .../plugin/minion/tasks/MinionTaskUtilsTest.java   |  15 +++
 13 files changed, 385 insertions(+), 140 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMetadataPushMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMetadataPushMinionClusterIntegrationTest.java
new file mode 100644
index 00000000000..c35cf6926d4
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMetadataPushMinionClusterIntegrationTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test that runs the Purge minion task with METADATA {@link
+ * org.apache.pinot.spi.ingestion.batch.BatchConfigProperties.SegmentPushType} 
to verify the full flow.
+ * Only {@link #testFirstRunPurge()} is enabled; other tests from the base 
class are disabled.
+ */
+public class PurgeMetadataPushMinionClusterIntegrationTest extends 
PurgeMinionClusterIntegrationTest {
+
+  @Override
+  protected TableTaskConfig getPurgeTaskConfig() {
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    
tableTaskConfigs.put(MinionConstants.PurgeTask.LAST_PURGE_TIME_THREESOLD_PERIOD,
 "1d");
+    tableTaskConfigs.put(BatchConfigProperties.PUSH_MODE, 
BatchConfigProperties.SegmentPushType.METADATA.name());
+    tableTaskConfigs.put(MinionTaskUtils.ALLOW_METADATA_PUSH_WITH_LOCAL_FS, 
"true");
+    return new 
TableTaskConfig(Collections.singletonMap(MinionConstants.PurgeTask.TASK_TYPE, 
tableTaskConfigs));
+  }
+
+  @Override
+  @Test(enabled = false)
+  public void testPassedDelayTimePurge() {
+    // Disabled: only testFirstRunPurge runs for METADATA push flow.
+  }
+
+  @Override
+  @Test(enabled = false)
+  public void testNotPassedDelayTimePurge() {
+    // Disabled: only testFirstRunPurge runs for METADATA push flow.
+  }
+
+  @Override
+  @Test(enabled = false)
+  public void testPurgeOnOldSegmentsWithIndicesOnNewColumns() {
+    // Disabled: only testFirstRunPurge runs for METADATA push flow.
+  }
+
+  @Override
+  @Test(enabled = false)
+  public void testSegmentDeletionWhenAllRecordsPurged() {
+    // Disabled: only testFirstRunPurge runs for METADATA push flow.
+  }
+
+  @Override
+  @Test(enabled = false)
+  public void testRealtimeLastSegmentPreservation() {
+    // Disabled: only testFirstRunPurge runs for METADATA push flow.
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index 9aab0fc5771..10d69b04ba2 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -190,7 +190,7 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
     _tableName = tableName;
   }
 
-  private TableTaskConfig getPurgeTaskConfig() {
+  protected TableTaskConfig getPurgeTaskConfig() {
     Map<String, String> tableTaskConfigs = new HashMap<>();
     
tableTaskConfigs.put(MinionConstants.PurgeTask.LAST_PURGE_TIME_THREESOLD_PERIOD,
 "1d");
     return new 
TableTaskConfig(Collections.singletonMap(MinionConstants.PurgeTask.TASK_TYPE, 
tableTaskConfigs));
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMetadataPushMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMetadataPushMinionClusterIntegrationTest.java
new file mode 100644
index 00000000000..a434a32194f
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMetadataPushMinionClusterIntegrationTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test that runs the Refresh Segment minion task with METADATA 
{@link
+ * org.apache.pinot.spi.ingestion.batch.BatchConfigProperties.SegmentPushType} 
to verify the full flow.
+ * Only {@link #testFirstSegmentRefresh()} is enabled; other tests from the 
base class are disabled.
+ */
+public class RefreshSegmentMetadataPushMinionClusterIntegrationTest
+    extends RefreshSegmentMinionClusterIntegrationTest {
+
+  @Override
+  protected TableTaskConfig getRefreshSegmentTaskConfig() {
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    tableTaskConfigs.put(BatchConfigProperties.PUSH_MODE, 
BatchConfigProperties.SegmentPushType.METADATA.name());
+    tableTaskConfigs.put(MinionTaskUtils.ALLOW_METADATA_PUSH_WITH_LOCAL_FS, 
"true");
+    return new TableTaskConfig(
+        Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE, 
tableTaskConfigs));
+  }
+
+  @Override
+  @Test(priority = 2, enabled = false)
+  public void testValidDatatypeChange() {
+    // Disabled: only testFirstSegmentRefresh runs for METADATA push flow.
+  }
+
+  @Override
+  @Test(priority = 3, enabled = false)
+  public void testIndexChanges() {
+    // Disabled: only testFirstSegmentRefresh runs for METADATA push flow.
+  }
+
+  @Override
+  @Test(priority = 4, enabled = false)
+  public void checkColumnAddition() {
+    // Disabled: only testFirstSegmentRefresh runs for METADATA push flow.
+  }
+
+  @Override
+  @Test(priority = 5, enabled = false)
+  public void checkRefreshNotNecessary() {
+    // Disabled: only testFirstSegmentRefresh runs for METADATA push flow.
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
index c0921838682..335a176c02f 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
@@ -437,7 +437,7 @@ public class RefreshSegmentMinionClusterIntegrationTest 
extends BaseClusterInteg
     }, 60_000L, "Failed to meet condition");
   }
 
-  private TableTaskConfig getRefreshSegmentTaskConfig() {
+  protected TableTaskConfig getRefreshSegmentTaskConfig() {
     Map<String, String> tableTaskConfigs = new HashMap<>();
     return new TableTaskConfig(
         Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE, 
tableTaskConfigs));
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 22351040b04..2ee0d1656c0 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -41,10 +41,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.NameValuePair;
-import org.apache.hc.core5.http.message.BasicHeader;
 import org.apache.hc.core5.http.message.BasicNameValuePair;
-import org.apache.pinot.common.auth.AuthProviderUtils;
-import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.common.metrics.MinionMeter;
 import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
@@ -62,10 +59,8 @@ import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
-import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
-import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,10 +80,6 @@ public abstract class BaseMultipleSegmentsConversionExecutor 
extends BaseTaskExe
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
   private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID = 
"lineageEntryId";
 
-  private static final int DEFAULT_PUSH_ATTEMPTS = 5;
-  private static final int DEFAULT_PUSH_PARALLELISM = 1;
-  private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L;
-
   protected MinionConf _minionConf;
 
   // Tracking finer grained progress status.
@@ -442,59 +433,12 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     }
   }
 
-  @VisibleForTesting
-  PushJobSpec getPushJobSpec(Map<String, String> taskConfigs) {
-    PushJobSpec pushJobSpec = new PushJobSpec();
-    pushJobSpec.setPushAttempts(DEFAULT_PUSH_ATTEMPTS);
-    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
-    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
-    
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
-    
pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
-    boolean batchSegmentUpload = Boolean.parseBoolean(taskConfigs.getOrDefault(
-        BatchConfigProperties.BATCH_SEGMENT_UPLOAD, "false"));
-    if (batchSegmentUpload) {
-      pushJobSpec.setBatchSegmentUpload(true);
-    }
-    return pushJobSpec;
-  }
-
   @VisibleForTesting
   List<Header> getSegmentPushCommonHeaders(PinotTaskConfig pinotTaskConfig, 
AuthProvider authProvider,
       List<SegmentConversionResult> segmentConversionResults) {
-    SegmentConversionResult segmentConversionResult;
-    if (segmentConversionResults.size() == 1) {
-      segmentConversionResult = segmentConversionResults.get(0);
-    } else {
-      // Setting to null as the base method expects a single object. This is 
ok for now, since the
-      // segmentConversionResult is not made use of while generating the 
customMap.
-      segmentConversionResult = null;
-    }
-    SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
-        getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, 
segmentConversionResult);
-    Header segmentZKMetadataCustomMapModifierHeader =
-        new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
-            segmentZKMetadataCustomMapModifier.toJsonString());
-
-    List<Header> headers = new ArrayList<>();
-    headers.add(segmentZKMetadataCustomMapModifierHeader);
-    headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
-    return headers;
-  }
-
-  @VisibleForTesting
-  List<NameValuePair> getSegmentPushCommonParams(String tableNameWithType) {
-    List<NameValuePair> params = new ArrayList<>();
-    params.add(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION,
-        "true"));
-    params.add(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
-        TableNameBuilder.extractRawTableName(tableNameWithType)));
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-    if (tableType != null) {
-      params.add(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, 
tableType.toString()));
-    } else {
-      throw new RuntimeException("Failed to determine the tableType from name: 
" + tableNameWithType);
-    }
-    return params;
+    SegmentConversionResult segmentConversionResult =
+        segmentConversionResults.size() == 1 ? segmentConversionResults.get(0) 
: null;
+    return getSegmentPushMetadataHeaders(pinotTaskConfig, authProvider, 
segmentConversionResult);
   }
 
   private void pushSegments(String tableNameWithType, Map<String, String> 
taskConfigs, PinotTaskConfig pinotTaskConfig,
@@ -516,20 +460,13 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
   private void pushSegment(String tableName, Map<String, String> taskConfigs, 
URI outputSegmentTarURI,
       List<Header> headers, List<NameValuePair> parameters, 
SegmentConversionResult segmentConversionResult)
       throws Exception {
-    String pushMode =
-        taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, 
BatchConfigProperties.SegmentPushType.TAR.name());
-    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", 
pushMode, outputSegmentTarURI);
-
-    PushJobSpec pushJobSpec = new PushJobSpec();
-    pushJobSpec.setPushAttempts(DEFAULT_PUSH_ATTEMPTS);
-    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
-    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
-    
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
-    
pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+    BatchConfigProperties.SegmentPushType pushType = 
getSegmentPushType(taskConfigs);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", 
pushType, outputSegmentTarURI);
 
+    PushJobSpec pushJobSpec = getPushJobSpec(taskConfigs);
     SegmentGenerationJobSpec spec = 
generateSegmentGenerationJobSpec(tableName, taskConfigs, pushJobSpec);
 
-    switch 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+    switch (pushType) {
       case TAR:
         File tarFile = new File(outputSegmentTarURI);
         String segmentName = segmentConversionResult.getSegmentName();
@@ -552,43 +489,7 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
         }
         break;
       default:
-        throw new UnsupportedOperationException("Unrecognized push mode - " + 
pushMode);
-    }
-  }
-
-  private SegmentGenerationJobSpec generateSegmentGenerationJobSpec(String 
tableName, Map<String, String> taskConfigs,
-      PushJobSpec pushJobSpec) {
-
-    TableSpec tableSpec = new TableSpec();
-    tableSpec.setTableName(tableName);
-
-    PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
-    
pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
-    PinotClusterSpec[] pinotClusterSpecs = new 
PinotClusterSpec[]{pinotClusterSpec};
-
-    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
-    spec.setPushJobSpec(pushJobSpec);
-    spec.setTableSpec(tableSpec);
-    spec.setPinotClusterSpecs(pinotClusterSpecs);
-    spec.setAuthToken(MinionTaskUtils.resolveAuthToken(taskConfigs));
-
-    return spec;
-  }
-
-  private URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, File 
localSegmentTarFile)
-      throws Exception {
-    URI outputSegmentDirURI = 
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
-    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, 
outputSegmentDirURI)) {
-      URI outputSegmentTarURI =
-          
URI.create(MinionTaskUtils.normalizeDirectoryURI(outputSegmentDirURI) + 
localSegmentTarFile.getName());
-      if 
(!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) 
&& outputFileFS.exists(
-          outputSegmentTarURI)) {
-        throw new RuntimeException("Output file: " + outputSegmentTarURI + " 
already exists. Set 'overwriteOutput' to "
-            + "true to ignore this error");
-      } else {
-        outputFileFS.copyFromLocalFile(localSegmentTarFile, 
outputSegmentTarURI);
-      }
-      return outputSegmentTarURI;
+        throw new UnsupportedOperationException("Unrecognized push mode - " + 
pushType);
     }
   }
 
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index 965a00b3bb3..d3a0494dce7 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -21,8 +21,8 @@ package org.apache.pinot.plugin.minion.tasks;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.net.URI;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -31,7 +31,6 @@ import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpHeaders;
 import org.apache.hc.core5.http.NameValuePair;
 import org.apache.hc.core5.http.message.BasicHeader;
-import org.apache.hc.core5.http.message.BasicNameValuePair;
 import org.apache.pinot.common.auth.AuthProviderUtils;
 import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.common.metrics.MinionMeter;
@@ -43,8 +42,13 @@ import org.apache.pinot.minion.event.MinionEventObserver;
 import org.apache.pinot.minion.event.MinionEventObservers;
 import org.apache.pinot.minion.exception.TaskCancelledException;
 import org.apache.pinot.plugin.minion.tasks.purge.PurgeTaskExecutor;
+import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -184,22 +188,27 @@ public abstract class BaseSingleSegmentConversionExecutor 
extends BaseTaskExecut
       httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
       httpHeaders.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
 
-      // Set parameters for upload request.
-      NameValuePair enableParallelPushProtectionParameter =
-          new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION,
 "true");
-      NameValuePair tableNameParameter = new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
-          TableNameBuilder.extractRawTableName(tableNameWithType));
-      NameValuePair tableTypeParameter = new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
-          
TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString());
-      List<NameValuePair> parameters =
-          Arrays.asList(enableParallelPushProtectionParameter, 
tableNameParameter, tableTypeParameter);
-
-      // Upload the tarred segment
-      _eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " + 
segmentName);
+      // Set parameters for upload request (shared with metadata push).
+      List<NameValuePair> parameters = 
getSegmentPushCommonParams(tableNameWithType);
+
+      // Upload the tarred segment using the configured push mode (TAR or 
METADATA)
+      BatchConfigProperties.SegmentPushType pushType = 
getSegmentPushType(configs);
+      _eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " + 
segmentName + " (push mode: " + pushType
+          + ")");
       boolean uploadSuccessful = true;
       try {
-        SegmentConversionUtils.uploadSegment(configs, httpHeaders, parameters, 
tableNameWithType, segmentName,
-            uploadURL, convertedTarredSegmentFile);
+        switch (pushType) {
+          case TAR:
+            SegmentConversionUtils.uploadSegment(configs, httpHeaders, 
parameters, tableNameWithType, segmentName,
+                uploadURL, convertedTarredSegmentFile);
+            break;
+          case METADATA:
+            uploadSegmentWithMetadata(configs, pinotTaskConfig, 
segmentConversionResult, authProvider, parameters,
+                tableNameWithType, convertedTarredSegmentFile);
+            break;
+          default:
+            throw new UnsupportedOperationException("Unrecognized push mode: " 
+ pushType);
+        }
       } catch (Exception e) {
         uploadSuccessful = false;
         _minionMetrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.SEGMENT_UPLOAD_FAIL_COUNT, 1L);
@@ -220,6 +229,38 @@ public abstract class BaseSingleSegmentConversionExecutor 
extends BaseTaskExecut
     }
   }
 
+  /**
+   * Pushes the segment in METADATA mode: copies the tarred segment 
to the output PinotFS and sends segment
+   * URI and metadata to the controller. Requires {@link 
BatchConfigProperties#OUTPUT_SEGMENT_DIR_URI} and
+   * {@link BatchConfigProperties#PUSH_CONTROLLER_URI} in configs.
+   */
+  private void uploadSegmentWithMetadata(Map<String, String> configs, 
PinotTaskConfig pinotTaskConfig,
+      SegmentConversionResult segmentConversionResult, AuthProvider 
authProvider, List<NameValuePair> parameters,
+      String tableNameWithType, File convertedTarredSegmentFile)
+      throws Exception {
+    if (!configs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+      throw new RuntimeException("Output dir URI missing for metadata push. 
Set "
+          + BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI + " in task config.");
+    }
+    URI outputSegmentDirURI = 
URI.create(configs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    URI outputSegmentTarURI = moveSegmentToOutputPinotFS(configs, 
convertedTarredSegmentFile);
+    LOGGER.info("Moved generated segment from [{}] to location: [{}]", 
convertedTarredSegmentFile, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = getPushJobSpec(configs);
+    SegmentGenerationJobSpec spec = generateSegmentGenerationJobSpec(
+        TableNameBuilder.extractRawTableName(tableNameWithType), configs, 
pushJobSpec);
+
+    List<Header> metadataHeaders = 
getSegmentPushMetadataHeaders(pinotTaskConfig, authProvider,
+        segmentConversionResult);
+
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(configs, 
outputSegmentDirURI)) {
+      Map<String, String> segmentUriToTarPathMap = 
SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI,
+          pushJobSpec, new String[]{outputSegmentTarURI.toString()});
+      SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, 
segmentUriToTarPathMap, metadataHeaders,
+          parameters);
+    }
+  }
+
   // For tests only.
   @VisibleForTesting
   public void setMinionEventObserver(MinionEventObserver observer) {
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
index 061c43ea4cc..8dbc8ed3b8a 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
@@ -21,16 +21,24 @@ package org.apache.pinot.plugin.minion.tasks;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.pinot.common.auth.AuthProviderUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.common.metrics.MinionMeter;
 import org.apache.pinot.common.metrics.MinionMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
@@ -38,7 +46,15 @@ import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +62,9 @@ import org.slf4j.LoggerFactory;
 public abstract class BaseTaskExecutor implements PinotTaskExecutor {
   protected static final Logger LOGGER = 
LoggerFactory.getLogger(BaseTaskExecutor.class);
   protected static final MinionContext MINION_CONTEXT = 
MinionContext.getInstance();
+  protected static final int SEGMENT_PUSH_DEFAULT_ATTEMPTS = 5;
+  protected static final int SEGMENT_PUSH_DEFAULT_PARALLELISM = 1;
+  protected static final long SEGMENT_PUSH_DEFAULT_RETRY_INTERVAL_MILLIS = 
1000L;
 
   protected boolean _cancelled = false;
   protected final MinionMetrics _minionMetrics = MinionMetrics.get();
@@ -161,4 +180,114 @@ public abstract class BaseTaskExecutor implements 
PinotTaskExecutor {
     }
     return indexDir;
   }
+
+  /**
+   * Builds a {@link PushJobSpec} from task configs. Used for both TAR and 
METADATA push modes.
+   */
+  protected PushJobSpec getPushJobSpec(Map<String, String> configs) {
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(SEGMENT_PUSH_DEFAULT_ATTEMPTS);
+    pushJobSpec.setPushParallelism(SEGMENT_PUSH_DEFAULT_PARALLELISM);
+    
pushJobSpec.setPushRetryIntervalMillis(SEGMENT_PUSH_DEFAULT_RETRY_INTERVAL_MILLIS);
+    
pushJobSpec.setSegmentUriPrefix(configs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    
pushJobSpec.setSegmentUriSuffix(configs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+    boolean batchSegmentUpload = Boolean.parseBoolean(configs.getOrDefault(
+        BatchConfigProperties.BATCH_SEGMENT_UPLOAD, "false"));
+    if (batchSegmentUpload) {
+      pushJobSpec.setBatchSegmentUpload(true);
+    }
+    return pushJobSpec;
+  }
+
+  /**
+   * Builds a {@link SegmentGenerationJobSpec} for segment push (TAR or 
METADATA). Requires
+   * {@link BatchConfigProperties#PUSH_CONTROLLER_URI} in configs for METADATA 
push.
+   */
+  protected SegmentGenerationJobSpec generateSegmentGenerationJobSpec(String 
tableName, Map<String, String> configs,
+      PushJobSpec pushJobSpec) {
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(tableName);
+
+    PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+    
pinotClusterSpec.setControllerURI(configs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
+    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+    spec.setPushJobSpec(pushJobSpec);
+    spec.setTableSpec(tableSpec);
+    spec.setPinotClusterSpecs(new PinotClusterSpec[]{pinotClusterSpec});
+    spec.setAuthToken(MinionTaskUtils.resolveAuthToken(configs));
+    return spec;
+  }
+
+  /**
+   * Copies the local segment tar file to the output PinotFS. Requires
+   * {@link BatchConfigProperties#OUTPUT_SEGMENT_DIR_URI} in configs.
+   *
+   * @return the URI of the segment tar on the output filesystem
+   */
+  protected URI moveSegmentToOutputPinotFS(Map<String, String> configs, File 
localSegmentTarFile)
+      throws Exception {
+    URI outputSegmentDirURI = 
URI.create(configs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(configs, 
outputSegmentDirURI)) {
+      URI outputSegmentTarURI = 
URI.create(MinionTaskUtils.normalizeDirectoryURI(outputSegmentDirURI)
+          + URIUtils.encode(localSegmentTarFile.getName()));
+      if 
(!Boolean.parseBoolean(configs.get(BatchConfigProperties.OVERWRITE_OUTPUT))
+          && outputFileFS.exists(outputSegmentTarURI)) {
+        throw new RuntimeException("Output file: " + outputSegmentTarURI + " 
already exists. Set 'overwriteOutput' to "
+            + "true to ignore this error");
+      }
+      outputFileFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+      return outputSegmentTarURI;
+    }
+  }
+
+  /**
+   * Returns HTTP parameters common to segment upload and metadata push 
(parallel push protection, table name, type).
+   */
+  protected List<NameValuePair> getSegmentPushCommonParams(String 
tableNameWithType) {
+    List<NameValuePair> params = new ArrayList<>();
+    params.add(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION,
+        "true"));
+    params.add(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
+        TableNameBuilder.extractRawTableName(tableNameWithType)));
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    if (tableType != null) {
+      params.add(new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, 
tableType.toString()));
+    } else {
+      throw new RuntimeException("Failed to determine the tableType from name: 
" + tableNameWithType);
+    }
+    return params;
+  }
+
+  /**
+   * Returns HTTP headers for segment metadata push (ZK metadata custom map 
modifier + auth). Used when pushing
+   * metadata to the controller instead of uploading the tar via HTTP.
+   *
+   * @param segmentConversionResult the conversion result for the segment; may 
be null when building headers for
+   *                                 multiple segments where a single modifier 
does not apply
+   */
+  protected List<Header> getSegmentPushMetadataHeaders(PinotTaskConfig 
pinotTaskConfig, AuthProvider authProvider,
+      SegmentConversionResult segmentConversionResult) {
+    SegmentZKMetadataCustomMapModifier modifier =
+        getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, 
segmentConversionResult);
+    Header modifierHeader =
+        new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+            modifier.toJsonString());
+    List<Header> headers = new ArrayList<>();
+    headers.add(modifierHeader);
+    headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
+    return headers;
+  }
+
+  /**
+   * Returns the segment push mode for upload. Default is TAR (HTTP upload). 
Subclasses may override to use METADATA
+   * mode (move segment to output PinotFS and send metadata to controller) 
when needed.
+   *
+   * @param configs task configs; may contain {@link 
BatchConfigProperties#PUSH_MODE}
+   * @return push type (TAR or METADATA)
+   */
+  protected BatchConfigProperties.SegmentPushType 
getSegmentPushType(Map<String, String> configs) {
+    String pushMode = configs.getOrDefault(BatchConfigProperties.PUSH_MODE,
+        BatchConfigProperties.SegmentPushType.TAR.name());
+    return 
BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase());
+  }
 }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index 2da96335b75..ca197849210 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -72,6 +72,12 @@ public class MinionTaskUtils {
   public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
   public static final String UTC = "UTC";
 
+  /**
+   * When true, allows METADATA push mode with local FS output dir. Intended 
for integration tests only.
+   * Production should leave this unset (defaults to false); local FS then 
always uses TAR push.
+   */
+  public static final String ALLOW_METADATA_PUSH_WITH_LOCAL_FS = 
"allowMetadataPushWithLocalFs";
+
   private MinionTaskUtils() {
   }
 
@@ -191,9 +197,20 @@ public class MinionTaskUtils {
             break;
         }
       } else {
-        LOGGER.warn("Local output dir found, defaulting to TAR: {}.", 
outputSegmentDirURI);
-        singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
-            BatchConfigProperties.SegmentPushType.TAR.toString());
+        boolean allowMetadataPushWithLocalFs = Boolean.parseBoolean(
+            taskConfigs.getOrDefault(ALLOW_METADATA_PUSH_WITH_LOCAL_FS, 
"false"));
+        if (allowMetadataPushWithLocalFs && pushMode != null) {
+          // Override for integration tests: respect explicit push mode with 
local FS
+          
singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI,
+              outputSegmentDirURI.toString());
+          singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+              segmentPushType.toString());
+        } else {
+          // Production: default to TAR for local output dir
+          LOGGER.warn("Local output dir found, defaulting to TAR: {}.", 
outputSegmentDirURI);
+          singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+              BatchConfigProperties.SegmentPushType.TAR.toString());
+        }
       }
 
       
singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
index 298927b0c97..16506881cef 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
@@ -34,6 +34,7 @@ import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator
 import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
 import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
@@ -122,6 +123,7 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
           continue;
         }
         Map<String, String> configs = new 
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
+        configs.putAll(MinionTaskUtils.getPushTaskConfig(tableName, 
taskConfigs, _clusterInfoAccessor));
         Long tsLastPurge;
         if (segmentZKMetadata.getCustomMap() != null) {
           tsLastPurge = Long.valueOf(segmentZKMetadata.getCustomMap()
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
index 81263798a42..5dc1ca2f062 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
@@ -127,6 +127,7 @@ public class RefreshSegmentTaskGenerator extends 
BaseTaskGenerator {
       }
 
       Map<String, String> configs = new 
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
+      configs.putAll(MinionTaskUtils.getPushTaskConfig(tableNameWithType, 
taskConfigs, _clusterInfoAccessor));
       configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
segmentZKMetadata.getDownloadUrl());
       configs.put(MinionConstants.UPLOAD_URL_KEY,
           _clusterInfoAccessor.getVipUrlForLeadController(tableNameWithType) + 
"/segments");
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
index dc42ca8eda9..9c71c50f6ec 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
@@ -236,22 +236,13 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
     return spec;
   }
 
-  private URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, File 
localSegmentTarFile)
+  @Override
+  protected URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, 
File localSegmentTarFile)
       throws Exception {
     if 
(!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
       return localSegmentTarFile.toURI();
     }
-    URI outputSegmentDirURI = 
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
-    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, 
outputSegmentDirURI)) {
-      URI outputSegmentTarURI = URI.create(outputSegmentDirURI + 
localSegmentTarFile.getName());
-      if 
(!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) 
&& outputFileFS.exists(
-          outputSegmentTarURI)) {
-        LOGGER.warn("Not overwrite existing output segment tar file: {}", 
outputFileFS.exists(outputSegmentTarURI));
-      } else {
-        outputFileFS.copyFromLocalFile(localSegmentTarFile, 
outputSegmentTarURI);
-      }
-      return outputSegmentTarURI;
-    }
+    return super.moveSegmentToOutputPinotFS(taskConfigs, localSegmentTarFile);
   }
 
   private File tarSegmentDir(SegmentGenerationTaskSpec taskSpec, String 
segmentName)
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index c704d5a6b8f..02e15b9d328 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -187,6 +187,7 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
           continue;
         }
         Map<String, String> configs = new 
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segment.getSegmentName())));
+        configs.putAll(MinionTaskUtils.getPushTaskConfig(tableNameWithType, 
taskConfigs, _clusterInfoAccessor));
         configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
segment.getDownloadUrl());
         configs.put(MinionConstants.UPLOAD_URL_KEY,
             _clusterInfoAccessor.getVipUrlForLeadController(tableNameWithType) 
+ "/segments");
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
index 1e804d7e6c2..0ad879896f3 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
@@ -350,4 +350,19 @@ public class MinionTaskUtilsTest {
         BatchConfigProperties.SegmentPushType.TAR.toString());
     
assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI), 
"http://localhost:9000");
   }
+
+  @Test
+  public void testGetPushTaskConfigMETADATAPushModeWithLocalOutputDir() {
+    Map<String, String> taskConfig = new HashMap<>();
+    taskConfig.put(BatchConfigProperties.PUSH_MODE, 
BatchConfigProperties.SegmentPushType.METADATA.toString());
+    taskConfig.put(MinionTaskUtils.ALLOW_METADATA_PUSH_WITH_LOCAL_FS, "true");
+    Map<String, String> pushTaskConfigs = 
MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig,
+        getMockClusterInfo("/data/dir", "http://localhost:9000"));
+
+    
assertEquals(pushTaskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI), 
"/data/dir/myTable");
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+        BatchConfigProperties.SegmentPushType.METADATA.toString());
+    
assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI), 
"http://localhost:9000");
+    assertEquals(pushTaskConfigs.size(), 4);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to