[ https://issues.apache.org/jira/browse/PHOENIX-6622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484961#comment-17484961 ]
ASF GitHub Bot commented on PHOENIX-6622: ----------------------------------------- gokceni commented on a change in pull request #1373: URL: https://github.com/apache/phoenix/pull/1373#discussion_r796096890 ########## File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/TransformMonitorTask.java ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.coprocessor.tasks; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.Job; +import org.apache.phoenix.coprocessor.TaskRegionObserver; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.transform.TransformTool; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.task.SystemTaskParams; +import org.apache.phoenix.schema.task.Task; +import org.apache.phoenix.schema.transform.SystemTransformRecord; +import org.apache.phoenix.schema.transform.Transform; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.Timestamp; + +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_MONITOR_ENABLED; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_RETRY_COUNT; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_RETRY_COUNT_VALUE; + +/** + * Task runs periodically to monitor and orchestrate ongoing transforms in System.Transform table. 
+ * + */ +public class TransformMonitorTask extends BaseTask { + public static final String DEFAULT = "IndexName"; + + public static final Logger LOGGER = LoggerFactory.getLogger(TransformMonitorTask.class); + + private static boolean isDisabled = false; + + // Called from testing + @VisibleForTesting + public static void disableTransformMonitorTask(boolean disabled) { + isDisabled = disabled; + } + + @Override + public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) { + Configuration conf = HBaseConfiguration.create(env.getConfiguration()); + Configuration configuration = HBaseConfiguration.addHbaseResources(conf); + boolean transformMonitorEnabled = configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, DEFAULT_TRANSFORM_MONITOR_ENABLED); + if (!transformMonitorEnabled || isDisabled) { + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "TransformMonitor is disabled"); + } + + try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)){ + SystemTransformRecord systemTransformRecord = Transform.getTransformRecord(taskRecord.getSchemaName(), + taskRecord.getTableName(), null, taskRecord.getTenantId(), conn); + if (systemTransformRecord == null) { + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, + "No transform record is found"); + } + String tableName = SchemaUtil.getTableName(systemTransformRecord.getSchemaName(), + systemTransformRecord.getLogicalTableName()); + + if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.CREATED.name())) { + LOGGER.info("Transform is created, starting the TransformTool ", tableName); + // Kick a TransformTool run, it will already update transform record status and job id + TransformTool transformTool = TransformTool.runTransformTool(systemTransformRecord, conf, false, null, null, false, false); + if (transformTool == null) { + // This is not a map/reduce error. 
There must be some unexpected issue. So, retrying will not solve the underlying issue. + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "TransformTool run failed. Check the parameters."); + } + } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) { + LOGGER.info("Transform is completed, TransformMonitor is done ", tableName); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, ""); + } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name()) + && !PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType())) { + LOGGER.info("Transform is pending cutover ", tableName); + Transform.doCutover(conn, systemTransformRecord); + + PTable.TransformType partialTransform = PTable.TransformType.getPartialTransform(systemTransformRecord.getTransformType()); + if (partialTransform != null) { + // Update transform to be partial + SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(systemTransformRecord); + builder.setTransformType(partialTransform); + // Decrement retry count since TransformTool will increment it. Should we set it to 0? + builder.setTransformRetryCount(systemTransformRecord.getTransformRetryCount()-1); + Transform.upsertTransform(builder.build(), conn); + + // Fix unverified rows. 
Running partial transform will make the transform status go back to started + long startFromTs = 0; + if (systemTransformRecord.getTransformLastStateTs() != null) { + startFromTs = systemTransformRecord.getTransformLastStateTs().getTime()-1; + } + TransformTool.runTransformTool(systemTransformRecord, conf, true, startFromTs, null, true, false); + + // In the future, if we are changing the PK structure, we need to run indextools as well + } else { + // No partial transform needed so, we update state of the transform + LOGGER.warn("No partial type of the transform is found. Completing the transform ", tableName); + Transform.updateTransformRecord(conn, systemTransformRecord, PTable.TransformStatus.COMPLETED); + } + } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.STARTED.name()) || + (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name()) + && PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType()))) { + LOGGER.info(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.STARTED.name()) ? + "Transform is started, we will monitor ": "Partial transform is going on, we will monitor" , tableName); + // Monitor the job of transform tool and decide to retry + String jobId = systemTransformRecord.getTransformJobId(); + if (jobId != null) { + Cluster cluster = new Cluster(configuration); + + Job job = cluster.getJob(org.apache.hadoop.mapreduce.JobID.forName(jobId)); + if (job == null) { + LOGGER.warn(String.format("Transform job with Id=%s is not found", jobId)); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SKIPPED, "The job cannot be found"); + } + if (job != null && job.isComplete()) { + if (job.isSuccessful()) { + LOGGER.warn("TransformTool job is successful. 
Transform should have been in a COMPLETED state " + + taskRecord.getTableName()); + } else { + // Retry TransformTool run + int maxRetryCount = configuration.getInt(TRANSFORM_RETRY_COUNT_VALUE, DEFAULT_TRANSFORM_RETRY_COUNT); + if (systemTransformRecord.getTransformRetryCount() < maxRetryCount) { + // Retry count will be incremented in TransformTool + TransformTool.runTransformTool(systemTransformRecord, conf, false, null, null, false, true); + } + } + } + } + } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.FAILED.name())) { + String str = "Transform is marked as failed because either TransformTool is run on the foreground and failed " + + "or it is run as async but there is something wrong with the TransformTool parameters"; + LOGGER.error(str); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str); + } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PAUSED.name())) { + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, + "Transform is paused. No need to monitor"); + } else { + String str = "Transform status is not known " + systemTransformRecord.getString(); + LOGGER.error(str); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str); + } + + // Update task status to RETRY so that it is retried + Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder() + .setConn(conn) + .setTaskType(taskRecord.getTaskType()) + .setTenantId(taskRecord.getTenantId()) + .setSchemaName(taskRecord.getSchemaName()) + .setTableName(taskRecord.getTableName()) + .setTaskStatus(PTable.TaskStatus.RETRY.toString()) + .setData(taskRecord.getData()) + .setPriority(taskRecord.getPriority()) + .setStartTs(taskRecord.getTimeStamp()) + .setEndTs(null) + .setAccessCheckEnabled(true) + .build()); + return null; + } + catch (Throwable t) { + LOGGER.warn("Exception while running transform monitor task. 
" + + "It will be retried in the next system task table scan : " + + taskRecord.getSchemaName() + "." + taskRecord.getTableName() + + " with tenant id " + (taskRecord.getTenantId() == null ? " IS NULL" : taskRecord.getTenantId()) + + " and data " + taskRecord.getData(), t); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, t.toString()); + } + } + + public static void addTransformMonitorTask(PhoenixConnection connection, Configuration configuration, SystemTransformRecord systemTransformRecord, + PTable.TaskStatus taskStatus, Timestamp startTimestamp, Timestamp endTimestamp) throws IOException { + boolean transformMonitorEnabled = configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, DEFAULT_TRANSFORM_MONITOR_ENABLED); + if (!transformMonitorEnabled) { + LOGGER.warn("TransformMonitor is not enabled. Monitoring/retrying TransformTool and doing cutover will not be done automatically"); + return; + } + + Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder() + .setConn(connection) + .setTaskType(PTable.TaskType.TRANSFORM_MONITOR) + .setTenantId(systemTransformRecord.getTenantId()) + .setSchemaName(systemTransformRecord.getSchemaName()) + .setTableName(systemTransformRecord.getLogicalTableName()) + .setTaskStatus(taskStatus.toString()) + .setStartTs(startTimestamp) + .setEndTs(endTimestamp) + .setAccessCheckEnabled(true) + .build()); + } + + @Override + public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord) + throws Exception { + return null; Review comment: Will add a comment but here is the explanation. IndexRebuildTask uses this function to check if MR job succeeded or not. For this task, we don't need it. The MR job itself updates the task status. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org > TransformMonitor should orchestrate transform and do retries > ------------------------------------------------------------ > > Key: PHOENIX-6622 > URL: https://issues.apache.org/jira/browse/PHOENIX-6622 > Project: Phoenix > Issue Type: Sub-task > Reporter: Gokcen Iskender > Priority: Major > -- This message was sent by Atlassian Jira (v8.20.1#820001)