This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fc81e8e4e5f Support MinionContext AuthProvider in Minion executors
(#17405)
fc81e8e4e5f is described below
commit fc81e8e4e5fd5545a6a7538101c5fed529059c1d
Author: Shreyaa Sharma <[email protected]>
AuthorDate: Wed Jan 28 10:37:07 2026 +0530
Support MinionContext AuthProvider in Minion executors (#17405)
---
.../tests/SimpleMinionClusterIntegrationTest.java | 2 +-
.../minion/taskfactory/TaskFactoryRegistry.java | 9 --
.../BaseMultipleSegmentsConversionExecutor.java | 6 +-
.../tasks/BaseSingleSegmentConversionExecutor.java | 2 +-
.../plugin/minion/tasks/BaseTaskExecutor.java | 31 +++++
...BaseMultipleSegmentsConversionExecutorTest.java | 126 +++++++++++++++++++++
6 files changed, 162 insertions(+), 14 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 222a9e9bcf4..55846837728 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -60,7 +60,7 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
public static final String TABLE_NAME_2 = "testTable2";
public static final String TABLE_NAME_3 = "testTable3";
public static final int NUM_TASKS = 2;
- public static final int NUM_CONFIGS = 5;
+ public static final int NUM_CONFIGS = 4;
public static final AtomicBoolean HOLD = new AtomicBoolean();
public static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean();
public static final AtomicBoolean TASK_SUCCESS_NOTIFIED = new
AtomicBoolean();
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
index 209c33347d4..0720630271a 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
@@ -21,7 +21,6 @@ package org.apache.pinot.minion.taskfactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.task.JobContext;
@@ -30,14 +29,11 @@ import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskResult;
-import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.metrics.MinionGauge;
import org.apache.pinot.common.metrics.MinionMeter;
import org.apache.pinot.common.metrics.MinionMetrics;
import org.apache.pinot.common.metrics.MinionTimer;
-import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.minion.event.EventObserverFactoryRegistry;
import org.apache.pinot.minion.event.MinionEventObserver;
import org.apache.pinot.minion.event.MinionEventObserverFactory;
@@ -138,11 +134,6 @@ public class TaskFactoryRegistry {
}
private TaskResult runInternal(PinotTaskConfig pinotTaskConfig) {
- if
(StringUtils.isBlank(pinotTaskConfig.getConfigs().get(MinionConstants.AUTH_TOKEN)))
{
- pinotTaskConfig.getConfigs().put(MinionConstants.AUTH_TOKEN,
-
AuthProviderUtils.toStaticToken(MinionContext.getInstance().getTaskAuthProvider()));
- }
-
String tableName = pinotTaskConfig.getTableName();
_eventObserver.notifyTaskStart(pinotTaskConfig);
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index b794d95ac37..22a8790cb6e 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -124,7 +124,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
String inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
- AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
+ AuthProvider authProvider = resolveAuthProvider(configs);
Set<String> segmentNamesForTable =
SegmentConversionUtils.getSegmentNamesForTable(tableNameWithType,
FileUploadDownloadClient.extractBaseURI(new URI(uploadURL)),
authProvider);
Set<String> nonExistingSegmentNames =
@@ -194,7 +194,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
String downloadURLString =
taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY);
String[] downloadURLs =
downloadURLString.split(MinionConstants.URL_SEPARATOR);
- AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN));
+ AuthProvider authProvider = resolveAuthProvider(taskConfigs);
File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(),
taskType), "tmp-" + UUID.randomUUID());
Preconditions.checkState(tempDataDir.mkdirs());
int numRecords;
@@ -613,7 +613,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
Map<String, String> configs = pinotTaskConfig.getConfigs();
_tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
_uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
- _authProvider =
AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
+ _authProvider = resolveAuthProvider(configs);
_inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
String replaceSegmentsString =
configs.get(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY);
_replaceSegmentsEnabled = Boolean.parseBoolean(replaceSegmentsString);
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index e35a9c5cd5f..965a00b3bb3 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -81,7 +81,7 @@ public abstract class BaseSingleSegmentConversionExecutor
extends BaseTaskExecut
String downloadURL = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
String originalSegmentCrc =
configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
- AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
+ AuthProvider authProvider = resolveAuthProvider(configs);
long currentSegmentCrc = getSegmentCrc(tableNameWithType, segmentName);
if (Long.parseLong(originalSegmentCrc) != currentSegmentCrc) {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
index c61a5b58d94..245bfc2480a 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
@@ -23,7 +23,11 @@ import java.io.File;
import java.net.URI;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.auth.NullAuthProvider;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
@@ -31,10 +35,12 @@ import org.apache.pinot.common.metrics.MinionMeter;
import org.apache.pinot.common.metrics.MinionMetrics;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
+import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
+import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
@@ -111,6 +117,31 @@ public abstract class BaseTaskExecutor implements
PinotTaskExecutor {
_minionMetrics.addMeteredTableValue(tableName, taskType, meter, unitCount);
}
+ /**
+ * Resolves the AuthProvider to use for Minion tasks.
+ * Priority order:
+ * 1. If AUTH_TOKEN is explicitly provided in task configs (by Controller),
use it for this specific task
+ * 2. Otherwise, fall back to the runtime AuthProvider from MinionContext
(enables per-request token rotation)
+ *
+ * This approach allows:
+ * - Controller to override credentials per-task (e.g., for multi-tenancy or
privileged operations)
+ * - Dynamic token rotation when no explicit override is provided
+ * - Clean separation between task-specific and global authentication
+ */
+ protected static AuthProvider resolveAuthProvider(Map<String, String>
taskConfigs) {
+ String explicitToken = taskConfigs.get(MinionConstants.AUTH_TOKEN);
+ if (StringUtils.isNotBlank(explicitToken)) {
+ return AuthProviderUtils.makeAuthProvider(explicitToken);
+ }
+
+ AuthProvider runtimeProvider = MINION_CONTEXT.getTaskAuthProvider();
+ if (runtimeProvider == null || runtimeProvider instanceof
NullAuthProvider) {
+ return new NullAuthProvider();
+ }
+
+ return runtimeProvider;
+ }
+
protected File downloadSegmentToLocalAndUntar(String tableNameWithType,
String segmentName, String deepstoreURL,
String taskType, File tempDataDir, String suffix)
throws Exception {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutorTest.java
index fc3e6e5085b..0abbbf739ea 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutorTest.java
@@ -30,9 +30,12 @@ import org.apache.commons.io.FileUtils;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.pinot.common.auth.NullAuthProvider;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
@@ -52,6 +55,7 @@ public class BaseMultipleSegmentsConversionExecutorTest {
@AfterMethod
public void tearDown() throws IOException {
+ MinionContext.getInstance().setTaskAuthProvider(null);
// Clean up the temporary directory
FileUtils.deleteDirectory(_tempDir);
}
@@ -80,6 +84,35 @@ public class BaseMultipleSegmentsConversionExecutorTest {
};
}
+ // Helper method to create task configs with optional AUTH_TOKEN
+ private Map<String, String> createTaskConfigs(String authToken) {
+ Map<String, String> taskConfigs = new HashMap<>();
+ taskConfigs.put("tableName", "myTable_OFFLINE");
+ taskConfigs.put("uploadURL", "http://controller:9000/upload");
+ if (authToken != null) {
+ taskConfigs.put(MinionConstants.AUTH_TOKEN, authToken);
+ }
+ return taskConfigs;
+ }
+
+ // Helper method to create SegmentUploadContext and get auth provider headers
+ private List<Header> getAuthHeaders(Map<String, String> taskConfigs) {
+ PinotTaskConfig pinotTaskConfig = new PinotTaskConfig("customMinionTask",
taskConfigs);
+ List<SegmentConversionResult> results = new ArrayList<>();
+ BaseMultipleSegmentsConversionExecutor.SegmentUploadContext ctx =
+ new
BaseMultipleSegmentsConversionExecutor.SegmentUploadContext(pinotTaskConfig,
results);
+ return _executor.getSegmentPushCommonHeaders(pinotTaskConfig,
ctx.getAuthProvider(), results);
+ }
+
+ // Helper method to get AuthProvider from task configs
+ private AuthProvider getAuthProviderFromTaskConfigs(Map<String, String>
taskConfigs) {
+ PinotTaskConfig pinotTaskConfig = new PinotTaskConfig("customMinionTask",
taskConfigs);
+ List<SegmentConversionResult> results = new ArrayList<>();
+ BaseMultipleSegmentsConversionExecutor.SegmentUploadContext ctx =
+ new
BaseMultipleSegmentsConversionExecutor.SegmentUploadContext(pinotTaskConfig,
results);
+ return ctx.getAuthProvider();
+ }
+
@Test
public void testGetPushJobSpec() {
Map<String, String> taskConfigs = new HashMap<>();
@@ -98,6 +131,99 @@ public class BaseMultipleSegmentsConversionExecutorTest {
Assert.assertEquals(headers.size(), 1);
}
+ @Test
+ public void testRuntimeAuthProviderUsedWhenNoExplicitToken() {
+ // Set up a runtime provider in MinionContext
+ AuthProvider runtimeProvider = new AuthProvider() {
+ @Override
+ public Map<String, Object> getRequestHeaders() {
+ Map<String, Object> m = new HashMap<>();
+ m.put("X-Runtime-Auth", "runtime-value");
+ return m;
+ }
+ @Override
+ public String getTaskToken() {
+ return "IGNORED";
+ }
+ };
+ MinionContext.getInstance().setTaskAuthProvider(runtimeProvider);
+
+ // Create task WITHOUT explicit AUTH_TOKEN
+ Map<String, String> taskConfigs = createTaskConfigs(null);
+ List<Header> headers = getAuthHeaders(taskConfigs);
+
+ // Should use runtime provider since no explicit token was provided
+ boolean foundCustom = headers.stream().anyMatch(h ->
h.getName().equals("X-Runtime-Auth")
+ && h.getValue().equals("runtime-value"));
+ Assert.assertTrue(foundCustom, "Expected custom header from runtime
provider when no explicit token provided");
+ }
+
+ @Test
+ public void testExplicitTaskTokenTakesPrecedence() {
+ // Set up a runtime provider in MinionContext
+ AuthProvider runtimeProvider = new AuthProvider() {
+ @Override
+ public Map<String, Object> getRequestHeaders() {
+ Map<String, Object> m = new HashMap<>();
+ m.put("X-Runtime-Auth", "should-not-be-used");
+ return m;
+ }
+ @Override
+ public String getTaskToken() {
+ return "IGNORED";
+ }
+ };
+ MinionContext.getInstance().setTaskAuthProvider(runtimeProvider);
+
+ // Create task WITH explicit AUTH_TOKEN (should take precedence)
+ Map<String, String> taskConfigs = createTaskConfigs("Bearer
explicit-task-token");
+ List<Header> headers = getAuthHeaders(taskConfigs);
+
+ // Should use explicit AUTH_TOKEN, not runtime provider
+ boolean foundExplicitToken = headers.stream().anyMatch(h ->
h.getName().equals("Authorization")
+ && h.getValue().equals("Bearer explicit-task-token"));
+ Assert.assertTrue(foundExplicitToken, "Expected explicit AUTH_TOKEN to
take precedence over runtime provider");
+
+ // Should NOT have runtime provider's custom header
+ boolean foundRuntimeHeader = headers.stream().anyMatch(h ->
h.getName().equals("X-Runtime-Auth"));
+ Assert.assertFalse(foundRuntimeHeader, "Runtime provider should not be
used when explicit token is provided");
+ }
+
+ @Test
+ public void testExplicitTokenUsedWhenRuntimeProviderIsNull() {
+ // No runtime provider configured (or NullAuthProvider)
+ MinionContext.getInstance().setTaskAuthProvider(new NullAuthProvider());
+
+ // Create task WITH explicit AUTH_TOKEN
+ Map<String, String> taskConfigs = createTaskConfigs("Bearer
fallback-token");
+ List<Header> headers = getAuthHeaders(taskConfigs);
+
+ // Should use explicit AUTH_TOKEN
+ boolean foundAuth = headers.stream().anyMatch(h ->
h.getName().equals("Authorization")
+ && h.getValue().equals("Bearer fallback-token"));
+ Assert.assertTrue(foundAuth, "Expected explicit AUTH_TOKEN to be used"
+ + " when runtime provider is null/NullAuthProvider");
+ }
+
+ @Test
+ public void testReturnsNullAuthProviderWhenBothTokenAndProviderAbsent() {
+ // No runtime provider configured
+ MinionContext.getInstance().setTaskAuthProvider(null);
+
+ // Create task WITHOUT explicit AUTH_TOKEN
+ Map<String, String> taskConfigs = createTaskConfigs(null);
+ List<Header> headers = getAuthHeaders(taskConfigs);
+
+ // Should return NullAuthProvider (no auth headers)
+ Assert.assertTrue(headers.isEmpty() || headers.stream().noneMatch(h ->
h.getName().equals("Authorization")),
+ "Expected no Authorization header when both explicit token and runtime
provider are absent");
+
+ // Verify the auth provider is indeed a NullAuthProvider
+ AuthProvider resolvedProvider =
getAuthProviderFromTaskConfigs(taskConfigs);
+ Assert.assertTrue(resolvedProvider instanceof NullAuthProvider,
+ "Expected NullAuthProvider when both explicit token and runtime
provider are absent");
+ }
+
@Test
public void testGetSegmentPushCommonParams() {
String tableNameWithType = "myTable_OFFLINE";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]