This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fc81e8e4e5f Support MinionContext AuthProvider in Minion executors (#17405)
fc81e8e4e5f is described below

commit fc81e8e4e5fd5545a6a7538101c5fed529059c1d
Author: Shreyaa Sharma <[email protected]>
AuthorDate: Wed Jan 28 10:37:07 2026 +0530

    Support MinionContext AuthProvider in Minion executors (#17405)
---
 .../tests/SimpleMinionClusterIntegrationTest.java  |   2 +-
 .../minion/taskfactory/TaskFactoryRegistry.java    |   9 --
 .../BaseMultipleSegmentsConversionExecutor.java    |   6 +-
 .../tasks/BaseSingleSegmentConversionExecutor.java |   2 +-
 .../plugin/minion/tasks/BaseTaskExecutor.java      |  31 +++++
 ...BaseMultipleSegmentsConversionExecutorTest.java | 126 +++++++++++++++++++++
 6 files changed, 162 insertions(+), 14 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 222a9e9bcf4..55846837728 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -60,7 +60,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
   public static final String TABLE_NAME_2 = "testTable2";
   public static final String TABLE_NAME_3 = "testTable3";
   public static final int NUM_TASKS = 2;
-  public static final int NUM_CONFIGS = 5;
+  public static final int NUM_CONFIGS = 4;
   public static final AtomicBoolean HOLD = new AtomicBoolean();
   public static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean();
   public static final AtomicBoolean TASK_SUCCESS_NOTIFIED = new AtomicBoolean();
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
index 209c33347d4..0720630271a 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
@@ -21,7 +21,6 @@ package org.apache.pinot.minion.taskfactory;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.JobContext;
@@ -30,14 +29,11 @@ import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
-import org.apache.pinot.common.auth.AuthProviderUtils;
 import org.apache.pinot.common.metrics.MinionGauge;
 import org.apache.pinot.common.metrics.MinionMeter;
 import org.apache.pinot.common.metrics.MinionMetrics;
 import org.apache.pinot.common.metrics.MinionTimer;
-import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.minion.event.EventObserverFactoryRegistry;
 import org.apache.pinot.minion.event.MinionEventObserver;
 import org.apache.pinot.minion.event.MinionEventObserverFactory;
@@ -138,11 +134,6 @@ public class TaskFactoryRegistry {
             }
 
             private TaskResult runInternal(PinotTaskConfig pinotTaskConfig) {
-              if (StringUtils.isBlank(pinotTaskConfig.getConfigs().get(MinionConstants.AUTH_TOKEN))) {
-                pinotTaskConfig.getConfigs().put(MinionConstants.AUTH_TOKEN,
-                    AuthProviderUtils.toStaticToken(MinionContext.getInstance().getTaskAuthProvider()));
-              }
-
               String tableName = pinotTaskConfig.getTableName();
 
               _eventObserver.notifyTaskStart(pinotTaskConfig);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index b794d95ac37..22a8790cb6e 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -124,7 +124,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
     String inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
     String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
-    AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
+    AuthProvider authProvider = resolveAuthProvider(configs);
     Set<String> segmentNamesForTable = SegmentConversionUtils.getSegmentNamesForTable(tableNameWithType,
         FileUploadDownloadClient.extractBaseURI(new URI(uploadURL)), authProvider);
     Set<String> nonExistingSegmentNames =
@@ -194,7 +194,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
     String downloadURLString = taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY);
     String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
-    AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN));
+    AuthProvider authProvider = resolveAuthProvider(taskConfigs);
     File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
     Preconditions.checkState(tempDataDir.mkdirs());
     int numRecords;
@@ -613,7 +613,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
       Map<String, String> configs = pinotTaskConfig.getConfigs();
       _tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
       _uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
-      _authProvider = AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
+      _authProvider = resolveAuthProvider(configs);
       _inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
       String replaceSegmentsString = configs.get(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY);
       _replaceSegmentsEnabled = Boolean.parseBoolean(replaceSegmentsString);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index e35a9c5cd5f..965a00b3bb3 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -81,7 +81,7 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
     String downloadURL = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
     String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
     String originalSegmentCrc = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
-    AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
+    AuthProvider authProvider = resolveAuthProvider(configs);
 
     long currentSegmentCrc = getSegmentCrc(tableNameWithType, segmentName);
     if (Long.parseLong(originalSegmentCrc) != currentSegmentCrc) {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
index c61a5b58d94..245bfc2480a 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
@@ -23,7 +23,11 @@ import java.io.File;
 import java.net.URI;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.auth.NullAuthProvider;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
@@ -31,10 +35,12 @@ import org.apache.pinot.common.metrics.MinionMeter;
 import org.apache.pinot.common.metrics.MinionMetrics;
 import org.apache.pinot.common.utils.TarCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
+import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
+import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
@@ -111,6 +117,31 @@ public abstract class BaseTaskExecutor implements PinotTaskExecutor {
     _minionMetrics.addMeteredTableValue(tableName, taskType, meter, unitCount);
   }
 
+  /**
+   * Resolves the AuthProvider to use for Minion tasks.
+   * Priority order:
+   * 1. If AUTH_TOKEN is explicitly provided in task configs (by Controller), use it for this specific task
+   * 2. Otherwise, fall back to the runtime AuthProvider from MinionContext (enables per-request token rotation)
+   *
+   * This approach allows:
+   * - Controller to override credentials per-task (e.g., for multi-tenancy or privileged operations)
+   * - Dynamic token rotation when no explicit override is provided
+   * - Clean separation between task-specific and global authentication
+   */
+  protected static AuthProvider resolveAuthProvider(Map<String, String> taskConfigs) {
+    String explicitToken = taskConfigs.get(MinionConstants.AUTH_TOKEN);
+    if (StringUtils.isNotBlank(explicitToken)) {
+      return AuthProviderUtils.makeAuthProvider(explicitToken);
+    }
+
+    AuthProvider runtimeProvider = MINION_CONTEXT.getTaskAuthProvider();
+    if (runtimeProvider == null || runtimeProvider instanceof NullAuthProvider) {
+      return new NullAuthProvider();
+    }
+
+    return runtimeProvider;
+  }
+
   protected File downloadSegmentToLocalAndUntar(String tableNameWithType, String segmentName, String deepstoreURL,
       String taskType, File tempDataDir, String suffix)
       throws Exception {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutorTest.java
index fc3e6e5085b..0abbbf739ea 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutorTest.java
@@ -30,9 +30,12 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.NameValuePair;
 import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.pinot.common.auth.NullAuthProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
@@ -52,6 +55,7 @@ public class BaseMultipleSegmentsConversionExecutorTest {
 
   @AfterMethod
   public void tearDown() throws IOException {
+    MinionContext.getInstance().setTaskAuthProvider(null);
     // Clean up the temporary directory
     FileUtils.deleteDirectory(_tempDir);
   }
@@ -80,6 +84,35 @@ public class BaseMultipleSegmentsConversionExecutorTest {
     };
   }
 
+  // Helper method to create task configs with optional AUTH_TOKEN
+  private Map<String, String> createTaskConfigs(String authToken) {
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put("tableName", "myTable_OFFLINE");
+    taskConfigs.put("uploadURL", "http://controller:9000/upload");
+    if (authToken != null) {
+      taskConfigs.put(MinionConstants.AUTH_TOKEN, authToken);
+    }
+    return taskConfigs;
+  }
+
+  // Helper method to create SegmentUploadContext and get auth provider headers
+  private List<Header> getAuthHeaders(Map<String, String> taskConfigs) {
+    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig("customMinionTask", taskConfigs);
+    List<SegmentConversionResult> results = new ArrayList<>();
+    BaseMultipleSegmentsConversionExecutor.SegmentUploadContext ctx =
+        new BaseMultipleSegmentsConversionExecutor.SegmentUploadContext(pinotTaskConfig, results);
+    return _executor.getSegmentPushCommonHeaders(pinotTaskConfig, ctx.getAuthProvider(), results);
+  }
+
+  // Helper method to get AuthProvider from task configs
+  private AuthProvider getAuthProviderFromTaskConfigs(Map<String, String> taskConfigs) {
+    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig("customMinionTask", taskConfigs);
+    List<SegmentConversionResult> results = new ArrayList<>();
+    BaseMultipleSegmentsConversionExecutor.SegmentUploadContext ctx =
+        new BaseMultipleSegmentsConversionExecutor.SegmentUploadContext(pinotTaskConfig, results);
+    return ctx.getAuthProvider();
+  }
+
   @Test
   public void testGetPushJobSpec() {
     Map<String, String> taskConfigs = new HashMap<>();
@@ -98,6 +131,99 @@ public class BaseMultipleSegmentsConversionExecutorTest {
     Assert.assertEquals(headers.size(), 1);
   }
 
+  @Test
+  public void testRuntimeAuthProviderUsedWhenNoExplicitToken() {
+    // Set up a runtime provider in MinionContext
+    AuthProvider runtimeProvider = new AuthProvider() {
+      @Override
+      public Map<String, Object> getRequestHeaders() {
+        Map<String, Object> m = new HashMap<>();
+        m.put("X-Runtime-Auth", "runtime-value");
+        return m;
+      }
+      @Override
+      public String getTaskToken() {
+        return "IGNORED";
+      }
+    };
+    MinionContext.getInstance().setTaskAuthProvider(runtimeProvider);
+
+    // Create task WITHOUT explicit AUTH_TOKEN
+    Map<String, String> taskConfigs = createTaskConfigs(null);
+    List<Header> headers = getAuthHeaders(taskConfigs);
+
+    // Should use runtime provider since no explicit token was provided
+    boolean foundCustom = headers.stream().anyMatch(h -> h.getName().equals("X-Runtime-Auth")
+        && h.getValue().equals("runtime-value"));
+    Assert.assertTrue(foundCustom, "Expected custom header from runtime provider when no explicit token provided");
+  }
+
+  @Test
+  public void testExplicitTaskTokenTakesPrecedence() {
+    // Set up a runtime provider in MinionContext
+    AuthProvider runtimeProvider = new AuthProvider() {
+      @Override
+      public Map<String, Object> getRequestHeaders() {
+        Map<String, Object> m = new HashMap<>();
+        m.put("X-Runtime-Auth", "should-not-be-used");
+        return m;
+      }
+      @Override
+      public String getTaskToken() {
+        return "IGNORED";
+      }
+    };
+    MinionContext.getInstance().setTaskAuthProvider(runtimeProvider);
+
+    // Create task WITH explicit AUTH_TOKEN (should take precedence)
+    Map<String, String> taskConfigs = createTaskConfigs("Bearer explicit-task-token");
+    List<Header> headers = getAuthHeaders(taskConfigs);
+
+    // Should use explicit AUTH_TOKEN, not runtime provider
+    boolean foundExplicitToken = headers.stream().anyMatch(h -> h.getName().equals("Authorization")
+        && h.getValue().equals("Bearer explicit-task-token"));
+    Assert.assertTrue(foundExplicitToken, "Expected explicit AUTH_TOKEN to take precedence over runtime provider");
+
+    // Should NOT have runtime provider's custom header
+    boolean foundRuntimeHeader = headers.stream().anyMatch(h -> h.getName().equals("X-Runtime-Auth"));
+    Assert.assertFalse(foundRuntimeHeader, "Runtime provider should not be used when explicit token is provided");
+  }
+
+  @Test
+  public void testExplicitTokenUsedWhenRuntimeProviderIsNull() {
+    // No runtime provider configured (or NullAuthProvider)
+    MinionContext.getInstance().setTaskAuthProvider(new NullAuthProvider());
+
+    // Create task WITH explicit AUTH_TOKEN
+    Map<String, String> taskConfigs = createTaskConfigs("Bearer fallback-token");
+    List<Header> headers = getAuthHeaders(taskConfigs);
+
+    // Should use explicit AUTH_TOKEN
+    boolean foundAuth = headers.stream().anyMatch(h -> h.getName().equals("Authorization")
+        && h.getValue().equals("Bearer fallback-token"));
+    Assert.assertTrue(foundAuth, "Expected explicit AUTH_TOKEN to be used"
+  + " when runtime provider is null/NullAuthProvider");
+  }
+
+  @Test
+  public void testReturnsNullAuthProviderWhenBothTokenAndProviderAbsent() {
+    // No runtime provider configured
+    MinionContext.getInstance().setTaskAuthProvider(null);
+
+    // Create task WITHOUT explicit AUTH_TOKEN
+    Map<String, String> taskConfigs = createTaskConfigs(null);
+    List<Header> headers = getAuthHeaders(taskConfigs);
+
+    // Should return NullAuthProvider (no auth headers)
+    Assert.assertTrue(headers.isEmpty() || headers.stream().noneMatch(h -> h.getName().equals("Authorization")),
+        "Expected no Authorization header when both explicit token and runtime provider are absent");
+
+    // Verify the auth provider is indeed a NullAuthProvider
+    AuthProvider resolvedProvider = getAuthProviderFromTaskConfigs(taskConfigs);
+    Assert.assertTrue(resolvedProvider instanceof NullAuthProvider,
+        "Expected NullAuthProvider when both explicit token and runtime provider are absent");
+  }
+
   @Test
   public void testGetSegmentPushCommonParams() {
     String tableNameWithType = "myTable_OFFLINE";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to