mcvsubbu commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r503580109
##
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ * - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ * In case of cold-start, no ZNode will exist.
+ * A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ * - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ * where bucketTime can be provided in the taskConfigs (default 1d)
+ * - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ * where bufferTime can be provided in the taskConfigs (default 2d)
+ * - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ * - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements
PinotTaskGenerator {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+ private static final String DEFAULT_BUCKET_PERIOD = "1d";
+ private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+ private final ClusterInfoProvider _clusterInfoProvider;
+
+ public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider
clusterInfoProvider) {
+_clusterInfoProvider = clusterInfoProvider;
+ }
+
+ @Override
+ public String getTaskType() {
+return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+ }
+
+ @Override
+ public List generateTasks(List tableConfigs) {
Review comment:
It would improve readability to break this method down into smaller
helper methods that it calls.
##
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ *