This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a67cf87 Pinot Minion SegmentGenerationAndPush task: PinotFS configs
inside taskSpec is always temporary and has higher priority than default
PinotFS created by the minion server configs (#6744)
a67cf87 is described below
commit a67cf873b5b5c390bd516f4d0523529b8ab44993
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Apr 5 21:50:50 2021 -0700
Pinot Minion SegmentGenerationAndPush task: PinotFS configs inside taskSpec
is always temporary and has higher priority than default PinotFS created by the
minion server configs (#6744)
---
.../SegmentGenerationAndPushTaskExecutor.java | 31 +++++++++++++++-------
1 file changed, 21 insertions(+), 10 deletions(-)
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
index ee96ea9..fea83c6 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
@@ -49,6 +49,7 @@ import
org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -221,29 +222,39 @@ public class SegmentGenerationAndPushTaskExecutor extends
BaseTaskExecutor {
return outputSegmentTarURI;
}
- private PinotFS getInputPinotFS(Map<String, String> taskConfigs, URI
fileURI) {
+ private PinotFS getInputPinotFS(Map<String, String> taskConfigs, URI fileURI)
+ throws Exception {
String fileURIScheme = fileURI.getScheme();
if (fileURIScheme == null) {
- fileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+ return LOCAL_PINOT_FS;
}
- if (!PinotFSFactory.isSchemeSupported(fileURIScheme)) {
- String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
+ // Try to create PinotFS using given Input FileSystem config always
+ String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
+ if (fsClass != null) {
+ PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
PinotConfiguration fsProps =
IngestionConfigUtils.getInputFsProps(taskConfigs);
- PinotFSFactory.register(fileURIScheme, fsClass, fsProps);
+ pinotFS.init(fsProps);
+ return pinotFS;
}
+ // Fallback to use the PinotFS created by Minion Server configs
return PinotFSFactory.create(fileURIScheme);
}
- private PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI
fileURI) {
+ private PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI
fileURI)
+ throws Exception {
String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme();
if (fileURIScheme == null) {
- fileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+ return LOCAL_PINOT_FS;
}
- if (!PinotFSFactory.isSchemeSupported(fileURIScheme)) {
- String fsClass = taskConfigs.get(BatchConfigProperties.OUTPUT_FS_CLASS);
+ // Try to create PinotFS using given Output FileSystem config always
+ String fsClass = taskConfigs.get(BatchConfigProperties.OUTPUT_FS_CLASS);
+ if (fsClass != null) {
+ PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
PinotConfiguration fsProps =
IngestionConfigUtils.getOutputFsProps(taskConfigs);
- PinotFSFactory.register(fileURIScheme, fsClass, fsProps);
+ pinotFS.init(fsProps);
+ return pinotFS;
}
+ // Fallback to use the PinotFS created by Minion Server configs
return PinotFSFactory.create(fileURIScheme);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]