This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch fixing_minion_task_input_fs_override_issue
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 8377385dd6f97e5f65b054e4f7e24bb370f525d5
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Apr 5 13:55:50 2021 -0700

    Pinot Minion SegmentGenerationAndPush task: PinotFS configs inside taskSpec 
are always temporary and have higher priority than the default PinotFS created by the 
minion server configs
---
 .../SegmentGenerationAndPushTaskExecutor.java      | 31 +++++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
index ee96ea9..fea83c6 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
@@ -49,6 +49,7 @@ import 
org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -221,29 +222,39 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
     return outputSegmentTarURI;
   }
 
-  private PinotFS getInputPinotFS(Map<String, String> taskConfigs, URI 
fileURI) {
+  private PinotFS getInputPinotFS(Map<String, String> taskConfigs, URI fileURI)
+      throws Exception {
     String fileURIScheme = fileURI.getScheme();
     if (fileURIScheme == null) {
-      fileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+      return LOCAL_PINOT_FS;
     }
-    if (!PinotFSFactory.isSchemeSupported(fileURIScheme)) {
-      String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
+    // Try to create PinotFS using given Input FileSystem config always
+    String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
+    if (fsClass != null) {
+      PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
       PinotConfiguration fsProps = 
IngestionConfigUtils.getInputFsProps(taskConfigs);
-      PinotFSFactory.register(fileURIScheme, fsClass, fsProps);
+      pinotFS.init(fsProps);
+      return pinotFS;
     }
+    // Fallback to use the PinotFS created by Minion Server configs
     return PinotFSFactory.create(fileURIScheme);
   }
 
-  private PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI 
fileURI) {
+  private PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI 
fileURI)
+      throws Exception {
     String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme();
     if (fileURIScheme == null) {
-      fileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+      return LOCAL_PINOT_FS;
     }
-    if (!PinotFSFactory.isSchemeSupported(fileURIScheme)) {
-      String fsClass = taskConfigs.get(BatchConfigProperties.OUTPUT_FS_CLASS);
+    // Try to create PinotFS using given Output FileSystem config always
+    String fsClass = taskConfigs.get(BatchConfigProperties.OUTPUT_FS_CLASS);
+    if (fsClass != null) {
+      PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
       PinotConfiguration fsProps = 
IngestionConfigUtils.getOutputFsProps(taskConfigs);
-      PinotFSFactory.register(fileURIScheme, fsClass, fsProps);
+      pinotFS.init(fsProps);
+      return pinotFS;
     }
+    // Fallback to use the PinotFS created by Minion Server configs
     return PinotFSFactory.create(fileURIScheme);
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to