This is an automated email from the ASF dual-hosted git repository. chinmayskulkarni pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new ed7f1a6 PHOENIX-6155 : Provide a coprocessor endpoint to avoid direct upserts into SYSTEM.TASK from the client ed7f1a6 is described below commit ed7f1a6b69d20c87dfbbee97ec8612ccfb866d1b Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Wed Nov 11 17:05:04 2020 +0530 PHOENIX-6155 : Provide a coprocessor endpoint to avoid direct upserts into SYSTEM.TASK from the client Signed-off-by: Chinmay Kulkarni <chinmayskulka...@apache.org> --- .../phoenix/end2end/BackwardCompatibilityIT.java | 70 +- .../end2end/BackwardCompatibilityTestUtil.java | 3 + .../apache/phoenix/end2end/IndexRebuildTaskIT.java | 17 +- .../phoenix/end2end/SystemTablesUpgradeIT.java | 10 +- .../phoenix/end2end/index/IndexMetadataIT.java | 18 +- .../gold_files/gold_query_index_rebuild_async.txt | 23 + .../it/resources/sql_files/index_rebuild_async.sql | 27 + .../sql_files/query_index_rebuild_async.sql | 20 + .../coprocessor/BaseMetaDataEndpointObserver.java | 6 + .../phoenix/coprocessor/MetaDataEndpointImpl.java | 22 +- .../coprocessor/MetaDataEndpointObserver.java | 3 + .../phoenix/coprocessor/MetaDataProtocol.java | 3 +- .../PhoenixMetaDataCoprocessorHost.java | 14 + .../phoenix/coprocessor/TaskMetaDataEndpoint.java | 127 ++++ .../phoenix/coprocessor/TaskRegionObserver.java | 34 +- .../coprocessor/generated/MetaDataProtos.java | 62 +- .../coprocessor/generated/TaskMetaDataProtos.java | 784 +++++++++++++++++++++ .../coprocessor/tasks/IndexRebuildTask.java | 18 +- .../apache/phoenix/exception/SQLExceptionCode.java | 2 + .../org/apache/phoenix/protobuf/ProtobufUtil.java | 7 + .../phoenix/query/ConnectionQueryServicesImpl.java | 20 +- .../org/apache/phoenix/schema/MetaDataClient.java | 35 +- .../phoenix/schema/task/SystemTaskParams.java | 188 +++++ .../java/org/apache/phoenix/schema/task/Task.java | 152 +++- .../phoenix/util/TaskMetaDataServiceCallBack.java | 67 ++ .../coprocessor/TaskMetaDataEndpointTest.java | 186 +++++ phoenix-protocol/src/main/MetaDataService.proto | 1 + .../src/main/TaskMetaDataService.proto | 34 + 28 files changed, 1876 insertions(+), 77 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java index 0061b83..79f7302 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java @@ -21,10 +21,12 @@ import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DATA; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DELETE; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_ADD; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_DIVERGED_VIEW; +import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.INDEX_REBUILD_ASYNC; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD_DATA; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD_DELETE; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_ADD; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_DIVERGED_VIEW; +import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_INDEX_REBUILD_ASYNC; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.assertExpectedOutput; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.checkForPreConditions; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.computeClientVersions; @@ -34,6 +36,9 @@ import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.UpgradePr import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.UpgradeProps.SET_MAX_LOOK_BACK_AGE; import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.sql.Connection; import java.sql.DriverManager; @@ -44,6 +49,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.query.QueryServices; @@ -305,19 +311,67 @@ public class BackwardCompatibilityIT { } @Test - public void testUpdatedSplitPolicyForSysTask() throws Exception { - executeQueryWithClientVersion(compatibleClientVersion, CREATE_DIVERGED_VIEW, zkQuorum); - executeQueriesWithCurrentVersion(QUERY_CREATE_DIVERGED_VIEW, url, NONE); + public void testSplitPolicyAndCoprocessorForSysTask() throws Exception { + executeQueryWithClientVersion(compatibleClientVersion, + CREATE_DIVERGED_VIEW, zkQuorum); - try (org.apache.hadoop.hbase.client.Connection conn = - hbaseTestUtil.getConnection(); Admin admin = conn.getAdmin()) { + String[] versionArr = compatibleClientVersion.split("\\."); + int majorVersion = Integer.parseInt(versionArr[0]); + int minorVersion = Integer.parseInt(versionArr[1]); + org.apache.hadoop.hbase.client.Connection conn = null; + Admin admin = null; + // if connected with client < 4.15, SYSTEM.TASK does not exist + // if connected with client 4.15, SYSTEM.TASK exists without any + // split policy and also TaskMetaDataEndpoint coprocessor would not + // exist + if (majorVersion == 4 && minorVersion == 15) { + conn = hbaseTestUtil.getConnection(); + admin = conn.getAdmin(); HTableDescriptor tableDescriptor = admin.getTableDescriptor( TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME)); - assertEquals("split policy not updated with compatible client version: " + assertNull("split policy should be null with compatible client version: " + + compatibleClientVersion, tableDescriptor.getRegionSplitPolicyClassName()); + assertFalse("Coprocessor " + TaskMetaDataEndpoint.class.getName() + + " should not have been added with compatible client version: " + compatibleClientVersion, - tableDescriptor.getRegionSplitPolicyClassName(), - SystemTaskSplitPolicy.class.getName()); + tableDescriptor.hasCoprocessor(TaskMetaDataEndpoint.class.getName())); + } + + executeQueriesWithCurrentVersion(QUERY_CREATE_DIVERGED_VIEW, url, NONE); + + if (conn == null) { + conn = hbaseTestUtil.getConnection(); + admin = conn.getAdmin(); } + // connect with client > 4.15, and we have new split policy and new + // coprocessor loaded + HTableDescriptor tableDescriptor = admin.getTableDescriptor( + TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME)); + assertEquals("split policy not updated with compatible client version: " + + compatibleClientVersion, + tableDescriptor.getRegionSplitPolicyClassName(), + SystemTaskSplitPolicy.class.getName()); + assertTrue("Coprocessor " + TaskMetaDataEndpoint.class.getName() + + " has not been added with compatible client version: " + + compatibleClientVersion, tableDescriptor.hasCoprocessor( + TaskMetaDataEndpoint.class.getName())); assertExpectedOutput(QUERY_CREATE_DIVERGED_VIEW); + admin.close(); + conn.close(); } + + @Test + public void testSystemTaskCreationWithIndexAsyncRebuild() throws Exception { + String[] versionArr = compatibleClientVersion.split("\\."); + int majorVersion = Integer.parseInt(versionArr[0]); + int minorVersion = Integer.parseInt(versionArr[1]); + // index async rebuild support min version check + if (majorVersion > 4 || (majorVersion == 4 && minorVersion >= 15)) { + executeQueryWithClientVersion(compatibleClientVersion, + INDEX_REBUILD_ASYNC, zkQuorum); + executeQueriesWithCurrentVersion(QUERY_INDEX_REBUILD_ASYNC, url, NONE); + assertExpectedOutput(QUERY_INDEX_REBUILD_ASYNC); + } + } + } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java index c79e19b..e5a7d3d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java @@ -84,6 +84,9 @@ public final class BackwardCompatibilityTestUtil { public static final String QUERY_DELETE = QUERY_PREFIX + DELETE; public static final String QUERY_SELECT_AND_DROP_TABLE = QUERY_PREFIX + SELECT_AND_DROP_TABLE; public static final String QUERY_CREATE_DIVERGED_VIEW = QUERY_PREFIX + CREATE_DIVERGED_VIEW; + public static final String INDEX_REBUILD_ASYNC = "index_rebuild_async"; + public static final String QUERY_INDEX_REBUILD_ASYNC = QUERY_PREFIX + + INDEX_REBUILD_ASYNC; public static final String MVN_HOME = "maven.home"; public static final String JAVA_TMP_DIR = "java.io.tmpdir"; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java index 9d97cc5..71f5ae4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java @@ -29,6 +29,7 @@ import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.task.SystemTaskParams; import org.apache.phoenix.schema.task.Task; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.MetaDataUtil; @@ -147,9 +148,19 @@ public class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT { TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS); Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis()); - Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD, - TENANT1, null, viewName, - PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true); + Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder() + .setConn(conn.unwrap(PhoenixConnection.class)) + .setTaskType(PTable.TaskType.INDEX_REBUILD) + .setTenantId(TENANT1) + .setSchemaName(null) + .setTableName(viewName) + .setTaskStatus(PTable.TaskStatus.CREATED.toString()) + .setData(data) + .setPriority(null) + .setStartTs(startTs) + .setEndTs(null) + .setAccessCheckEnabled(true) + .build()); task.run(); // Check task status and other column values. diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java index e38c5e6..503ede4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java @@ -25,9 +25,11 @@ import java.sql.SQLException; import java.util.Map; import java.util.Properties; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; @@ -127,12 +129,12 @@ public class SystemTablesUpgradeIT extends BaseTest { // SystemTaskSplitPolicy (which is extending DisabledRegionSplitPolicy // as of this writing) try (Admin admin = services.getAdmin()) { - String taskSplitPolicy = admin - .getTableDescriptor(TableName.valueOf( - PhoenixDatabaseMetaData.SYSTEM_TASK_NAME)) - .getRegionSplitPolicyClassName(); + HTableDescriptor td = admin.getTableDescriptor(TableName.valueOf( + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME)); + String taskSplitPolicy = td.getRegionSplitPolicyClassName(); assertEquals(SystemTaskSplitPolicy.class.getName(), taskSplitPolicy); + assertTrue(td.hasCoprocessor(TaskMetaDataEndpoint.class.getName())); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java index 6352d35..00d9c8c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java @@ -25,6 +25,7 @@ import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_SET_OR_ALTER_ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_UPDATE_CACHE_FREQUENCY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -665,9 +666,20 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT { conn.createStatement().execute( "ALTER INDEX " + indexName + " ON " + testTable + " REBUILD ALL ASYNC"); - String - queryTaskTable = - "SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME; + ResultSet resultSet = conn.createStatement().executeQuery( + "SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME); + assertTrue(resultSet.next()); + assertEquals("2", resultSet.getString(1)); + assertNull(resultSet.getString(3)); + assertNull(resultSet.getString(4)); + assertEquals(testTable, resultSet.getString(5)); + assertEquals("CREATED", resultSet.getString(6)); + assertEquals("4", resultSet.getString(8)); + assertEquals( + "{\"IndexName\":\"" + indexName + "\",\"RebuildAll\":true}", + resultSet.getString(9)); + String queryTaskTable = + "SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME; ResultSet rs = conn.createStatement().executeQuery(queryTaskTable); assertTrue(rs.next()); assertEquals(testTable, rs.getString(TABLE_NAME)); diff --git a/phoenix-core/src/it/resources/gold_files/gold_query_index_rebuild_async.txt b/phoenix-core/src/it/resources/gold_files/gold_query_index_rebuild_async.txt new file mode 100644 index 0000000..ff020c5 --- /dev/null +++ b/phoenix-core/src/it/resources/gold_files/gold_query_index_rebuild_async.txt @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'TASK_TYPE','TABLE_NAME','TASK_STATUS','TASK_PRIORITY' +'2','TI','COMPLETED','4' +'K','V' +'key1','val2' +'key3','val3' diff --git a/phoenix-core/src/it/resources/sql_files/index_rebuild_async.sql b/phoenix-core/src/it/resources/sql_files/index_rebuild_async.sql new file mode 100644 index 0000000..17133c0 --- /dev/null +++ b/phoenix-core/src/it/resources/sql_files/index_rebuild_async.sql @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS S.TI (K VARCHAR PRIMARY KEY, V VARCHAR); +UPSERT INTO S.TI VALUES ('key1', 'val1'); +CREATE INDEX R_ASYNCIND_TI ON S.TI (K, V); +ALTER INDEX R_ASYNCIND_TI ON S.TI DISABLE; +UPSERT INTO S.TI VALUES ('key1', 'val2'); +ALTER INDEX R_ASYNCIND_TI ON S.TI REBUILD ALL ASYNC; +UPSERT INTO S.TI VALUES ('key3', 'val3'); +UPSERT INTO S.TI VALUES ('key4', 'val4'); +DELETE FROM S.TI WHERE K = 'key4'; diff --git a/phoenix-core/src/it/resources/sql_files/query_index_rebuild_async.sql b/phoenix-core/src/it/resources/sql_files/query_index_rebuild_async.sql new file mode 100644 index 0000000..46d073e --- /dev/null +++ b/phoenix-core/src/it/resources/sql_files/query_index_rebuild_async.sql @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +SELECT TASK_TYPE, TABLE_NAME, TASK_STATUS, TASK_PRIORITY FROM SYSTEM.TASK; +SELECT * FROM S.TI; \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java index 1f9e29b..b2ddf3b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java @@ -113,4 +113,10 @@ public class BaseMetaDataEndpointObserver implements MetaDataEndpointObserver{ public void preCreateViewAddChildLink( final ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, final String tableName) throws IOException {} + + @Override + public void preUpsertTaskDetails( + final ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, + final String tableName) throws IOException { + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index fde1aea..fc3ddb2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -210,6 +210,7 @@ import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.SequenceNotFoundException; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.schema.task.SystemTaskParams; import org.apache.phoenix.schema.task.Task; import org.apache.phoenix.schema.types.PBinary; import org.apache.phoenix.schema.types.PBoolean; @@ -2274,13 +2275,20 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(env.getConfiguration()) .unwrap(PhoenixConnection.class)) { - Task.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS, - Bytes.toString(tenantIdBytes), - Bytes.toString(schemaName), - Bytes.toString(tableOrViewName), - PTable.TaskStatus.CREATED.toString(), - null, null, null, null, - this.accessCheckEnabled); + Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder() + .setConn(conn) + .setTaskType(PTable.TaskType.DROP_CHILD_VIEWS) + .setTenantId(Bytes.toString(tenantIdBytes)) + .setSchemaName(Bytes.toString(schemaName)) + .setTableName(Bytes.toString(tableOrViewName)) + .setTaskStatus( + PTable.TaskStatus.CREATED.toString()) + .setData(null) + .setPriority(null) + .setStartTs(null) + .setEndTs(null) + .setAccessCheckEnabled(this.accessCheckEnabled) + .build()); } catch (Throwable t) { LOGGER.error("Adding a task to drop child views failed!", t); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java index 629f00b..d569c29 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java @@ -68,4 +68,7 @@ public interface MetaDataEndpointObserver extends Coprocessor { void preCreateViewAddChildLink(final ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, final String tableName) throws IOException; + void preUpsertTaskDetails( + ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, + String tableName) throws IOException; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index 5d0fc85..e6f0bb5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -178,8 +178,9 @@ public abstract class MetaDataProtocol extends MetaDataService { UNABLE_TO_CREATE_CHILD_LINK, UNABLE_TO_UPDATE_PARENT_TABLE, UNABLE_TO_DELETE_CHILD_LINK, + UNABLE_TO_UPSERT_TASK, NO_OP - }; + } public static class SharedTableState { private PName tenantId; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java index f7ff05c..ed2c6c3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java @@ -264,4 +264,18 @@ public class PhoenixMetaDataCoprocessorHost } return user; } + + void preUpsertTaskDetails(final String tableName) throws IOException { + execOperation( + new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>( + getActiveUser()) { + @Override + public void call(MetaDataEndpointObserver observer, + ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) + throws IOException { + observer.preUpsertTaskDetails(ctx, tableName); + } + }); + } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpoint.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpoint.java new file mode 100644 index 0000000..60b8f6b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpoint.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.coprocessor; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.CoprocessorException; +import org.apache.hadoop.hbase.coprocessor.CoprocessorService; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; +import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos + .TaskMetaDataService; +import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos + .TaskMutateRequest; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.protobuf.ProtobufUtil; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl + .mutateRowsWithLocks; + +/** + * Phoenix metadata mutations for SYSTEM.TASK flows through this co-processor + * Endpoint. + */ +public class TaskMetaDataEndpoint extends TaskMetaDataService + implements CoprocessorService, Coprocessor { + + private static final Logger LOGGER = + LoggerFactory.getLogger(TaskMetaDataEndpoint.class); + + private RegionCoprocessorEnvironment env; + private PhoenixMetaDataCoprocessorHost phoenixAccessCoprocessorHost; + private boolean accessCheckEnabled; + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + if (env instanceof RegionCoprocessorEnvironment) { + this.env = (RegionCoprocessorEnvironment) env; + } else { + throw new CoprocessorException("Must be loaded on a table region!"); + } + this.phoenixAccessCoprocessorHost = + new PhoenixMetaDataCoprocessorHost(this.env); + this.accessCheckEnabled = env.getConfiguration().getBoolean( + QueryServices.PHOENIX_ACLS_ENABLED, + QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED); + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + // no-op + } + + @Override + public Service getService() { + return this; + } + + @Override + public void upsertTaskDetails(RpcController controller, + TaskMutateRequest request, RpcCallback<MetaDataResponse> done) { + MetaDataResponse.Builder builder = MetaDataResponse.newBuilder(); + try { + List<Mutation> taskMutations = ProtobufUtil.getMutations(request); + if (taskMutations.isEmpty()) { + done.run(builder.build()); + return; + } + byte[][] rowKeyMetaData = new byte[3][]; + MetaDataUtil.getTenantIdAndSchemaAndTableName(taskMutations, + rowKeyMetaData); + byte[] schemaName = + rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; + byte[] tableName = + rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; + String fullTableName = SchemaUtil.getTableName(schemaName, + tableName); + + phoenixAccessCoprocessorHost.preUpsertTaskDetails(fullTableName); + + mutateRowsWithLocks(this.accessCheckEnabled, this.env.getRegion(), + taskMutations, Collections.<byte[]>emptySet(), + HConstants.NO_NONCE, HConstants.NO_NONCE); + } catch (Throwable t) { + LOGGER.error("Unable to write mutations to {}", + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME, t); + builder.setReturnCode( + MetaDataProtos.MutationCode.UNABLE_TO_UPSERT_TASK); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + done.run(builder.build()); + } + } + +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java index e2cc782..3a99d6b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java @@ -32,6 +32,7 @@ import javax.annotation.concurrent.GuardedBy; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.phoenix.schema.task.SystemTaskParams; import org.apache.phoenix.util.JacksonUtil; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; @@ -195,10 +196,19 @@ public class TaskRegionObserver extends BaseRegionObserver { } // Change task status to STARTED - Task.addTask(connForTask, taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(), - taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(), - taskRecord.getData(), taskRecord.getPriority(), taskRecord.getTimeStamp(), null, - true); + Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder() + .setConn(connForTask) + .setTaskType(taskRecord.getTaskType()) + .setTenantId(taskRecord.getTenantId()) + .setSchemaName(taskRecord.getSchemaName()) + .setTableName(taskRecord.getTableName()) + .setTaskStatus(PTable.TaskStatus.STARTED.toString()) + .setData(taskRecord.getData()) + .setPriority(taskRecord.getPriority()) + .setStartTs(taskRecord.getTimeStamp()) + .setEndTs(null) + .setAccessCheckEnabled(true) + .build()); // invokes the method at runtime result = (TaskResult) runMethod.invoke(obj, taskRecord); @@ -251,9 +261,19 @@ public class TaskRegionObserver extends BaseRegionObserver { data = jsonNode.toString(); Timestamp endTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis()); - Task.addTask(connForTask, taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(), - taskRecord.getTableName(), taskStatus, data, taskRecord.getPriority(), - taskRecord.getTimeStamp(), endTs, true); + Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder() + .setConn(connForTask) + .setTaskType(taskRecord.getTaskType()) + .setTenantId(taskRecord.getTenantId()) + .setSchemaName(taskRecord.getSchemaName()) + .setTableName(taskRecord.getTableName()) + .setTaskStatus(taskStatus) + .setData(data) + .setPriority(taskRecord.getPriority()) + .setStartTs(taskRecord.getTimeStamp()) + .setEndTs(endTs) + .setAccessCheckEnabled(true) + .build()); } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java index 28b58c6..38283a5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java @@ -117,6 +117,10 @@ public final class MetaDataProtos { * <code>UNABLE_TO_DELETE_CHILD_LINK = 25;</code> */ UNABLE_TO_DELETE_CHILD_LINK(25, 25), + /** + * <code>UNABLE_TO_UPSERT_TASK = 26;</code> + */ + UNABLE_TO_UPSERT_TASK(26, 26), ; /** @@ -223,6 +227,10 @@ public final class MetaDataProtos { * <code>UNABLE_TO_DELETE_CHILD_LINK = 25;</code> */ public static final int UNABLE_TO_DELETE_CHILD_LINK_VALUE = 25; + /** + * <code>UNABLE_TO_UPSERT_TASK = 26;</code> + */ + public static final int UNABLE_TO_UPSERT_TASK_VALUE = 26; public final int getNumber() { return value; } @@ -255,6 +263,7 @@ public final class MetaDataProtos { case 23: return UNABLE_TO_CREATE_CHILD_LINK; case 24: return UNABLE_TO_UPDATE_PARENT_TABLE; case 25: return UNABLE_TO_DELETE_CHILD_LINK; + case 26: return UNABLE_TO_UPSERT_TASK; default: return null; } } @@ -18048,7 +18057,7 @@ public final class MetaDataProtos { "cheRequest\022\020\n\010tenantId\030\001 \002(\014\022\022\n\nschemaNa" + "me\030\002 \002(\014\022\021\n\ttableName\030\003 \002(\014\022\027\n\017clientTim" + "estamp\030\004 \002(\003\022\025\n\rclientVersion\030\005 \001(\005\"\035\n\033C" + - "learTableFromCacheResponse*\332\005\n\014MutationC" + + "learTableFromCacheResponse*\365\005\n\014MutationC" + "ode\022\030\n\024TABLE_ALREADY_EXISTS\020\000\022\023\n\017TABLE_N" + "OT_FOUND\020\001\022\024\n\020COLUMN_NOT_FOUND\020\002\022\031\n\025COLU", "MN_ALREADY_EXISTS\020\003\022\035\n\031CONCURRENT_TABLE_" + @@ -18066,31 +18075,32 @@ public final class MetaDataProtos { "#\n\037CANNOT_COERCE_AUTO_PARTITION_ID\020\025\022\024\n\020" + "TOO_MANY_INDEXES\020\026\022\037\n\033UNABLE_TO_CREATE_C" + "HILD_LINK\020\027\022!\n\035UNABLE_TO_UPDATE_PARENT_T" + - "ABLE\020\030\022\037\n\033UNABLE_TO_DELETE_CHILD_LINK\020\0312" + - "\345\006\n\017MetaDataService\022/\n\010getTable\022\020.GetTab" + - "leRequest\032\021.MetaDataResponse\0227\n\014getFunct" + - "ions\022\024.GetFunctionsRequest\032\021.MetaDataRes" + - "ponse\0221\n\tgetSchema\022\021.GetSchemaRequest\032\021.", - "MetaDataResponse\0225\n\013createTable\022\023.Create" + - "TableRequest\032\021.MetaDataResponse\022;\n\016creat" + - "eFunction\022\026.CreateFunctionRequest\032\021.Meta" + - "DataResponse\0227\n\014createSchema\022\024.CreateSch" + - "emaRequest\032\021.MetaDataResponse\0221\n\tdropTab" + - "le\022\021.DropTableRequest\032\021.MetaDataResponse" + - "\0223\n\ndropSchema\022\022.DropSchemaRequest\032\021.Met" + - "aDataResponse\0227\n\014dropFunction\022\024.DropFunc" + - "tionRequest\032\021.MetaDataResponse\0221\n\taddCol" + - "umn\022\021.AddColumnRequest\032\021.MetaDataRespons", - "e\0223\n\ndropColumn\022\022.DropColumnRequest\032\021.Me" + - "taDataResponse\022?\n\020updateIndexState\022\030.Upd" + - "ateIndexStateRequest\032\021.MetaDataResponse\022" + - "5\n\nclearCache\022\022.ClearCacheRequest\032\023.Clea" + - "rCacheResponse\0225\n\ngetVersion\022\022.GetVersio" + - "nRequest\032\023.GetVersionResponse\022P\n\023clearTa" + - "bleFromCache\022\033.ClearTableFromCacheReques" + - "t\032\034.ClearTableFromCacheResponseBB\n(org.a" + - "pache.phoenix.coprocessor.generatedB\016Met" + - "aDataProtosH\001\210\001\001\240\001\001" + "ABLE\020\030\022\037\n\033UNABLE_TO_DELETE_CHILD_LINK\020\031\022" + + "\031\n\025UNABLE_TO_UPSERT_TASK\020\0322\345\006\n\017MetaDataS" + + "ervice\022/\n\010getTable\022\020.GetTableRequest\032\021.M" + + "etaDataResponse\0227\n\014getFunctions\022\024.GetFun" + + "ctionsRequest\032\021.MetaDataResponse\0221\n\tgetS", + "chema\022\021.GetSchemaRequest\032\021.MetaDataRespo" + + "nse\0225\n\013createTable\022\023.CreateTableRequest\032" + + "\021.MetaDataResponse\022;\n\016createFunction\022\026.C" + + "reateFunctionRequest\032\021.MetaDataResponse\022" + + "7\n\014createSchema\022\024.CreateSchemaRequest\032\021." + + "MetaDataResponse\0221\n\tdropTable\022\021.DropTabl" + + "eRequest\032\021.MetaDataResponse\0223\n\ndropSchem" + + "a\022\022.DropSchemaRequest\032\021.MetaDataResponse" + + "\0227\n\014dropFunction\022\024.DropFunctionRequest\032\021" + + ".MetaDataResponse\0221\n\taddColumn\022\021.AddColu", + "mnRequest\032\021.MetaDataResponse\0223\n\ndropColu" + + "mn\022\022.DropColumnRequest\032\021.MetaDataRespons" + + "e\022?\n\020updateIndexState\022\030.UpdateIndexState" + + "Request\032\021.MetaDataResponse\0225\n\nclearCache" + + "\022\022.ClearCacheRequest\032\023.ClearCacheRespons" + + "e\0225\n\ngetVersion\022\022.GetVersionRequest\032\023.Ge" + + "tVersionResponse\022P\n\023clearTableFromCache\022" + + "\033.ClearTableFromCacheRequest\032\034.ClearTabl" + + "eFromCacheResponseBB\n(org.apache.phoenix" + + ".coprocessor.generatedB\016MetaDataProtosH\001", + "\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/TaskMetaDataProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/TaskMetaDataProtos.java new file mode 100644 index 0000000..78c1f70 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/TaskMetaDataProtos.java @@ -0,0 +1,784 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: TaskMetaDataService.proto + +package org.apache.phoenix.coprocessor.generated; + +public final class TaskMetaDataProtos { + private TaskMetaDataProtos() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface TaskMutateRequestOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // repeated bytes tableMetadataMutations = 1; + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + java.util.List<com.google.protobuf.ByteString> getTableMetadataMutationsList(); + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + int getTableMetadataMutationsCount(); + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + com.google.protobuf.ByteString getTableMetadataMutations(int index); + } + /** + * Protobuf type {@code TaskMutateRequest} + */ + public static final class TaskMutateRequest extends + com.google.protobuf.GeneratedMessage + implements TaskMutateRequestOrBuilder { + // Use TaskMutateRequest.newBuilder() to construct. + private TaskMutateRequest(com.google.protobuf.GeneratedMessage.Builder<?> builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private TaskMutateRequest(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final TaskMutateRequest defaultInstance; + public static TaskMutateRequest getDefaultInstance() { + return defaultInstance; + } + + public TaskMutateRequest getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private TaskMutateRequest( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) { + tableMetadataMutations_ = new java.util.ArrayList<com.google.protobuf.ByteString>(); + mutable_bitField0_ |= 0x00000001; + } + tableMetadataMutations_.add(input.readBytes()); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) { + tableMetadataMutations_ = java.util.Collections.unmodifiableList(tableMetadataMutations_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.class, org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.Builder.class); + } + + public static com.google.protobuf.Parser<TaskMutateRequest> PARSER = + new com.google.protobuf.AbstractParser<TaskMutateRequest>() { + public TaskMutateRequest parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new TaskMutateRequest(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser<TaskMutateRequest> getParserForType() { + return PARSER; + } + + // repeated bytes tableMetadataMutations = 1; + public static final int TABLEMETADATAMUTATIONS_FIELD_NUMBER = 1; + private java.util.List<com.google.protobuf.ByteString> tableMetadataMutations_; + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + public java.util.List<com.google.protobuf.ByteString> + getTableMetadataMutationsList() { + return tableMetadataMutations_; + } + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + public int getTableMetadataMutationsCount() { + return tableMetadataMutations_.size(); + } + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + public com.google.protobuf.ByteString getTableMetadataMutations(int index) { + return tableMetadataMutations_.get(index); + } + + private void initFields() { + tableMetadataMutations_ = java.util.Collections.emptyList(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + for (int i = 0; i < tableMetadataMutations_.size(); i++) { + output.writeBytes(1, tableMetadataMutations_.get(i)); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + { + int dataSize = 0; + for (int i = 0; i < tableMetadataMutations_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(tableMetadataMutations_.get(i)); + } + size += dataSize; + size += 1 * getTableMetadataMutationsList().size(); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest)) { + return super.equals(obj); + } + org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest other = (org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest) obj; + + boolean result = true; + result = result && getTableMetadataMutationsList() + .equals(other.getTableMetadataMutationsList()); + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (getTableMetadataMutationsCount() > 0) { + hash = (37 * hash) + TABLEMETADATAMUTATIONS_FIELD_NUMBER; + hash = (53 * hash) + getTableMetadataMutationsList().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code TaskMutateRequest} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder<Builder> + implements org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequestOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.class, org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.Builder.class); + } + + // Construct using org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + tableMetadataMutations_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_descriptor; + } + + public org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest getDefaultInstanceForType() { + return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.getDefaultInstance(); + } + + public org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest build() { + org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest buildPartial() { + org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest result = new org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest(this); + int from_bitField0_ = bitField0_; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + tableMetadataMutations_ = java.util.Collections.unmodifiableList(tableMetadataMutations_); + bitField0_ = (bitField0_ & ~0x00000001); + } + result.tableMetadataMutations_ = tableMetadataMutations_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest) { + return mergeFrom((org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest other) { + if (other == org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.getDefaultInstance()) return this; + if (!other.tableMetadataMutations_.isEmpty()) { + if (tableMetadataMutations_.isEmpty()) { + tableMetadataMutations_ = other.tableMetadataMutations_; + bitField0_ = (bitField0_ & ~0x00000001); + } else { + ensureTableMetadataMutationsIsMutable(); + tableMetadataMutations_.addAll(other.tableMetadataMutations_); + } + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // repeated bytes tableMetadataMutations = 1; + private java.util.List<com.google.protobuf.ByteString> tableMetadataMutations_ = java.util.Collections.emptyList(); + private void ensureTableMetadataMutationsIsMutable() { + if (!((bitField0_ & 0x00000001) == 0x00000001)) { + tableMetadataMutations_ = new java.util.ArrayList<com.google.protobuf.ByteString>(tableMetadataMutations_); + bitField0_ |= 0x00000001; + } + } + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + public java.util.List<com.google.protobuf.ByteString> + getTableMetadataMutationsList() { + return java.util.Collections.unmodifiableList(tableMetadataMutations_); + } + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + public int getTableMetadataMutationsCount() { + return tableMetadataMutations_.size(); + } + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + public com.google.protobuf.ByteString getTableMetadataMutations(int index) { + return tableMetadataMutations_.get(index); + } + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + public Builder setTableMetadataMutations( + int index, com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureTableMetadataMutationsIsMutable(); + tableMetadataMutations_.set(index, value); + onChanged(); + return this; + } + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + public Builder addTableMetadataMutations(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureTableMetadataMutationsIsMutable(); + tableMetadataMutations_.add(value); + onChanged(); + return this; + } + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + public Builder addAllTableMetadataMutations( + java.lang.Iterable<? extends com.google.protobuf.ByteString> values) { + ensureTableMetadataMutationsIsMutable(); + super.addAll(values, tableMetadataMutations_); + onChanged(); + return this; + } + /** + * <code>repeated bytes tableMetadataMutations = 1;</code> + */ + public Builder clearTableMetadataMutations() { + tableMetadataMutations_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000001); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:TaskMutateRequest) + } + + static { + defaultInstance = new TaskMutateRequest(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:TaskMutateRequest) + } + + /** + * Protobuf service {@code TaskMetaDataService} + */ + public static abstract class TaskMetaDataService + implements com.google.protobuf.Service { + protected TaskMetaDataService() {} + + public interface Interface { + /** + * <code>rpc upsertTaskDetails(.TaskMutateRequest) returns (.MetaDataResponse);</code> + */ + public abstract void upsertTaskDetails( + com.google.protobuf.RpcController controller, + org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request, + com.google.protobuf.RpcCallback<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse> done); + + } + + public static com.google.protobuf.Service newReflectiveService( + final Interface impl) { + return new TaskMetaDataService() { + @java.lang.Override + public void upsertTaskDetails( + com.google.protobuf.RpcController controller, + org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request, + com.google.protobuf.RpcCallback<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse> done) { + impl.upsertTaskDetails(controller, request, done); + } + + }; + } + + public static com.google.protobuf.BlockingService + newReflectiveBlockingService(final BlockingInterface impl) { + return new com.google.protobuf.BlockingService() { + public final com.google.protobuf.Descriptors.ServiceDescriptor + getDescriptorForType() { + return getDescriptor(); + } + + public final com.google.protobuf.Message callBlockingMethod( + com.google.protobuf.Descriptors.MethodDescriptor method, + com.google.protobuf.RpcController controller, + com.google.protobuf.Message request) + throws com.google.protobuf.ServiceException { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.callBlockingMethod() given method descriptor for " + + "wrong service type."); + } + switch(method.getIndex()) { + case 0: + return impl.upsertTaskDetails(controller, (org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest)request); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + public final com.google.protobuf.Message + getRequestPrototype( + com.google.protobuf.Descriptors.MethodDescriptor method) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.getRequestPrototype() given method " + + "descriptor for wrong service type."); + } + switch(method.getIndex()) { + case 0: + return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.getDefaultInstance(); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + public final com.google.protobuf.Message + getResponsePrototype( + com.google.protobuf.Descriptors.MethodDescriptor method) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.getResponsePrototype() given method " + + "descriptor for wrong service type."); + } + switch(method.getIndex()) { + case 0: + return org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance(); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + }; + } + + /** + * <code>rpc upsertTaskDetails(.TaskMutateRequest) returns (.MetaDataResponse);</code> + */ + public abstract void upsertTaskDetails( + com.google.protobuf.RpcController controller, + org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request, + com.google.protobuf.RpcCallback<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse> done); + + public static final + com.google.protobuf.Descriptors.ServiceDescriptor + getDescriptor() { + return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.getDescriptor().getServices().get(0); + } + public final com.google.protobuf.Descriptors.ServiceDescriptor + getDescriptorForType() { + return getDescriptor(); + } + + public final void callMethod( + com.google.protobuf.Descriptors.MethodDescriptor method, + com.google.protobuf.RpcController controller, + com.google.protobuf.Message request, + com.google.protobuf.RpcCallback< + com.google.protobuf.Message> done) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.callMethod() given method descriptor for wrong " + + "service type."); + } + switch(method.getIndex()) { + case 0: + this.upsertTaskDetails(controller, (org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest)request, + com.google.protobuf.RpcUtil.<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse>specializeCallback( + done)); + return; + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + public final com.google.protobuf.Message + getRequestPrototype( + com.google.protobuf.Descriptors.MethodDescriptor method) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.getRequestPrototype() given method " + + "descriptor for wrong service type."); + } + switch(method.getIndex()) { + case 0: + return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.getDefaultInstance(); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + public final com.google.protobuf.Message + getResponsePrototype( + com.google.protobuf.Descriptors.MethodDescriptor method) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.getResponsePrototype() given method " + + "descriptor for wrong service type."); + } + switch(method.getIndex()) { + case 0: + return org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance(); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + public static Stub newStub( + com.google.protobuf.RpcChannel channel) { + return new Stub(channel); + } + + public static final class Stub extends org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMetaDataService implements Interface { + private Stub(com.google.protobuf.RpcChannel channel) { + this.channel = channel; + } + + private final com.google.protobuf.RpcChannel channel; + + public com.google.protobuf.RpcChannel getChannel() { + return channel; + } + + public void upsertTaskDetails( + com.google.protobuf.RpcController controller, + org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request, + com.google.protobuf.RpcCallback<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse> done) { + channel.callMethod( + getDescriptor().getMethods().get(0), + controller, + request, + org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance(), + com.google.protobuf.RpcUtil.generalizeCallback( + done, + org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.class, + org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance())); + } + } + + public static BlockingInterface newBlockingStub( + com.google.protobuf.BlockingRpcChannel channel) { + return new BlockingStub(channel); + } + + public interface BlockingInterface { + public org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse upsertTaskDetails( + com.google.protobuf.RpcController controller, + org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request) + throws com.google.protobuf.ServiceException; + } + + private static final class BlockingStub implements BlockingInterface { + private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { + this.channel = channel; + } + + private final com.google.protobuf.BlockingRpcChannel channel; + + public org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse upsertTaskDetails( + com.google.protobuf.RpcController controller, + org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request) + throws com.google.protobuf.ServiceException { + return (org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse) channel.callBlockingMethod( + getDescriptor().getMethods().get(0), + controller, + request, + org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance()); + } + + } + + // @@protoc_insertion_point(class_scope:TaskMetaDataService) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_TaskMutateRequest_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_TaskMutateRequest_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\031TaskMetaDataService.proto\032\025MetaDataSer" + + "vice.proto\"3\n\021TaskMutateRequest\022\036\n\026table" + + "MetadataMutations\030\001 \003(\0142Q\n\023TaskMetaDataS" + + "ervice\022:\n\021upsertTaskDetails\022\022.TaskMutate" + + "Request\032\021.MetaDataResponseBF\n(org.apache" + + ".phoenix.coprocessor.generatedB\022TaskMeta" + + "DataProtosH\001\210\001\001\240\001\001" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_TaskMutateRequest_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_TaskMutateRequest_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_TaskMutateRequest_descriptor, + new java.lang.String[] { "TableMetadataMutations", }); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + org.apache.phoenix.coprocessor.generated.MetaDataProtos.getDescriptor(), + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java index 4e3b639..b4c99ac 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java @@ -21,6 +21,7 @@ import com.google.common.base.Strings; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.phoenix.schema.task.SystemTaskParams; import org.apache.phoenix.util.JacksonUtil; import org.apache.hadoop.conf.Configuration; @@ -106,10 +107,19 @@ public class IndexRebuildTask extends BaseTask { Job job = indexToolRes.getValue(); ((ObjectNode) jsonNode).put(JOB_ID, job.getJobID().toString()); - Task.addTask(conn.unwrap(PhoenixConnection.class ), taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(), - taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(), - jsonNode.toString(), taskRecord.getPriority(), - taskRecord.getTimeStamp(), null, true); + Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder() + .setConn(conn.unwrap(PhoenixConnection.class)) + .setTaskType(taskRecord.getTaskType()) + .setTenantId(taskRecord.getTenantId()) + .setSchemaName(taskRecord.getSchemaName()) + .setTableName(taskRecord.getTableName()) + .setTaskStatus(PTable.TaskStatus.STARTED.toString()) + .setData(jsonNode.toString()) + .setPriority(taskRecord.getPriority()) + .setStartTs(taskRecord.getTimeStamp()) + .setEndTs(null) + .setAccessCheckEnabled(true) + .build()); // It will take some time to finish, so we will check the status in a separate task. return null; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 9f4c139..788dea5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -433,6 +433,8 @@ public enum SQLExceptionCode { PTable.LinkType.CHILD_TABLE + ") for view"), TABLE_NOT_IN_REGION(1145, "XCL45", "No modifications allowed on this table. " + "Table not in this region."), + UNABLE_TO_UPSERT_TASK(1146, "XCL46", + "Error upserting records in SYSTEM.TASK table"), /** * Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT). */ diff --git a/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java index 45f43e1..66528dc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java @@ -34,6 +34,8 @@ import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.CreateVi import org.apache.phoenix.coprocessor.generated.MetaDataProtos; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos; +import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos + .TaskMutateRequest; import org.apache.phoenix.schema.PTableType; import com.google.protobuf.ByteString; @@ -108,6 +110,11 @@ public class ProtobufUtil { return getMutations(request.getTableMetadataMutationsList()); } + public static List<Mutation> getMutations(TaskMutateRequest request) + throws IOException { + return getMutations(request.getTableMetadataMutationsList()); + } + public static List<Mutation> getMutations(CreateViewAddChildLinkRequest request) throws IOException { return getMutations(request.getTableMetadataMutationsList()); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index d76c647..7d93abd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -168,10 +168,10 @@ import org.apache.phoenix.coprocessor.ScanRegionObserver; import org.apache.phoenix.coprocessor.SequenceRegionObserver; import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl; import org.apache.phoenix.coprocessor.PhoenixTTLRegionObserver; +import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint; import org.apache.phoenix.coprocessor.TaskRegionObserver; import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.ChildLinkMetaDataService; -import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.CreateViewAddChildLinkRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest; @@ -1095,6 +1095,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if(!descriptor.hasCoprocessor(TaskRegionObserver.class.getName())) { descriptor.addCoprocessor(TaskRegionObserver.class.getName(), null, priority, null); } + if (!descriptor.hasCoprocessor( + TaskMetaDataEndpoint.class.getName())) { + descriptor.addCoprocessor(TaskMetaDataEndpoint.class.getName(), + null, priority, null); + } } else if (SchemaUtil.isChildLinkTable(tableName)) { if (!descriptor.hasCoprocessor(ChildLinkMetaDataEndpoint.class.getName())) { descriptor.addCoprocessor(ChildLinkMetaDataEndpoint.class.getName(), null, priority, null); @@ -4146,7 +4151,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement TableName tableName = SchemaUtil.getPhysicalTableName( PhoenixDatabaseMetaData.SYSTEM_TASK_NAME, props); td = admin.getTableDescriptor(tableName); + boolean isTableDescUpdated = false; if (updateAndConfirmSplitPolicyForTask(td)) { + isTableDescUpdated = true; + } + if (!td.hasCoprocessor(TaskMetaDataEndpoint.class.getName())) { + int priority = props.getInt( + QueryServices.COPROCESSOR_PRIORITY_ATTRIB, + QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY); + td.addCoprocessor( + TaskMetaDataEndpoint.class.getName(), null, priority, + null); + isTableDescUpdated = true; + } + if (isTableDescUpdated) { admin.modifyTable(tableName, td); pollForUpdatedTableDescriptor(admin, td, tableName.getName()); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 4e20997..68b0891 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -79,6 +79,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYNC_INDEX_CREATED import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM; @@ -100,6 +101,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_PHOENIX_TTL_HW import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT; import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE; +import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME; import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_UPDATE_STATS_ASYNC; @@ -170,6 +172,8 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; import org.apache.phoenix.coprocessor.MetaDataProtocol.SharedTableState; import org.apache.phoenix.schema.stats.GuidePostsInfo; +import org.apache.phoenix.schema.task.SystemTaskParams; +import org.apache.phoenix.util.TaskMetaDataServiceCallBack; import org.apache.phoenix.util.ViewUtil; import org.apache.phoenix.util.JacksonUtil; import org.apache.phoenix.exception.SQLExceptionCode; @@ -4690,11 +4694,32 @@ public class MetaDataClient { }}; try { String json = JacksonUtil.getObjectWriter().writeValueAsString(props); - Task.addTask(connection, PTable.TaskType.INDEX_REBUILD, - tenantId, schemaName, - dataTableName, PTable.TaskStatus.CREATED.toString(), - json, null, ts, null, true); - connection.commit(); + List<Mutation> sysTaskUpsertMutations = Task.getMutationsForAddTask(new SystemTaskParams.SystemTaskParamsBuilder() + .setConn(connection) + .setTaskType( + PTable.TaskType.INDEX_REBUILD) + .setTenantId(tenantId) + .setSchemaName(schemaName) + .setTableName(dataTableName) + .setTaskStatus( + PTable.TaskStatus.CREATED.toString()) + .setData(json) + .setPriority(null) + .setStartTs(ts) + .setEndTs(null) + .setAccessCheckEnabled(true) + .build()); + byte[] rowKey = sysTaskUpsertMutations + .get(0).getRow(); + MetaDataMutationResult metaDataMutationResult = + Task.taskMetaDataCoprocessorExec(connection, rowKey, + new TaskMetaDataServiceCallBack(sysTaskUpsertMutations)); + if (MutationCode.UNABLE_TO_UPSERT_TASK.equals( + metaDataMutationResult.getMutationCode())) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_UPSERT_TASK) + .setSchemaName(SYSTEM_SCHEMA_NAME) + .setTableName(SYSTEM_TASK_TABLE).build().buildException(); + } } catch (IOException e) { throw new SQLException("Exception happened while adding a System.Task" + e.toString()); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/SystemTaskParams.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/SystemTaskParams.java new file mode 100644 index 0000000..1740256 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/SystemTaskParams.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.schema.task; + +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.PTable; + +import java.sql.Timestamp; + +/** + * Task params to be used while upserting records in SYSTEM.TASK table. + * This POJO is mainly used while upserting(and committing) or generating + * upsert mutations plan in {@link Task} class + */ +@edu.umd.cs.findbugs.annotations.SuppressWarnings( + value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}, + justification = "endTs and startTs are not used for mutation") +public class SystemTaskParams { + + private final PhoenixConnection conn; + private final PTable.TaskType taskType; + private final String tenantId; + private final String schemaName; + private final String tableName; + private final String taskStatus; + private final String data; + private final Integer priority; + private final Timestamp startTs; + private final Timestamp endTs; + private final boolean accessCheckEnabled; + + public SystemTaskParams(PhoenixConnection conn, PTable.TaskType taskType, + String tenantId, String schemaName, String tableName, + String taskStatus, String data, Integer priority, Timestamp startTs, + Timestamp endTs, boolean accessCheckEnabled) { + this.conn = conn; + this.taskType = taskType; + this.tenantId = tenantId; + this.schemaName = schemaName; + this.tableName = tableName; + this.taskStatus = taskStatus; + this.data = data; + this.priority = priority; + this.startTs = startTs; + this.endTs = endTs; + this.accessCheckEnabled = accessCheckEnabled; + } + + public PhoenixConnection getConn() { + return conn; + } + + public PTable.TaskType getTaskType() { + return taskType; + } + + public String getTenantId() { + return tenantId; + } + + public String getSchemaName() { + return schemaName; + } + + public String getTableName() { + return tableName; + } + + public String getTaskStatus() { + return taskStatus; + } + + public String getData() { + return data; + } + + public Integer getPriority() { + return priority; + } + + public Timestamp getStartTs() { + return startTs; + } + + public Timestamp getEndTs() { + return endTs; + } + + public boolean isAccessCheckEnabled() { + return accessCheckEnabled; + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}, + justification = "endTs and startTs are not used for mutation") + public static class SystemTaskParamsBuilder { + + private PhoenixConnection conn; + private PTable.TaskType taskType; + private String tenantId; + private String schemaName; + private String tableName; + private String taskStatus; + private String data; + private Integer priority; + private Timestamp startTs; + private Timestamp endTs; + private boolean accessCheckEnabled; + + public SystemTaskParamsBuilder setConn(PhoenixConnection conn) { + this.conn = conn; + return this; + } + + public SystemTaskParamsBuilder setTaskType(PTable.TaskType taskType) { + this.taskType = taskType; + return this; + } + + public SystemTaskParamsBuilder setTenantId(String tenantId) { + this.tenantId = tenantId; + return this; + } + + public SystemTaskParamsBuilder setSchemaName(String schemaName) { + this.schemaName = schemaName; + return this; + } + + public SystemTaskParamsBuilder setTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public SystemTaskParamsBuilder setTaskStatus(String taskStatus) { + this.taskStatus = taskStatus; + return this; + } + + public SystemTaskParamsBuilder setData(String data) { + this.data = data; + return this; + } + + public SystemTaskParamsBuilder setPriority(Integer priority) { + this.priority = priority; + return this; + } + + public SystemTaskParamsBuilder setStartTs(Timestamp startTs) { + this.startTs = startTs; + return this; + } + + public SystemTaskParamsBuilder setEndTs(Timestamp endTs) { + this.endTs = endTs; + return this; + } + + public SystemTaskParamsBuilder setAccessCheckEnabled( + boolean accessCheckEnabled) { + this.accessCheckEnabled = accessCheckEnabled; + return this; + } + + public SystemTaskParams build() { + return new SystemTaskParams(conn, taskType, tenantId, schemaName, + tableName, taskStatus, data, priority, startTs, endTs, + accessCheckEnabled); + } + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java index 0fe256b..e146ce0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java @@ -18,14 +18,27 @@ package org.apache.phoenix.schema.task; import com.google.common.base.Strings; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcUtil; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; +import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos + .TaskMetaDataService; import org.apache.phoenix.coprocessor.tasks.DropChildViewsTask; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.ServerUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +51,9 @@ import java.sql.SQLException; import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; public class Task { public static final Logger LOGGER = LoggerFactory.getLogger(Task.class); @@ -75,6 +90,46 @@ public class Task { } } + private static List<Mutation> getMutationsForSystemTaskTable( + final PhoenixConnection conn, final PreparedStatement stmt, + final boolean accessCheckEnabled) throws IOException { + // we need to mutate SYSTEM.TASK with HBase/login user if access is enabled. + if (accessCheckEnabled) { + return User.runAsLoginUser(new PrivilegedExceptionAction<List<Mutation>>() { + @Override + public List<Mutation> run() throws Exception { + final RpcServer.Call rpcContext = RpcUtil.getRpcContext(); + // setting RPC context as null so that user can be reset + try { + RpcUtil.setRpcContext(null); + return executeStatementAndGetTaskMutations(conn, stmt); + } catch (SQLException e) { + throw new IOException(e); + } finally { + // setting RPC context back to original context of the RPC + RpcUtil.setRpcContext(rpcContext); + } + } + }); + } else { + try { + return executeStatementAndGetTaskMutations(conn, stmt); + } catch (SQLException e) { + throw new IOException(e); + } + } + } + + private static List<Mutation> executeStatementAndGetTaskMutations( + PhoenixConnection conn, PreparedStatement stmt) + throws SQLException { + stmt.execute(); + // retrieve mutations for SYSTEM.TASK upsert query + Iterator<Pair<byte[], List<Mutation>>> iterator = + conn.getMutationState().toMutations(); + return iterator.next().getSecond(); + } + private static PreparedStatement setValuesToAddTaskPS(PreparedStatement stmt, PTable.TaskType taskType, String tenantId, String schemaName, String tableName, String taskStatus, String data, Integer priority, Timestamp startTs, Timestamp endTs) throws SQLException { @@ -123,13 +178,28 @@ public class Task { return stmt; } - public static void addTask(PhoenixConnection conn, PTable.TaskType taskType, String tenantId, String schemaName, - String tableName, String taskStatus, String data, Integer priority, Timestamp startTs, Timestamp endTs, - boolean accessCheckEnabled) - throws IOException { + /** + * Execute and commit upsert query on SYSTEM.TASK + * This method should be used only from server side. Client should use + * {@link #getMutationsForAddTask(SystemTaskParams)} instead of direct + * upsert commit. + * + * @param systemTaskParams Task params with various task related arguments + * @throws IOException If something goes wrong while preparing mutations + * or committing transactions + */ + public static void addTask(SystemTaskParams systemTaskParams) + throws IOException { + addTaskAndGetStatement(systemTaskParams, systemTaskParams.getConn(), + true); + } + + private static PreparedStatement addTaskAndGetStatement( + SystemTaskParams systemTaskParams, PhoenixConnection connection, + boolean shouldCommit) throws IOException { PreparedStatement stmt; try { - stmt = conn.prepareStatement("UPSERT INTO " + + stmt = connection.prepareStatement("UPSERT INTO " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " + PhoenixDatabaseMetaData.TASK_TYPE + ", " + PhoenixDatabaseMetaData.TENANT_ID + ", " + @@ -141,12 +211,78 @@ public class Task { PhoenixDatabaseMetaData.TASK_END_TS + ", " + PhoenixDatabaseMetaData.TASK_DATA + " ) VALUES(?,?,?,?,?,?,?,?,?)"); - stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName, taskStatus, data, priority, startTs, endTs); - LOGGER.info("Adding task " + taskType + "," +tableName + "," + taskStatus + "," + startTs, ","+endTs); + stmt = setValuesToAddTaskPS(stmt, systemTaskParams.getTaskType(), + systemTaskParams.getTenantId(), + systemTaskParams.getSchemaName(), + systemTaskParams.getTableName(), + systemTaskParams.getTaskStatus(), systemTaskParams.getData(), + systemTaskParams.getPriority(), systemTaskParams.getStartTs(), + systemTaskParams.getEndTs()); + LOGGER.info("Adding task type: " + + systemTaskParams.getTaskType() + " , tableName: " + + systemTaskParams.getTableName() + " , taskStatus: " + + systemTaskParams.getTaskStatus() + " , startTs: " + + systemTaskParams.getStartTs() + " , endTs: " + + systemTaskParams.getEndTs()); } catch (SQLException e) { throw new IOException(e); } - mutateSystemTaskTable(conn, stmt, accessCheckEnabled); + // if query is getting executed by client, do not execute and commit + // mutations + if (shouldCommit) { + mutateSystemTaskTable(connection, stmt, + systemTaskParams.isAccessCheckEnabled()); + } + return stmt; + } + + public static List<Mutation> getMutationsForAddTask( + SystemTaskParams systemTaskParams) + throws IOException, SQLException { + PhoenixConnection curConn = systemTaskParams.getConn(); + Configuration conf = curConn.getQueryServices().getConfiguration(); + // create new connection as we do not want to mix up mutationState + // with existing connection + try (PhoenixConnection newConnection = + QueryUtil.getConnectionOnServer(curConn.getClientInfo(), conf) + .unwrap(PhoenixConnection.class)) { + PreparedStatement statement = addTaskAndGetStatement( + systemTaskParams, newConnection, false); + return getMutationsForSystemTaskTable(newConnection, + statement, systemTaskParams.isAccessCheckEnabled()); + } + } + + /** + * Invoke SYSTEM.TASK metadata coprocessor endpoint + * + * @param connection Phoenix Connection + * @param rowKey key corresponding to SYSTEM.TASK mutation + * @param callable used to invoke the coprocessor endpoint to upsert + * records in SYSTEM.TASK + * @return result of invoking the coprocessor endpoint + * @throws SQLException If something goes wrong while executing co + */ + public static MetaDataMutationResult taskMetaDataCoprocessorExec( + final PhoenixConnection connection, final byte[] rowKey, + final Batch.Call<TaskMetaDataService, MetaDataResponse> callable) + throws SQLException { + TableName tableName = SchemaUtil.getPhysicalName( + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME_BYTES, + connection.getQueryServices().getProps()); + try (Table table = + connection.getQueryServices().getTable(tableName.getName())) { + final Map<byte[], MetaDataResponse> results = + table.coprocessorService(TaskMetaDataService.class, rowKey, + rowKey, callable); + assert results.size() == 1; + MetaDataResponse result = results.values().iterator().next(); + return MetaDataMutationResult.constructFromProto(result); + } catch (IOException e) { + throw ServerUtil.parseServerException(e); + } catch (Throwable t) { + throw new SQLException(t); + } } public static void deleteTask(PhoenixConnection conn, PTable.TaskType taskType, Timestamp ts, String tenantId, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TaskMetaDataServiceCallBack.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TaskMetaDataServiceCallBack.java new file mode 100644 index 0000000..b7630a8 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TaskMetaDataServiceCallBack.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.util; + +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; +import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos; +import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos + .TaskMetaDataService; +import org.apache.phoenix.protobuf.ProtobufUtil; + +import java.io.IOException; +import java.util.List; + +/** + * Callable implementation for coprocessor endpoint associated with + * SYSTEM.TASK + */ +public class TaskMetaDataServiceCallBack + implements Batch.Call<TaskMetaDataService, MetaDataResponse> { + + private final List<Mutation> taskMutations; + + public TaskMetaDataServiceCallBack(List<Mutation> taskMutations) { + this.taskMutations = taskMutations; + } + + @Override + public MetaDataResponse call(TaskMetaDataService instance) + throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<>(); + TaskMetaDataProtos.TaskMutateRequest.Builder builder = + TaskMetaDataProtos.TaskMutateRequest.newBuilder(); + for (Mutation mutation : taskMutations) { + ClientProtos.MutationProto mp = ProtobufUtil.toProto(mutation); + builder.addTableMetadataMutations(mp.toByteString()); + } + TaskMetaDataProtos.TaskMutateRequest build = builder.build(); + instance.upsertTaskDetails(controller, build, rpcCallback); + if (controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpointTest.java b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpointTest.java new file mode 100644 index 0000000..ae19bc6 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpointTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.coprocessor; + + +import com.google.protobuf.RpcController; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos; +import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos; +import org.apache.phoenix.protobuf.ProtobufUtil; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; + +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for TaskMetaDataEndpoint + */ +public class TaskMetaDataEndpointTest { + + private TaskMetaDataEndpoint taskMetaDataEndpoint; + private Configuration configuration; + + @Mock + private Region region; + + @Mock + private HRegionInfo regionInfo; + + @Mock + private RpcController controller; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + configuration = new Configuration(); + RegionCoprocessorEnvironment environment = + new RegionCoprocessorEnvironment() { + + @Override + public int getVersion() { + return 0; + } + + @Override + public String getHBaseVersion() { + return null; + } + + @Override + public Coprocessor getInstance() { + return null; + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public int getLoadSequence() { + return 0; + } + + @Override + public Configuration getConfiguration() { + return configuration; + } + + @Override + public HTableInterface getTable( + TableName tableName) throws IOException { + return null; + } + + @Override + public HTableInterface getTable(TableName tableName, + ExecutorService service) throws IOException { + return null; + } + + @Override + public ClassLoader getClassLoader() { + return null; + } + + @Override + public Region getRegion() { + return region; + } + + @Override + public HRegionInfo getRegionInfo() { + return regionInfo; + } + + @Override + public RegionServerServices getRegionServerServices() { + return null; + } + + @Override + public ConcurrentMap<String, Object> getSharedData() { + return null; + } + }; + taskMetaDataEndpoint = new TaskMetaDataEndpoint(); + taskMetaDataEndpoint.start(environment); + } + + @Test + public void testUpsertTaskDetails() throws Exception { + Mutation mutation = new Put(Bytes.toBytes("row1")); + TaskMetaDataProtos.TaskMutateRequest.Builder builder = + TaskMetaDataProtos.TaskMutateRequest.newBuilder(); + ClientProtos.MutationProto mp = ProtobufUtil.toProto(mutation); + builder.addTableMetadataMutations(mp.toByteString()); + TaskMetaDataProtos.TaskMutateRequest request = builder.build(); + BlockingRpcCallback<MetaDataProtos.MetaDataResponse> rpcCallback = + new BlockingRpcCallback<>(); + Mockito.doNothing().when(region).mutateRowsWithLocks( + Mockito.anyCollectionOf(Mutation.class), Mockito.<Collection<byte[]>>anyObject(), + Mockito.anyLong(), Mockito.anyLong()); + taskMetaDataEndpoint.upsertTaskDetails(controller, request, rpcCallback); + Mockito.verify(region, Mockito.times(1)).mutateRowsWithLocks( + Mockito.anyCollectionOf(Mutation.class), Mockito.<Collection<byte[]>>anyObject(), + Mockito.anyLong(), Mockito.anyLong()); + } + + @Test + public void testUpsertTaskDetailsFailure() throws Exception { + Mutation mutation = new Put(Bytes.toBytes("row2")); + TaskMetaDataProtos.TaskMutateRequest.Builder builder = + TaskMetaDataProtos.TaskMutateRequest.newBuilder(); + ClientProtos.MutationProto mp = ProtobufUtil.toProto(mutation); + builder.addTableMetadataMutations(mp.toByteString()); + TaskMetaDataProtos.TaskMutateRequest request = builder.build(); + BlockingRpcCallback<MetaDataProtos.MetaDataResponse> rpcCallback = + new BlockingRpcCallback<>(); + Mockito.doThrow(new IOException()).when(region).mutateRowsWithLocks( + Mockito.anyCollectionOf(Mutation.class), Mockito.<Collection<byte[]>>anyObject(), + Mockito.anyLong(), Mockito.anyLong()); + taskMetaDataEndpoint.upsertTaskDetails(controller, request, rpcCallback); + Mockito.verify(region, Mockito.times(1)).mutateRowsWithLocks( + Mockito.anyCollectionOf(Mutation.class), Mockito.<Collection<byte[]>>anyObject(), + Mockito.anyLong(), Mockito.anyLong()); + assertEquals(MetaDataProtos.MutationCode.UNABLE_TO_UPSERT_TASK, + rpcCallback.get().getReturnCode()); + } + +} \ No newline at end of file diff --git a/phoenix-protocol/src/main/MetaDataService.proto b/phoenix-protocol/src/main/MetaDataService.proto index a30fad7..cbc53d6 100644 --- a/phoenix-protocol/src/main/MetaDataService.proto +++ b/phoenix-protocol/src/main/MetaDataService.proto @@ -53,6 +53,7 @@ enum MutationCode { UNABLE_TO_CREATE_CHILD_LINK = 23; UNABLE_TO_UPDATE_PARENT_TABLE = 24; UNABLE_TO_DELETE_CHILD_LINK = 25; + UNABLE_TO_UPSERT_TASK = 26; }; message SharedTableState { diff --git a/phoenix-protocol/src/main/TaskMetaDataService.proto b/phoenix-protocol/src/main/TaskMetaDataService.proto new file mode 100644 index 0000000..d6868bf --- /dev/null +++ b/phoenix-protocol/src/main/TaskMetaDataService.proto @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.phoenix.coprocessor.generated"; +option java_outer_classname = "TaskMetaDataProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "MetaDataService.proto"; + +message TaskMutateRequest { + repeated bytes tableMetadataMutations = 1; +} + +service TaskMetaDataService { + rpc upsertTaskDetails(TaskMutateRequest) + returns (MetaDataResponse); +}