[ 
https://issues.apache.org/jira/browse/PHOENIX-6622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484961#comment-17484961
 ] 

ASF GitHub Bot commented on PHOENIX-6622:
-----------------------------------------

gokceni commented on a change in pull request #1373:
URL: https://github.com/apache/phoenix/pull/1373#discussion_r796096890



##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/TransformMonitorTask.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor.tasks;
+
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.transform.TransformTool;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+
+import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_MONITOR_ENABLED;
+import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_RETRY_COUNT;
+import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED;
+import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_RETRY_COUNT_VALUE;
+
+/**
+ * Task runs periodically to monitor and orchestrate ongoing transforms in 
System.Transform table.
+ *
+ */
+public class TransformMonitorTask extends BaseTask  {
+    public static final String DEFAULT = "IndexName";
+
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(TransformMonitorTask.class);
+
+    private static boolean isDisabled = false;
+
+    /**
+     * Sets the static flag that switches this task off. While the flag is true,
+     * {@link #run} returns a FAIL result ("TransformMonitor is disabled") before doing any work.
+     * Called from tests.
+     *
+     * @param disabled true to disable the task, false to enable it again
+     */
+    @VisibleForTesting
+    public static void disableTransformMonitorTask(boolean disabled) {
+        isDisabled = disabled;
+    }
+
+    @Override
+    public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+        Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+        Configuration configuration = 
HBaseConfiguration.addHbaseResources(conf);
+        boolean transformMonitorEnabled = 
configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, 
DEFAULT_TRANSFORM_MONITOR_ENABLED);
+        if (!transformMonitorEnabled || isDisabled) {
+            return new 
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, 
"TransformMonitor is disabled");
+        }
+
+        try (PhoenixConnection conn = 
QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)){
+            SystemTransformRecord systemTransformRecord = 
Transform.getTransformRecord(taskRecord.getSchemaName(),
+                    taskRecord.getTableName(), null, taskRecord.getTenantId(), 
conn);
+            if (systemTransformRecord == null) {
+                return new 
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
+                        "No transform record is found");
+            }
+            String tableName = 
SchemaUtil.getTableName(systemTransformRecord.getSchemaName(),
+                    systemTransformRecord.getLogicalTableName());
+
+            if 
(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.CREATED.name()))
 {
+                LOGGER.info("Transform is created, starting the TransformTool 
", tableName);
+                // Kick a TransformTool run, it will already update transform 
record status and job id
+                TransformTool transformTool = 
TransformTool.runTransformTool(systemTransformRecord, conf, false, null, null, 
false, false);
+                if (transformTool == null) {
+                    // This is not a map/reduce error. There must be some 
unexpected issue. So, retrying will not solve the underlying issue.
+                    return new 
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, 
"TransformTool run failed. Check the parameters.");
+                }
+            } else if 
(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name()))
 {
+                LOGGER.info("Transform is completed, TransformMonitor is done 
", tableName);
+                return new 
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
+            } else if 
(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name())
+                    && 
!PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType()))
 {
+                LOGGER.info("Transform is pending cutover ", tableName);
+                Transform.doCutover(conn, systemTransformRecord);
+
+                PTable.TransformType partialTransform = 
PTable.TransformType.getPartialTransform(systemTransformRecord.getTransformType());
+                if (partialTransform != null) {
+                    // Update transform to be partial
+                    SystemTransformRecord.SystemTransformBuilder builder = new 
SystemTransformRecord.SystemTransformBuilder(systemTransformRecord);
+                    builder.setTransformType(partialTransform);
+                    // Decrement retry count since TransformTool will 
increment it. Should we set it to 0?
+                    
builder.setTransformRetryCount(systemTransformRecord.getTransformRetryCount()-1);
+                    Transform.upsertTransform(builder.build(), conn);
+
+                    // Fix unverified rows. Running partial transform will 
make the transform status go back to started
+                    long startFromTs = 0;
+                    if (systemTransformRecord.getTransformLastStateTs() != 
null) {
+                        startFromTs = 
systemTransformRecord.getTransformLastStateTs().getTime()-1;
+                    }
+                    TransformTool.runTransformTool(systemTransformRecord, 
conf, true, startFromTs, null, true, false);
+
+                    // In the future, if we are changing the PK structure, we 
need to run indextools as well
+                } else {
+                    // No partial transform needed so, we update state of the 
transform
+                    LOGGER.warn("No partial type of the transform is found. 
Completing the transform ", tableName);
+                    Transform.updateTransformRecord(conn, 
systemTransformRecord, PTable.TransformStatus.COMPLETED);
+                }
+            } else if 
(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.STARTED.name())
 ||
+                    
(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name())
+                            && 
PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType())))
 {
+                
LOGGER.info(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.STARTED.name())
 ?
+                        "Transform is started, we will monitor ": "Partial 
transform is going on, we will monitor" , tableName);
+                // Monitor the job of transform tool and decide to retry
+                String jobId = systemTransformRecord.getTransformJobId();
+                if (jobId != null) {
+                    Cluster cluster = new Cluster(configuration);
+
+                    Job job = 
cluster.getJob(org.apache.hadoop.mapreduce.JobID.forName(jobId));
+                    if (job == null) {
+                        LOGGER.warn(String.format("Transform job with Id=%s is 
not found", jobId));
+                        return new  
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SKIPPED, "The 
job cannot be found");
+                    }
+                    if (job != null && job.isComplete()) {
+                        if (job.isSuccessful()) {
+                            LOGGER.warn("TransformTool job is successful. 
Transform should have been in a COMPLETED state "
+                                    + taskRecord.getTableName());
+                        } else {
+                            // Retry TransformTool run
+                            int maxRetryCount = 
configuration.getInt(TRANSFORM_RETRY_COUNT_VALUE, 
DEFAULT_TRANSFORM_RETRY_COUNT);
+                            if (systemTransformRecord.getTransformRetryCount() 
< maxRetryCount) {
+                                // Retry count will be incremented in 
TransformTool
+                                
TransformTool.runTransformTool(systemTransformRecord, conf, false, null, null, 
false, true);
+                            }
+                        }
+                    }
+                }
+            } else if 
(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.FAILED.name()))
 {
+                String str = "Transform is marked as failed because either 
TransformTool is run on the foreground and failed " +
+                        "or it is run as async but there is something wrong 
with the TransformTool parameters";
+                LOGGER.error(str);
+                return new 
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str);
+            } else if 
(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PAUSED.name()))
 {
+                return new 
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS,
+                        "Transform is paused. No need to monitor");
+            } else {
+                String str = "Transform status is not known " + 
systemTransformRecord.getString();
+                LOGGER.error(str);
+                return new 
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str);
+            }
+
+            // Update task status to RETRY so that it is retried
+            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                    .setConn(conn)
+                    .setTaskType(taskRecord.getTaskType())
+                    .setTenantId(taskRecord.getTenantId())
+                    .setSchemaName(taskRecord.getSchemaName())
+                    .setTableName(taskRecord.getTableName())
+                    .setTaskStatus(PTable.TaskStatus.RETRY.toString())
+                    .setData(taskRecord.getData())
+                    .setPriority(taskRecord.getPriority())
+                    .setStartTs(taskRecord.getTimeStamp())
+                    .setEndTs(null)
+                    .setAccessCheckEnabled(true)
+                    .build());
+            return null;
+        }
+        catch (Throwable t) {
+            LOGGER.warn("Exception while running transform monitor task. " +
+                    "It will be retried in the next system task table scan : " 
+
+                    taskRecord.getSchemaName() + "." + 
taskRecord.getTableName() +
+                    " with tenant id " + (taskRecord.getTenantId() == null ? " 
IS NULL" : taskRecord.getTenantId()) +
+                    " and data " + taskRecord.getData(), t);
+            return new 
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, 
t.toString());
+        }
+    }
+
+    public static void addTransformMonitorTask(PhoenixConnection connection, 
Configuration configuration, SystemTransformRecord systemTransformRecord,
+                                               PTable.TaskStatus taskStatus, 
Timestamp startTimestamp, Timestamp endTimestamp) throws IOException {
+        boolean transformMonitorEnabled = 
configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, 
DEFAULT_TRANSFORM_MONITOR_ENABLED);
+        if (!transformMonitorEnabled) {
+            LOGGER.warn("TransformMonitor is not enabled. Monitoring/retrying 
TransformTool and doing cutover will not be done automatically");
+            return;
+        }
+
+        Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                .setConn(connection)
+                .setTaskType(PTable.TaskType.TRANSFORM_MONITOR)
+                .setTenantId(systemTransformRecord.getTenantId())
+                .setSchemaName(systemTransformRecord.getSchemaName())
+                .setTableName(systemTransformRecord.getLogicalTableName())
+                .setTaskStatus(taskStatus.toString())
+                .setStartTs(startTimestamp)
+                .setEndTs(endTimestamp)
+                .setAccessCheckEnabled(true)
+                .build());
+    }
+
+    @Override
+    public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord 
taskRecord)
+            throws Exception {
+        return null;

Review comment:
       I will add a comment, but here is the explanation: IndexRebuildTask uses 
this function to check whether its MR job succeeded. This task does not need 
it, because the MR job itself updates the task status.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TransformMonitor should orchestrate transform and do retries
> ------------------------------------------------------------
>
>                 Key: PHOENIX-6622
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6622
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Gokcen Iskender
>            Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to