This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b0db5a  Fixing minion SegmentGenerationAndPushTask for task spec 
generator using created PinotFS (#6755)
4b0db5a is described below

commit 4b0db5a3cfdfe69551cef235f9d8c25ffe31b254
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Apr 7 16:42:45 2021 -0700

    Fixing minion SegmentGenerationAndPushTask for task spec generator using 
created PinotFS (#6755)
---
 .../SegmentGenerationAndPushTaskExecutor.java      | 57 +++-------------
 .../SegmentGenerationAndPushTaskGenerator.java     | 13 +---
 .../SegmentGenerationAndPushTaskUtils.java         | 75 ++++++++++++++++++++++
 3 files changed, 87 insertions(+), 58 deletions(-)

diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
index fea83c6..a1d9a38 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
@@ -36,10 +36,7 @@ import 
org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
 import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.LocalPinotFS;
 import org.apache.pinot.spi.filesystem.PinotFS;
-import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -49,7 +46,6 @@ import 
org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
-import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -94,7 +90,6 @@ import org.slf4j.LoggerFactory;
 public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentGenerationAndPushTaskExecutor.class);
 
-  private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();
   private static final int DEFUALT_PUSH_ATTEMPTS = 5;
   private static final int DEFAULT_PUSH_PARALLELISM = 1;
   private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L;
@@ -152,11 +147,12 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
     if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) 
{
       outputSegmentDirURI = 
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
     }
-    PinotFS outputFileFS = getOutputPinotFS(taskConfigs, outputSegmentDirURI);
+    PinotFS outputFileFS = 
SegmentGenerationAndPushTaskUtils.getOutputPinotFS(taskConfigs, 
outputSegmentDirURI);
     switch 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
       case TAR:
         try {
-          SegmentPushUtils.pushSegments(spec, LOCAL_PINOT_FS, 
Arrays.asList(outputSegmentTarURI.toString()));
+          SegmentPushUtils.pushSegments(spec, 
SegmentGenerationAndPushTaskUtils.getLocalPinotFs(),
+              Arrays.asList(outputSegmentTarURI.toString()));
         } catch (RetriableOperationException | AttemptsExceededException e) {
           throw new RuntimeException(e);
         }
@@ -211,7 +207,7 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
       return localSegmentTarFile.toURI();
     }
     URI outputSegmentDirURI = 
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
-    PinotFS outputFileFS = getOutputPinotFS(taskConfigs, outputSegmentDirURI);
+    PinotFS outputFileFS = 
SegmentGenerationAndPushTaskUtils.getOutputPinotFS(taskConfigs, 
outputSegmentDirURI);
     URI outputSegmentTarURI = URI.create(outputSegmentDirURI + 
localSegmentTarFile.getName());
     if 
(!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) 
&& outputFileFS
         .exists(outputSegmentDirURI)) {
@@ -222,42 +218,6 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
     return outputSegmentTarURI;
   }
 
-  private PinotFS getInputPinotFS(Map<String, String> taskConfigs, URI fileURI)
-      throws Exception {
-    String fileURIScheme = fileURI.getScheme();
-    if (fileURIScheme == null) {
-      return LOCAL_PINOT_FS;
-    }
-    // Try to create PinotFS using given Input FileSystem config always
-    String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
-    if (fsClass != null) {
-      PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
-      PinotConfiguration fsProps = 
IngestionConfigUtils.getInputFsProps(taskConfigs);
-      pinotFS.init(fsProps);
-      return pinotFS;
-    }
-    // Fallback to use the PinotFS created by Minion Server configs
-    return PinotFSFactory.create(fileURIScheme);
-  }
-
-  private PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI 
fileURI)
-      throws Exception {
-    String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme();
-    if (fileURIScheme == null) {
-      return LOCAL_PINOT_FS;
-    }
-    // Try to create PinotFS using given Input FileSystem config always
-    String fsClass = taskConfigs.get(BatchConfigProperties.OUTPUT_FS_CLASS);
-    if (fsClass != null) {
-      PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
-      PinotConfiguration fsProps = 
IngestionConfigUtils.getOutputFsProps(taskConfigs);
-      pinotFS.init(fsProps);
-      return pinotFS;
-    }
-    // Fallback to use the PinotFS created by Minion Server configs
-    return PinotFSFactory.create(fileURIScheme);
-  }
-
   private File tarSegmentDir(SegmentGenerationTaskSpec taskSpec, String 
segmentName)
       throws IOException {
     File localOutputTempDir = new File(taskSpec.getOutputDirectoryPath());
@@ -277,7 +237,7 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
       throws Exception {
     SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
     URI inputFileURI = 
URI.create(taskConfigs.get(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY));
-    PinotFS inputFileFS = getInputPinotFS(taskConfigs, inputFileURI);
+    PinotFS inputFileFS = 
SegmentGenerationAndPushTaskUtils.getInputPinotFS(taskConfigs, inputFileURI);
 
     File localInputTempDir = new File(localTempDir, "input");
     FileUtils.forceMkdir(localInputTempDir);
@@ -321,12 +281,13 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
     taskSpec.setTableConfig(tableConfig);
     
taskSpec.setSequenceId(Integer.parseInt(taskConfigs.get(BatchConfigProperties.SEQUENCE_ID)));
     if (taskConfigs.containsKey(BatchConfigProperties.FAIL_ON_EMPTY_SEGMENT)) {
-      
taskSpec.setFailOnEmptySegment(Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.FAIL_ON_EMPTY_SEGMENT)));
+      taskSpec
+          
.setFailOnEmptySegment(Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.FAIL_ON_EMPTY_SEGMENT)));
     }
     SegmentNameGeneratorSpec segmentNameGeneratorSpec = new 
SegmentNameGeneratorSpec();
     
segmentNameGeneratorSpec.setType(taskConfigs.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE));
-    segmentNameGeneratorSpec.setConfigs(
-        IngestionConfigUtils.getConfigMapWithPrefix(taskConfigs, 
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX));
+    segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils
+        .getConfigMapWithPrefix(taskConfigs, 
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX));
     taskSpec.setSegmentNameGeneratorSpec(segmentNameGeneratorSpec);
     taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, 
inputFileURI.toString());
     return taskSpec;
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskGenerator.java
index eb9e1d0..1b3db08 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskGenerator.java
@@ -45,9 +45,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
-import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
-import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -268,14 +266,9 @@ public class SegmentGenerationAndPushTaskGenerator 
implements PinotTaskGenerator
   }
 
   private List<URI> getInputFilesFromDirectory(Map<String, String> 
batchConfigMap, URI inputDirURI,
-      Set<String> existingSegmentInputFileURIs) {
-    String inputDirURIScheme = inputDirURI.getScheme();
-    if (!PinotFSFactory.isSchemeSupported(inputDirURIScheme)) {
-      String fsClass = 
batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS);
-      PinotConfiguration fsProps = 
IngestionConfigUtils.getInputFsProps(batchConfigMap);
-      PinotFSFactory.register(inputDirURIScheme, fsClass, fsProps);
-    }
-    PinotFS inputDirFS = PinotFSFactory.create(inputDirURIScheme);
+      Set<String> existingSegmentInputFileURIs)
+      throws Exception {
+    PinotFS inputDirFS = 
SegmentGenerationAndPushTaskUtils.getInputPinotFS(batchConfigMap, inputDirURI);
 
     String includeFileNamePattern = 
batchConfigMap.get(BatchConfigProperties.INCLUDE_FILE_NAME_PATTERN);
     String excludeFileNamePattern = 
batchConfigMap.get(BatchConfigProperties.EXCLUDE_FILE_NAME_PATTERN);
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskUtils.java
new file mode 100644
index 0000000..bf95143
--- /dev/null
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskUtils.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.segment_generation_and_push;
+
+import java.net.URI;
+import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+
+
+public class SegmentGenerationAndPushTaskUtils {
+
+  private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();
+
+  static PinotFS getInputPinotFS(Map<String, String> taskConfigs, URI fileURI)
+      throws Exception {
+    String fileURIScheme = fileURI.getScheme();
+    if (fileURIScheme == null) {
+      return LOCAL_PINOT_FS;
+    }
+    // Try to create PinotFS using given Input FileSystem config always
+    String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
+    if (fsClass != null) {
+      PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
+      PinotConfiguration fsProps = 
IngestionConfigUtils.getInputFsProps(taskConfigs);
+      pinotFS.init(fsProps);
+      return pinotFS;
+    }
+    // Fallback to use the PinotFS created by Minion Server configs
+    return PinotFSFactory.create(fileURIScheme);
+  }
+
+  static PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI fileURI)
+      throws Exception {
+    String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme();
+    if (fileURIScheme == null) {
+      return LOCAL_PINOT_FS;
+    }
+    // Try to create PinotFS using given Output FileSystem config always
+    String fsClass = taskConfigs.get(BatchConfigProperties.OUTPUT_FS_CLASS);
+    if (fsClass != null) {
+      PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
+      PinotConfiguration fsProps = 
IngestionConfigUtils.getOutputFsProps(taskConfigs);
+      pinotFS.init(fsProps);
+      return pinotFS;
+    }
+    // Fallback to use the PinotFS created by Minion Server configs
+    return PinotFSFactory.create(fileURIScheme);
+  }
+
+  static PinotFS getLocalPinotFs() {
+    return LOCAL_PINOT_FS;
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to