This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cba1cb5 Add BaseTaskGenerator to make task timeout and concurrency
configurable (#8028)
cba1cb5 is described below
commit cba1cb5e2bc5e044802d94553391b1b2802a15fe
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Jan 14 17:49:19 2022 -0800
Add BaseTaskGenerator to make task timeout and concurrency configurable
(#8028)
Task timeout and concurrency can be configured via the cluster config:
- <TaskType>.timeoutMs
- <TaskType>.numConcurrentTasksPerInstance
---
.../helix/core/minion/PinotTaskManager.java | 7 +++
.../core/minion/generator/BaseTaskGenerator.java | 71 ++++++++++++++++++++++
.../apache/pinot/core/common/MinionConstants.java | 9 +++
.../tests/SimpleMinionClusterIntegrationTest.java | 19 +++++-
.../plugin/minion/tasks/TestTaskGenerator.java | 11 +---
.../ConvertToRawIndexTaskGenerator.java | 12 +---
.../mergerollup/MergeRollupTaskGenerator.java | 16 ++---
.../RealtimeToOfflineSegmentsTaskGenerator.java | 11 +---
.../SegmentGenerationAndPushTaskGenerator.java | 28 +--------
9 files changed, 117 insertions(+), 67 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 5bafa0a..475ffd2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -345,6 +345,13 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
}
/**
+ * Returns the task generator registry.
+ */
+ public TaskGeneratorRegistry getTaskGeneratorRegistry() {
+ return _taskGeneratorRegistry;
+ }
+
+ /**
* Registers a task generator.
* <p>This method can be used to plug in custom task generators.
*/
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
new file mode 100644
index 0000000..4808d97
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import org.apache.helix.task.JobConfig;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base implementation of the {@link PinotTaskGenerator} which reads the
'<taskType>.timeoutMs' and
+ * '<taskType>.numConcurrentTasksPerInstance' from the cluster config.
+ */
+public abstract class BaseTaskGenerator implements PinotTaskGenerator {
+ protected static final Logger LOGGER =
LoggerFactory.getLogger(BaseTaskGenerator.class);
+
+ protected ClusterInfoAccessor _clusterInfoAccessor;
+
+ @Override
+ public void init(ClusterInfoAccessor clusterInfoAccessor) {
+ _clusterInfoAccessor = clusterInfoAccessor;
+ }
+
+ @Override
+ public long getTaskTimeoutMs() {
+ String taskType = getTaskType();
+ String configKey = taskType + MinionConstants.TIMEOUT_MS_KEY_SUFFIX;
+ String configValue = _clusterInfoAccessor.getClusterConfig(configKey);
+ if (configValue != null) {
+ try {
+ return Long.parseLong(configValue);
+ } catch (Exception e) {
+ LOGGER.error("Invalid cluster config {}: '{}'", configKey,
configValue, e);
+ }
+ }
+ return JobConfig.DEFAULT_TIMEOUT_PER_TASK;
+ }
+
+ @Override
+ public int getNumConcurrentTasksPerInstance() {
+ String taskType = getTaskType();
+ String configKey = taskType +
MinionConstants.NUM_CONCURRENT_TASKS_PER_INSTANCE_KEY_SUFFIX;
+ String configValue = _clusterInfoAccessor.getClusterConfig(configKey);
+ if (configValue != null) {
+ try {
+ return Integer.parseInt(configValue);
+ } catch (Exception e) {
+ LOGGER.error("Invalid config {}: '{}'", configKey, configValue, e);
+ }
+ }
+ return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 0ce9980..ccf1b44 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -47,6 +47,15 @@ public class MinionConstants {
public static final String INITIAL_RETRY_DELAY_MS_KEY =
"initialRetryDelayMs";
public static final String RETRY_SCALE_FACTOR_KEY = "retryScaleFactor";
+ /**
+ * Cluster level configs
+ */
+ public static final String TIMEOUT_MS_KEY_SUFFIX = ".timeoutMs";
+ public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE_KEY_SUFFIX =
".numConcurrentTasksPerInstance";
+
+ /**
+ * Table level configs
+ */
public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
public static final String ENABLE_REPLACE_SEGMENTS_KEY =
"enableReplaceSegments";
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 43f803a..d7d1f99 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -21,12 +21,16 @@ package org.apache.pinot.integration.tests;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -70,6 +74,14 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
startController();
startBroker();
startServer();
+ startMinion();
+
+ // Set task timeout in cluster config
+ PinotHelixResourceManager helixResourceManager =
_controllerStarter.getHelixResourceManager();
+ helixResourceManager.getHelixAdmin().setConfig(
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
+ helixResourceManager.getHelixClusterName()).build(),
+ Collections.singletonMap(TASK_TYPE +
MinionConstants.TIMEOUT_MS_KEY_SUFFIX, Long.toString(600_000L)));
// Add 3 offline tables, where 2 of them have TestTask enabled
TableTaskConfig taskConfig = new
TableTaskConfig(Collections.singletonMap(TASK_TYPE, Collections.emptyMap()));
@@ -81,8 +93,13 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
_helixTaskResourceManager =
_controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
+ }
- startMinion();
+ @Test
+ public void testTaskTimeout() {
+ PinotTaskGenerator taskGenerator =
_taskManager.getTaskGeneratorRegistry().getTaskGenerator(TASK_TYPE);
+ assertNotNull(taskGenerator);
+ assertEquals(taskGenerator.getTaskTimeoutMs(), 600_000L);
}
private void verifyTaskCount(String task, int errors, int waiting, int
running, int total) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java
index 740d3e5..1efa7fb 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java
@@ -23,8 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
-import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
@@ -37,13 +36,7 @@ import static org.testng.Assert.assertEquals;
* Task generator for {@link SimpleMinionClusterIntegrationTest}.
*/
@TaskGenerator
-public class TestTaskGenerator implements PinotTaskGenerator {
- private ClusterInfoAccessor _clusterInfoAccessor;
-
- @Override
- public void init(ClusterInfoAccessor clusterInfoAccessor) {
- _clusterInfoAccessor = clusterInfoAccessor;
- }
+public class TestTaskGenerator extends BaseTaskGenerator {
@Override
public String getTaskType() {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
index 5fc7c64..0d0d769 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskGenerator.java
@@ -29,8 +29,7 @@ import org.apache.pinot.common.data.Segment;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
-import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -43,16 +42,9 @@ import org.slf4j.LoggerFactory;
@TaskGenerator
-public class ConvertToRawIndexTaskGenerator implements PinotTaskGenerator {
+public class ConvertToRawIndexTaskGenerator extends BaseTaskGenerator {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConvertToRawIndexTaskGenerator.class);
- private ClusterInfoAccessor _clusterInfoAccessor;
-
- @Override
- public void init(ClusterInfoAccessor clusterInfoAccessor) {
- _clusterInfoAccessor = clusterInfoAccessor;
- }
-
@Override
public String getTaskType() {
return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 1f81011..63139e6 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -39,7 +39,7 @@ import
org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.minion.MergeRollupTaskMetadata;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.core.common.MinionConstants;
@@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory;
* maxNumRecordsPerTask
*/
@TaskGenerator
-public class MergeRollupTaskGenerator implements PinotTaskGenerator {
+public class MergeRollupTaskGenerator extends BaseTaskGenerator {
private static final Logger LOGGER =
LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
@@ -113,17 +113,9 @@ public class MergeRollupTaskGenerator implements
PinotTaskGenerator {
private static final String MERGE_ROLLUP_TASK_DELAY_IN_NUM_BUCKETS =
"mergeRollupTaskDelayInNumBuckets";
// tableNameWithType -> mergeLevel -> watermarkMs
- private Map<String, Map<String, Long>> _mergeRollupWatermarks;
+ private final Map<String, Map<String, Long>> _mergeRollupWatermarks = new
HashMap<>();
// tableNameWithType -> maxValidBucketEndTime
- private Map<String, Long> _tableMaxValidBucketEndTimeMs;
- private ClusterInfoAccessor _clusterInfoAccessor;
-
- @Override
- public void init(ClusterInfoAccessor clusterInfoAccessor) {
- _clusterInfoAccessor = clusterInfoAccessor;
- _mergeRollupWatermarks = new HashMap<>();
- _tableMaxValidBucketEndTimeMs = new HashMap<>();
- }
+ private final Map<String, Long> _tableMaxValidBucketEndTimeMs = new
HashMap<>();
@Override
public String getTaskType() {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index 7b0b490..c96d5d4 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -31,7 +31,7 @@ import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.core.common.MinionConstants;
@@ -77,19 +77,12 @@ import org.slf4j.LoggerFactory;
* - A PinotTaskConfig is created, with segment information, execution
window, and any config specific to the task
*/
@TaskGenerator
-public class RealtimeToOfflineSegmentsTaskGenerator implements
PinotTaskGenerator {
+public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator {
private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
private static final String DEFAULT_BUCKET_PERIOD = "1d";
private static final String DEFAULT_BUFFER_PERIOD = "2d";
- private ClusterInfoAccessor _clusterInfoAccessor;
-
- @Override
- public void init(ClusterInfoAccessor clusterInfoAccessor) {
- _clusterInfoAccessor = clusterInfoAccessor;
- }
-
@Override
public String getTaskType() {
return RealtimeToOfflineSegmentsTask.TASK_TYPE;
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
index 16424d1..33dd85a 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
@@ -32,12 +32,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
-import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
@@ -89,39 +87,17 @@ import org.slf4j.LoggerFactory;
*
*/
@TaskGenerator
-public class SegmentGenerationAndPushTaskGenerator implements
PinotTaskGenerator {
+public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentGenerationAndPushTaskGenerator.class);
private static final BatchConfigProperties.SegmentPushType
DEFAULT_SEGMENT_PUSH_TYPE =
BatchConfigProperties.SegmentPushType.TAR;
- private ClusterInfoAccessor _clusterInfoAccessor;
-
- @Override
- public void init(ClusterInfoAccessor clusterInfoAccessor) {
- _clusterInfoAccessor = clusterInfoAccessor;
- }
-
@Override
public String getTaskType() {
return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
}
@Override
- public int getNumConcurrentTasksPerInstance() {
- String numConcurrentTasksPerInstanceStr = _clusterInfoAccessor
-
.getClusterConfig(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE);
- if (numConcurrentTasksPerInstanceStr != null) {
- try {
- return Integer.parseInt(numConcurrentTasksPerInstanceStr);
- } catch (Exception e) {
- LOGGER.error("Failed to parse cluster config: {}",
-
MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE,
e);
- }
- }
- return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
- }
-
- @Override
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]