This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4b0db5a Fixing minion SegmentGenerationAndPushTask for task spec
generator to use created PinotFS (#6755)
4b0db5a is described below
commit 4b0db5a3cfdfe69551cef235f9d8c25ffe31b254
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Apr 7 16:42:45 2021 -0700
Fixing minion SegmentGenerationAndPushTask for task spec generator to use
created PinotFS (#6755)
---
.../SegmentGenerationAndPushTaskExecutor.java | 57 +++-------------
.../SegmentGenerationAndPushTaskGenerator.java | 13 +---
.../SegmentGenerationAndPushTaskUtils.java | 75 ++++++++++++++++++++++
3 files changed, 87 insertions(+), 58 deletions(-)
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
index fea83c6..a1d9a38 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
@@ -36,10 +36,7 @@ import
org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.filesystem.PinotFS;
-import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -49,7 +46,6 @@ import
org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
-import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -94,7 +90,6 @@ import org.slf4j.LoggerFactory;
public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentGenerationAndPushTaskExecutor.class);
- private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();
private static final int DEFUALT_PUSH_ATTEMPTS = 5;
private static final int DEFAULT_PUSH_PARALLELISM = 1;
private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L;
@@ -152,11 +147,12 @@ public class SegmentGenerationAndPushTaskExecutor extends
BaseTaskExecutor {
if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI))
{
outputSegmentDirURI =
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
}
- PinotFS outputFileFS = getOutputPinotFS(taskConfigs, outputSegmentDirURI);
+ PinotFS outputFileFS =
SegmentGenerationAndPushTaskUtils.getOutputPinotFS(taskConfigs,
outputSegmentDirURI);
switch
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
case TAR:
try {
- SegmentPushUtils.pushSegments(spec, LOCAL_PINOT_FS,
Arrays.asList(outputSegmentTarURI.toString()));
+ SegmentPushUtils.pushSegments(spec,
SegmentGenerationAndPushTaskUtils.getLocalPinotFs(),
+ Arrays.asList(outputSegmentTarURI.toString()));
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
@@ -211,7 +207,7 @@ public class SegmentGenerationAndPushTaskExecutor extends
BaseTaskExecutor {
return localSegmentTarFile.toURI();
}
URI outputSegmentDirURI =
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
- PinotFS outputFileFS = getOutputPinotFS(taskConfigs, outputSegmentDirURI);
+ PinotFS outputFileFS =
SegmentGenerationAndPushTaskUtils.getOutputPinotFS(taskConfigs,
outputSegmentDirURI);
URI outputSegmentTarURI = URI.create(outputSegmentDirURI +
localSegmentTarFile.getName());
if
(!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT))
&& outputFileFS
.exists(outputSegmentDirURI)) {
@@ -222,42 +218,6 @@ public class SegmentGenerationAndPushTaskExecutor extends
BaseTaskExecutor {
return outputSegmentTarURI;
}
- private PinotFS getInputPinotFS(Map<String, String> taskConfigs, URI fileURI)
- throws Exception {
- String fileURIScheme = fileURI.getScheme();
- if (fileURIScheme == null) {
- return LOCAL_PINOT_FS;
- }
- // Try to create PinotFS using given Input FileSystem config always
- String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
- if (fsClass != null) {
- PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
- PinotConfiguration fsProps =
IngestionConfigUtils.getInputFsProps(taskConfigs);
- pinotFS.init(fsProps);
- return pinotFS;
- }
- // Fallback to use the PinotFS created by Minion Server configs
- return PinotFSFactory.create(fileURIScheme);
- }
-
- private PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI
fileURI)
- throws Exception {
- String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme();
- if (fileURIScheme == null) {
- return LOCAL_PINOT_FS;
- }
- // Try to create PinotFS using given Input FileSystem config always
- String fsClass = taskConfigs.get(BatchConfigProperties.OUTPUT_FS_CLASS);
- if (fsClass != null) {
- PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
- PinotConfiguration fsProps =
IngestionConfigUtils.getOutputFsProps(taskConfigs);
- pinotFS.init(fsProps);
- return pinotFS;
- }
- // Fallback to use the PinotFS created by Minion Server configs
- return PinotFSFactory.create(fileURIScheme);
- }
-
private File tarSegmentDir(SegmentGenerationTaskSpec taskSpec, String
segmentName)
throws IOException {
File localOutputTempDir = new File(taskSpec.getOutputDirectoryPath());
@@ -277,7 +237,7 @@ public class SegmentGenerationAndPushTaskExecutor extends
BaseTaskExecutor {
throws Exception {
SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
URI inputFileURI =
URI.create(taskConfigs.get(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY));
- PinotFS inputFileFS = getInputPinotFS(taskConfigs, inputFileURI);
+ PinotFS inputFileFS =
SegmentGenerationAndPushTaskUtils.getInputPinotFS(taskConfigs, inputFileURI);
File localInputTempDir = new File(localTempDir, "input");
FileUtils.forceMkdir(localInputTempDir);
@@ -321,12 +281,13 @@ public class SegmentGenerationAndPushTaskExecutor extends
BaseTaskExecutor {
taskSpec.setTableConfig(tableConfig);
taskSpec.setSequenceId(Integer.parseInt(taskConfigs.get(BatchConfigProperties.SEQUENCE_ID)));
if (taskConfigs.containsKey(BatchConfigProperties.FAIL_ON_EMPTY_SEGMENT)) {
-
taskSpec.setFailOnEmptySegment(Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.FAIL_ON_EMPTY_SEGMENT)));
+ taskSpec
+
.setFailOnEmptySegment(Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.FAIL_ON_EMPTY_SEGMENT)));
}
SegmentNameGeneratorSpec segmentNameGeneratorSpec = new
SegmentNameGeneratorSpec();
segmentNameGeneratorSpec.setType(taskConfigs.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE));
- segmentNameGeneratorSpec.setConfigs(
- IngestionConfigUtils.getConfigMapWithPrefix(taskConfigs,
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX));
+ segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils
+ .getConfigMapWithPrefix(taskConfigs,
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX));
taskSpec.setSegmentNameGeneratorSpec(segmentNameGeneratorSpec);
taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY,
inputFileURI.toString());
return taskSpec;
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskGenerator.java
index eb9e1d0..1b3db08 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskGenerator.java
@@ -45,9 +45,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
-import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
-import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -268,14 +266,9 @@ public class SegmentGenerationAndPushTaskGenerator
implements PinotTaskGenerator
}
private List<URI> getInputFilesFromDirectory(Map<String, String>
batchConfigMap, URI inputDirURI,
- Set<String> existingSegmentInputFileURIs) {
- String inputDirURIScheme = inputDirURI.getScheme();
- if (!PinotFSFactory.isSchemeSupported(inputDirURIScheme)) {
- String fsClass =
batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS);
- PinotConfiguration fsProps =
IngestionConfigUtils.getInputFsProps(batchConfigMap);
- PinotFSFactory.register(inputDirURIScheme, fsClass, fsProps);
- }
- PinotFS inputDirFS = PinotFSFactory.create(inputDirURIScheme);
+ Set<String> existingSegmentInputFileURIs)
+ throws Exception {
+ PinotFS inputDirFS =
SegmentGenerationAndPushTaskUtils.getInputPinotFS(batchConfigMap, inputDirURI);
String includeFileNamePattern =
batchConfigMap.get(BatchConfigProperties.INCLUDE_FILE_NAME_PATTERN);
String excludeFileNamePattern =
batchConfigMap.get(BatchConfigProperties.EXCLUDE_FILE_NAME_PATTERN);
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskUtils.java
new file mode 100644
index 0000000..bf95143
--- /dev/null
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskUtils.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.segment_generation_and_push;
+
+import java.net.URI;
+import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+
+
+public class SegmentGenerationAndPushTaskUtils {
+
+ private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();
+
+ static PinotFS getInputPinotFS(Map<String, String> taskConfigs, URI fileURI)
+ throws Exception {
+ String fileURIScheme = fileURI.getScheme();
+ if (fileURIScheme == null) {
+ return LOCAL_PINOT_FS;
+ }
+ // Try to create PinotFS using given Input FileSystem config always
+ String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
+ if (fsClass != null) {
+ PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
+ PinotConfiguration fsProps =
IngestionConfigUtils.getInputFsProps(taskConfigs);
+ pinotFS.init(fsProps);
+ return pinotFS;
+ }
+ // Fallback to use the PinotFS created by Minion Server configs
+ return PinotFSFactory.create(fileURIScheme);
+ }
+
+ static PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI fileURI)
+ throws Exception {
+ String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme();
+ if (fileURIScheme == null) {
+ return LOCAL_PINOT_FS;
+ }
+ // Try to create PinotFS using given Output FileSystem config always
+ String fsClass = taskConfigs.get(BatchConfigProperties.OUTPUT_FS_CLASS);
+ if (fsClass != null) {
+ PinotFS pinotFS = PluginManager.get().createInstance(fsClass);
+ PinotConfiguration fsProps =
IngestionConfigUtils.getOutputFsProps(taskConfigs);
+ pinotFS.init(fsProps);
+ return pinotFS;
+ }
+ // Fallback to use the PinotFS created by Minion Server configs
+ return PinotFSFactory.create(fileURIScheme);
+ }
+
+ static PinotFS getLocalPinotFs() {
+ return LOCAL_PINOT_FS;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]