http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java new file mode 100644 index 0000000..86b8bf1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.phoenix.coprocessor;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;

/**
 * Callback contract for coprocessors that want to observe Phoenix metadata operations
 * (tables, schemas, functions and index state changes).
 *
 * <p>Every hook receives the {@link ObserverContext} of the dispatching host and may throw an
 * {@link IOException}.
 *
 * <p>NOTE(review): the {@code pre} prefix suggests each hook runs before the corresponding
 * metadata operation is carried out; the call sites are not part of this file - confirm.
 */
public interface MetaDataEndpointObserver extends Coprocessor {

    /**
     * Hook for a table lookup.
     *
     * @param ctx observer context of the dispatching host
     * @param tenantId tenant on whose behalf the table is requested
     * @param tableName Phoenix name of the requested table
     * @param physicalTableName physical table backing the requested table
     * @throws IOException if the observer rejects or fails the operation
     */
    void preGetTable(
            ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
            String tenantId,
            String tableName,
            TableName physicalTableName) throws IOException;

    /**
     * Hook for creation of a table, view or index.
     *
     * @param ctx observer context of the dispatching host
     * @param tenantId tenant on whose behalf the table is created
     * @param tableName Phoenix name of the table being created
     * @param physicalTableName physical table backing the new table
     * @param parentPhysicalTableName physical table of the parent
     * @param tableType kind of Phoenix table being created
     * @param familySet column family names of the new table
     * @param indexes physical index tables related to the new table
     * @throws IOException if the observer rejects or fails the operation
     */
    void preCreateTable(
            ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
            String tenantId,
            String tableName,
            TableName physicalTableName,
            TableName parentPhysicalTableName,
            PTableType tableType,
            Set<byte[]> familySet,
            Set<TableName> indexes) throws IOException;

    /**
     * Hook for dropping a table, view or index.
     *
     * @param ctx observer context of the dispatching host
     * @param tenantId tenant on whose behalf the table is dropped
     * @param tableName Phoenix name of the table being dropped
     * @param physicalTableName physical table backing the dropped table
     * @param parentPhysicalTableName physical table of the parent
     * @param tableType kind of Phoenix table being dropped
     * @param indexes indexes related to the dropped table
     * @throws IOException if the observer rejects or fails the operation
     */
    void preDropTable(
            ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
            String tenantId,
            String tableName,
            TableName physicalTableName,
            TableName parentPhysicalTableName,
            PTableType tableType,
            List<PTable> indexes) throws IOException;

    /**
     * Hook for altering a table or view.
     *
     * @param ctx observer context of the dispatching host
     * @param tenantId tenant on whose behalf the table is altered
     * @param tableName Phoenix name of the table being altered
     * @param physicalTableName physical table backing the altered table
     * @param parentPhysicalTableName physical table of the parent
     * @param type kind of Phoenix table being altered
     * @throws IOException if the observer rejects or fails the operation
     */
    void preAlterTable(
            ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
            String tenantId,
            String tableName,
            TableName physicalTableName,
            TableName parentPhysicalTableName,
            PTableType type) throws IOException;

    /**
     * Hook for a schema lookup.
     *
     * @param ctx observer context of the dispatching host
     * @param schemaName name of the requested schema
     * @throws IOException if the observer rejects or fails the operation
     */
    void preGetSchema(
            ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
            String schemaName) throws IOException;

    /**
     * Hook for schema creation.
     *
     * @param ctx observer context of the dispatching host
     * @param schemaName name of the schema being created
     * @throws IOException if the observer rejects or fails the operation
     */
    void preCreateSchema(
            ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
            String schemaName) throws IOException;

    /**
     * Hook for dropping a schema.
     *
     * @param ctx observer context of the dispatching host
     * @param schemaName name of the schema being dropped
     * @throws IOException if the observer rejects or fails the operation
     */
    void preDropSchema(
            ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
            String schemaName) throws IOException;

    /**
     * Hook for function creation.
     *
     * @param ctx observer context of the dispatching host
     * @param tenantId tenant on whose behalf the function is created
     * @param functionName name of the function being created
     * @throws IOException if the observer rejects or fails the operation
     */
    void preCreateFunction(
            ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
            String tenantId,
            String functionName) throws IOException;

    /**
     * Hook for dropping a function.
     *
     * @param ctx observer context of the dispatching host
     * @param tenantId tenant on whose behalf the function is dropped
     * @param functionName name of the function being dropped
     * @throws IOException if the observer rejects or fails the operation
     */
    void preDropFunction(
            ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
            String tenantId,
            String functionName) throws IOException;

    /**
     * Hook for a function lookup.
     *
     * @param ctx observer context of the dispatching host
     * @param tenantId tenant on whose behalf the function is requested
     * @param functionName name of the requested function
     * @throws IOException if the observer rejects or fails the operation
     */
    void preGetFunctions(
            ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
            String tenantId,
            String functionName) throws IOException;

    /**
     * Hook for an index state change.
     *
     * @param ctx observer context of the dispatching host
     * @param tenantId tenant on whose behalf the index is updated
     * @param indexName Phoenix name of the index
     * @param physicalTableName physical table backing the index
     * @param parentPhysicalTableName physical table of the indexed (parent) table
     * @param newState state the index is moving to
     * @throws IOException if the observer rejects or fails the operation
     */
    void preIndexUpdate(
            ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
            String tenantId,
            String indexName,
            TableName physicalTableName,
            TableName parentPhysicalTableName,
            PIndexState newState) throws IOException;

}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index c816549..af06235 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -20,6 +20,7 @@ package org.apache.phoenix.coprocessor; import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; @@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -163,9 +165,18 @@ public class MetaDataRegionObserver extends BaseRegionObserver { SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props)); statsTable = env.getTable( SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, props)); - if (UpgradeUtil.truncateStats(metaTable, statsTable)) { - LOG.info("Stats are successfully truncated for upgrade 4.7!!"); - } + final HTableInterface mTable=metaTable; + final HTableInterface sTable=statsTable; + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + if (UpgradeUtil.truncateStats(mTable, sTable)) { + LOG.info("Stats are 
successfully truncated for upgrade 4.7!!"); + } + return null; + } + }); + } catch (Exception exception) { LOG.warn("Exception while truncate stats..," + " please check and delete stats manually inorder to get proper result with old client!!"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java new file mode 100644 index 0000000..a4bc857 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java @@ -0,0 +1,611 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.coprocessor; + +import java.io.IOException; +import java.net.InetAddress; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.AuthUtil; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; +import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; +import org.apache.hadoop.hbase.security.AccessDeniedException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.access.AccessControlClient; +import org.apache.hadoop.hbase.security.access.AuthResult; +import org.apache.hadoop.hbase.security.access.Permission; +import org.apache.hadoop.hbase.security.access.Permission.Action; 
+import org.apache.hadoop.hbase.security.access.UserPermission; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.util.MetaDataUtil; + +import com.google.common.collect.Lists; +import com.google.protobuf.RpcCallback; + +public class PhoenixAccessController extends BaseMetaDataEndpointObserver { + + private PhoenixMetaDataControllerEnvironment env; + private ArrayList<BaseMasterAndRegionObserver> accessControllers; + private boolean accessCheckEnabled; + private UserProvider userProvider; + public static final Log LOG = LogFactory.getLog(PhoenixAccessController.class); + private static final Log AUDITLOG = + LogFactory.getLog("SecurityLogger."+PhoenixAccessController.class.getName()); + + private List<BaseMasterAndRegionObserver> getAccessControllers() throws IOException { + if (accessControllers == null) { + synchronized (this) { + if (accessControllers == null) { + accessControllers = new ArrayList<BaseMasterAndRegionObserver>(); + RegionCoprocessorHost cpHost = this.env.getCoprocessorHost(); + List<BaseMasterAndRegionObserver> coprocessors = cpHost + .findCoprocessors(BaseMasterAndRegionObserver.class); + for (BaseMasterAndRegionObserver cp : coprocessors) { + if (cp instanceof AccessControlService.Interface) { + accessControllers.add(cp); + } + } + } + } + } + return accessControllers; + } + + @Override + public void preGetTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, + String tableName, TableName physicalTableName) throws IOException { + for (BaseMasterAndRegionObserver observer : getAccessControllers()) { + observer.preGetTableDescriptors(new 
ObserverContext<MasterCoprocessorEnvironment>(), + Lists.newArrayList(physicalTableName), Collections.<HTableDescriptor> emptyList()); + } + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + Configuration conf = env.getConfiguration(); + this.accessCheckEnabled = conf.getBoolean(QueryServices.PHOENIX_ACLS_ENABLED, + QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED); + if (!this.accessCheckEnabled) { + LOG.warn("PhoenixAccessController has been loaded with authorization checks disabled."); + } + if (env instanceof PhoenixMetaDataControllerEnvironment) { + this.env = (PhoenixMetaDataControllerEnvironment)env; + } else { + throw new IllegalArgumentException( + "Not a valid environment, should be loaded by PhoenixMetaDataControllerEnvironment"); + } + // set the user-provider. + this.userProvider = UserProvider.instantiate(env.getConfiguration()); + // init superusers and add the server principal (if using security) + // or process owner as default super user. + Superusers.initialize(env.getConfiguration()); + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException {} + + @Override + public void preCreateTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, + String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType, + Set<byte[]> familySet, Set<TableName> indexes) throws IOException { + if (!accessCheckEnabled) { return; } + + if (tableType != PTableType.VIEW) { + final HTableDescriptor htd = new HTableDescriptor(physicalTableName); + for (byte[] familyName : familySet) { + htd.addFamily(new HColumnDescriptor(familyName)); + } + for (BaseMasterAndRegionObserver observer : getAccessControllers()) { + observer.preCreateTable(new ObserverContext<MasterCoprocessorEnvironment>(), htd, null); + } + } + + // Index and view require read access on parent physical table. 
+ Set<TableName> physicalTablesChecked = new HashSet<TableName>(); + if (tableType == PTableType.VIEW || tableType == PTableType.INDEX) { + physicalTablesChecked.add(parentPhysicalTableName); + requireAccess("Create" + tableType, parentPhysicalTableName, Action.READ, Action.EXEC); + } + + if (tableType == PTableType.VIEW) { + + Action[] requiredActions = { Action.READ, Action.EXEC }; + for (TableName index : indexes) { + if (!physicalTablesChecked.add(index)) { + // skip check for local index as we have already check the ACLs above + // And for same physical table multiple times like view index table + continue; + } + + User user = getActiveUser(); + List<UserPermission> permissionForUser = getPermissionForUser( + getUserPermissions(index.getNameAsString()), Bytes.toBytes(user.getShortName())); + Set<Action> requireAccess = new HashSet<>(); + Set<Action> accessExists = new HashSet<>(); + if (permissionForUser != null) { + for (UserPermission userPermission : permissionForUser) { + for (Action action : Arrays.asList(requiredActions)) { + if (!userPermission.implies(action)) { + requireAccess.add(action); + } + } + } + if (!requireAccess.isEmpty()) { + for (UserPermission userPermission : permissionForUser) { + accessExists.addAll(Arrays.asList(userPermission.getActions())); + } + + } + } else { + requireAccess.addAll(Arrays.asList(requiredActions)); + } + if (!requireAccess.isEmpty()) { + byte[] indexPhysicalTable = index.getName(); + handleRequireAccessOnDependentTable("Create" + tableType, user.getName(), + TableName.valueOf(indexPhysicalTable), tableName, requireAccess, accessExists); + } + } + + } + + if (tableType == PTableType.INDEX) { + // All the users who have READ access on data table should have access to Index table as well. + // WRITE is needed for the index updates done by the user who has WRITE access on data table. + // CREATE is needed during the drop of the table. 
+ // We are doing this because existing user while querying data table should not see access denied for the + // new indexes. + // TODO: confirm whether granting permission from coprocessor is a security leak.(currently it is done if + // automatic grant is enabled explicitly by user in configuration + // skip check for local index + if (physicalTableName != null && !parentPhysicalTableName.equals(physicalTableName) + && !MetaDataUtil.isViewIndex(physicalTableName.getNameAsString())) { + authorizeOrGrantAccessToUsers("Create" + tableType, parentPhysicalTableName, + Arrays.asList(Action.READ, Action.WRITE, Action.CREATE, Action.EXEC, Action.ADMIN), + physicalTableName); + } + } + } + + + public void handleRequireAccessOnDependentTable(String request, String userName, TableName dependentTable, + String requestTable, Set<Action> requireAccess, Set<Action> accessExists) throws IOException { + + Set<Action> unionSet = new HashSet<Action>(); + unionSet.addAll(requireAccess); + unionSet.addAll(accessExists); + AUDITLOG.info(request + ": Automatically granting access to index table during creation of view:" + + requestTable + authString(userName, dependentTable, requireAccess)); + grantPermissions(userName, dependentTable.getName(), unionSet.toArray(new Action[0])); + } + + private void grantPermissions(final String toUser, final byte[] table, final Action... 
actions) throws IOException { + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + try (Connection conn = ConnectionFactory.createConnection(env.getConfiguration())) { + AccessControlClient.grant(conn, TableName.valueOf(table), toUser , null, null, + actions); + } catch (Throwable e) { + new DoNotRetryIOException(e); + } + return null; + } + }); + } + + private void authorizeOrGrantAccessToUsers(final String request, final TableName fromTable, + final List<Action> requiredActionsOnTable, final TableName toTable) + throws IOException { + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws IOException { + try (Connection conn = ConnectionFactory.createConnection(env.getConfiguration())) { + List<UserPermission> userPermissions = getUserPermissions(fromTable.getNameAsString()); + List<UserPermission> permissionsOnTheTable = getUserPermissions(toTable.getNameAsString()); + if (userPermissions != null) { + for (UserPermission userPermission : userPermissions) { + Set<Action> requireAccess = new HashSet<Action>(); + Set<Action> accessExists = new HashSet<Action>(); + List<UserPermission> permsToTable = getPermissionForUser(permissionsOnTheTable, + userPermission.getUser()); + for (Action action : requiredActionsOnTable) { + boolean haveAccess=false; + if (userPermission.implies(action)) { + if (permsToTable == null) { + requireAccess.add(action); + } else { + for (UserPermission permToTable : permsToTable) { + if (permToTable.implies(action)) { + haveAccess=true; + } + } + if (!haveAccess) { + requireAccess.add(action); + } + } + } + } + if (permsToTable != null) { + // Append access to already existing access for the user + for (UserPermission permToTable : permsToTable) { + accessExists.addAll(Arrays.asList(permToTable.getActions())); + } + } + if (!requireAccess.isEmpty()) { + if(AuthUtil.isGroupPrincipal(Bytes.toString(userPermission.getUser()))){ + 
AUDITLOG.warn("Users of GROUP:" + Bytes.toString(userPermission.getUser()) + + " will not have following access " + requireAccess + + " to the newly created index " + toTable + + ", Automatic grant is not yet allowed on Groups"); + continue; + } + handleRequireAccessOnDependentTable(request, Bytes.toString(userPermission.getUser()), + toTable, toTable.getNameAsString(), requireAccess, accessExists); + } + } + } + } + return null; + } + }); + } + + private List<UserPermission> getPermissionForUser(List<UserPermission> perms, byte[] user) { + if (perms != null) { + // get list of permissions for the user as multiple implementation of AccessControl coprocessors can give + // permissions for same users + List<UserPermission> permissions = new ArrayList<>(); + for (UserPermission p : perms) { + if (Bytes.equals(p.getUser(),user)){ + permissions.add(p); + } + } + if (!permissions.isEmpty()){ + return permissions; + } + } + return null; + } + + @Override + public void preDropTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, + String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType, + List<PTable> indexes) throws IOException { + if (!accessCheckEnabled) { return; } + + for (BaseMasterAndRegionObserver observer : getAccessControllers()) { + if (tableType != PTableType.VIEW) { + observer.preDeleteTable(new ObserverContext<MasterCoprocessorEnvironment>(), physicalTableName); + } + if (indexes != null) { + for (PTable index : indexes) { + observer.preDeleteTable(new ObserverContext<MasterCoprocessorEnvironment>(), + TableName.valueOf(index.getPhysicalName().getBytes())); + } + } + } + //checking similar permission checked during the create of the view. 
+ if (tableType == PTableType.VIEW || tableType == PTableType.INDEX) { + requireAccess("Drop "+tableType, parentPhysicalTableName, Action.READ, Action.EXEC); + } + } + + @Override + public void preAlterTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, + String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType) throws IOException { + if (!accessCheckEnabled) { return; } + for (BaseMasterAndRegionObserver observer : getAccessControllers()) { + if (tableType != PTableType.VIEW) { + observer.preModifyTable(new ObserverContext<MasterCoprocessorEnvironment>(), physicalTableName, + new HTableDescriptor(physicalTableName)); + } + } + if (tableType == PTableType.VIEW) { + requireAccess("Alter "+tableType, parentPhysicalTableName, Action.READ, Action.EXEC); + } + } + + @Override + public void preGetSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName) + throws IOException { + if (!accessCheckEnabled) { return; } + for (BaseMasterAndRegionObserver observer : getAccessControllers()) { + observer.preListNamespaceDescriptors(new ObserverContext<MasterCoprocessorEnvironment>(), + Arrays.asList(NamespaceDescriptor.create(schemaName).build())); + } + } + + @Override + public void preCreateSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName) + throws IOException { + if (!accessCheckEnabled) { return; } + for (BaseMasterAndRegionObserver observer : getAccessControllers()) { + observer.preCreateNamespace(new ObserverContext<MasterCoprocessorEnvironment>(), + NamespaceDescriptor.create(schemaName).build()); + } + } + + @Override + public void preDropSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName) + throws IOException { + if (!accessCheckEnabled) { return; } + for (BaseMasterAndRegionObserver observer : getAccessControllers()) { + observer.preDeleteNamespace(new ObserverContext<MasterCoprocessorEnvironment>(), 
schemaName); + } + } + + @Override + public void preIndexUpdate(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, + String indexName, TableName physicalTableName, TableName parentPhysicalTableName, PIndexState newState) + throws IOException { + if (!accessCheckEnabled) { return; } + for (BaseMasterAndRegionObserver observer : getAccessControllers()) { + observer.preModifyTable(new ObserverContext<MasterCoprocessorEnvironment>(), physicalTableName, + new HTableDescriptor(physicalTableName)); + } + // Check for read access in case of rebuild + if (newState == PIndexState.BUILDING) { + requireAccess("Rebuild:", parentPhysicalTableName, Action.READ, Action.EXEC); + } + } + + private List<UserPermission> getUserPermissions(final String tableName) throws IOException { + return User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() { + @Override + public List<UserPermission> run() throws Exception { + final List<UserPermission> userPermissions = new ArrayList<UserPermission>(); + try (Connection connection = ConnectionFactory.createConnection(env.getConfiguration())) { + for (BaseMasterAndRegionObserver service : accessControllers) { + if (service.getClass().getName().equals(org.apache.hadoop.hbase.security.access.AccessController.class.getName())) { + userPermissions.addAll(AccessControlClient.getUserPermissions(connection, tableName)); + } else { + AccessControlProtos.GetUserPermissionsRequest.Builder builder = AccessControlProtos.GetUserPermissionsRequest + .newBuilder(); + builder.setTableName(ProtobufUtil.toProtoTableName(TableName.valueOf(tableName))); + builder.setType(AccessControlProtos.Permission.Type.Table); + AccessControlProtos.GetUserPermissionsRequest request = builder.build(); + + PayloadCarryingRpcController controller = ((ClusterConnection)connection) + .getRpcControllerFactory().newController(); + ((AccessControlService.Interface)service).getUserPermissions(controller, request, + new 
RpcCallback<AccessControlProtos.GetUserPermissionsResponse>() { + @Override + public void run(AccessControlProtos.GetUserPermissionsResponse message) { + if (message != null) { + for (AccessControlProtos.UserPermission perm : message + .getUserPermissionList()) { + userPermissions.add(ProtobufUtil.toUserPermission(perm)); + } + } + } + }); + } + } + } catch (Throwable e) { + if (e instanceof Exception) { + throw (Exception) e; + } else if (e instanceof Error) { + throw (Error) e; + } + throw new Exception(e); + } + return userPermissions; + } + }); + } + + /** + * Authorizes that the current user has all the given permissions for the + * given table + * @param tableName Table requested + * @throws IOException if obtaining the current user fails + * @throws AccessDeniedException if user has no authorization + */ + private void requireAccess(String request, TableName tableName, Action... permissions) throws IOException { + User user = getActiveUser(); + AuthResult result = null; + List<Action> requiredAccess = new ArrayList<Action>(); + for (Action permission : permissions) { + if (hasAccess(getUserPermissions(tableName.getNameAsString()), tableName, permission, user)) { + result = AuthResult.allow(request, "Table permission granted", user, permission, tableName, null, null); + } else { + result = AuthResult.deny(request, "Insufficient permissions", user, permission, tableName, null, null); + requiredAccess.add(permission); + } + logResult(result); + } + if (!requiredAccess.isEmpty()) { + result = AuthResult.deny(request, "Insufficient permissions", user, requiredAccess.get(0), tableName, null, + null); + } + if (!result.isAllowed()) { throw new AccessDeniedException("Insufficient permissions " + + authString(user.getName(), tableName, new HashSet<Permission.Action>(Arrays.asList(permissions)))); } + } + + /** + * Checks if the user has access to the table for the specified action. 
+ * + * @param perms All table permissions + * @param table tablename + * @param action action for access is required + * @return true if the user has access to the table for specified action, false otherwise + */ + private boolean hasAccess(List<UserPermission> perms, TableName table, Permission.Action action, User user) { + if (Superusers.isSuperUser(user)){ + return true; + } + if (perms != null) { + List<UserPermission> permissionsForUser = getPermissionForUser(perms, user.getShortName().getBytes()); + if (permissionsForUser != null) { + for (UserPermission permissionForUser : permissionsForUser) { + if (permissionForUser.implies(action)) { return true; } + } + } + String[] groupNames = user.getGroupNames(); + if (groupNames != null) { + for (String group : groupNames) { + List<UserPermission> groupPerms = getPermissionForUser(perms,(AuthUtil.toGroupEntry(group)).getBytes()); + if (groupPerms != null) for (UserPermission permissionForUser : groupPerms) { + if (permissionForUser.implies(action)) { return true; } + } + } + } + } else if (LOG.isDebugEnabled()) { + LOG.debug("No permissions found for table=" + table); + } + return false; + } + + private User getActiveUser() throws IOException { + User user = RpcServer.getRequestUser(); + if (user == null) { + // for non-rpc handling, fallback to system user + user = userProvider.getCurrent(); + } + return user; + } + + private void logResult(AuthResult result) { + if (AUDITLOG.isTraceEnabled()) { + InetAddress remoteAddr = RpcServer.getRemoteAddress(); + AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") + " for user " + + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") + "; reason: " + + result.getReason() + "; remote address: " + (remoteAddr != null ? 
remoteAddr : "") + "; request: " + + result.getRequest() + "; context: " + result.toContextString()); + } + } + + private static final class Superusers { + private static final Log LOG = LogFactory.getLog(Superusers.class); + + /** Configuration key for superusers */ + public static final String SUPERUSER_CONF_KEY = org.apache.hadoop.hbase.security.Superusers.SUPERUSER_CONF_KEY; // Not getting a name + + private static List<String> superUsers; + private static List<String> superGroups; + private static User systemUser; + + private Superusers(){} + + /** + * Should be called only once to pre-load list of super users and super + * groups from Configuration. This operation is idempotent. + * @param conf configuration to load users from + * @throws IOException if unable to initialize lists of superusers or super groups + * @throws IllegalStateException if current user is null + */ + public static void initialize(Configuration conf) throws IOException { + superUsers = new ArrayList<>(); + superGroups = new ArrayList<>(); + systemUser = User.getCurrent(); + + if (systemUser == null) { + throw new IllegalStateException("Unable to obtain the current user, " + + "authorization checks for internal operations will not work correctly!"); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("Current user name is " + systemUser.getShortName()); + } + String currentUser = systemUser.getShortName(); + String[] superUserList = conf.getStrings(SUPERUSER_CONF_KEY, new String[0]); + for (String name : superUserList) { + if (AuthUtil.isGroupPrincipal(name)) { + superGroups.add(AuthUtil.getGroupName(name)); + } else { + superUsers.add(name); + } + } + superUsers.add(currentUser); + } + + /** + * @return true if current user is a super user (whether as user running process, + * declared as individual superuser or member of supergroup), false otherwise. 
+ * @param user to check + * @throws IllegalStateException if lists of superusers/super groups + * haven't been initialized properly + */ + public static boolean isSuperUser(User user) { + if (superUsers == null) { + throw new IllegalStateException("Super users/super groups lists" + + " haven't been initialized properly."); + } + if (superUsers.contains(user.getShortName())) { + return true; + } + + for (String group : user.getGroupNames()) { + if (superGroups.contains(group)) { + return true; + } + } + return false; + } + + public static List<String> getSuperUsers() { + return superUsers; + } + + public static User getSystemUser() { + return systemUser; + } + } + + public String authString(String user, TableName table, Set<Action> actions) { + StringBuilder sb = new StringBuilder(); + sb.append(" (user=").append(user != null ? user : "UNKNOWN").append(", "); + sb.append("scope=").append(table == null ? "GLOBAL" : table.getNameWithNamespaceInclAsString()).append(", "); + sb.append(actions.size() > 1 ? "actions=" : "action=").append(actions != null ? actions.toString() : "") + .append(")"); + return sb.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java new file mode 100644 index 0000000..15b0020 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; + +public class PhoenixMetaDataCoprocessorHost + extends CoprocessorHost<PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment> { + private RegionCoprocessorEnvironment env; + public static final String PHOENIX_META_DATA_COPROCESSOR_CONF_KEY = + "hbase.coprocessor.phoenix.classes"; + public static final String 
DEFAULT_PHOENIX_META_DATA_COPROCESSOR_CONF_KEY="org.apache.phoenix.coprocessor.PhoenixAccessController"; + + public PhoenixMetaDataCoprocessorHost(RegionCoprocessorEnvironment env) { + super(null); + this.env = env; + this.conf = env.getConfiguration(); + boolean accessCheckEnabled = this.conf.getBoolean(QueryServices.PHOENIX_ACLS_ENABLED, + QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED); + if (this.conf.get(PHOENIX_META_DATA_COPROCESSOR_CONF_KEY) == null && accessCheckEnabled) { + this.conf.set(PHOENIX_META_DATA_COPROCESSOR_CONF_KEY, DEFAULT_PHOENIX_META_DATA_COPROCESSOR_CONF_KEY); + } + loadSystemCoprocessors(conf, PHOENIX_META_DATA_COPROCESSOR_CONF_KEY); + } + + private static abstract class CoprocessorOperation<T extends CoprocessorEnvironment> extends ObserverContext<T> { + abstract void call(MetaDataEndpointObserver oserver, ObserverContext<T> ctx) throws IOException; + + public void postEnvCall(T env) {} + } + + private boolean execOperation( + final CoprocessorOperation<PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment> ctx) + throws IOException { + if (ctx == null) return false; + boolean bypass = false; + for (PhoenixMetaDataControllerEnvironment env : coprocessors) { + if (env.getInstance() instanceof MetaDataEndpointObserver) { + ctx.prepare(env); + Thread currentThread = Thread.currentThread(); + ClassLoader cl = currentThread.getContextClassLoader(); + try { + currentThread.setContextClassLoader(env.getClassLoader()); + ctx.call((MetaDataEndpointObserver)env.getInstance(), ctx); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } finally { + currentThread.setContextClassLoader(cl); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + ctx.postEnvCall(env); + } + return bypass; + } + + @Override + protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e) throws IOException { + if (e instanceof IOException) { + if (e.getCause() instanceof 
DoNotRetryIOException) { throw (IOException)e.getCause(); } + } + super.handleCoprocessorThrowable(env, e); + } + + /** + * Encapsulation of the environment of each coprocessor + */ + static class PhoenixMetaDataControllerEnvironment extends CoprocessorHost.Environment + implements RegionCoprocessorEnvironment { + + private RegionCoprocessorEnvironment env; + + public PhoenixMetaDataControllerEnvironment(RegionCoprocessorEnvironment env, Coprocessor instance, + int priority, int sequence, Configuration conf) { + super(instance, priority, sequence, conf); + this.env = env; + } + + @Override + public RegionServerServices getRegionServerServices() { + return env.getRegionServerServices(); + } + + public RegionCoprocessorHost getCoprocessorHost() { + return env.getRegion().getCoprocessorHost(); + } + + @Override + public Region getRegion() { + return env.getRegion(); + } + + @Override + public HRegionInfo getRegionInfo() { + return env.getRegionInfo(); + } + + @Override + public ConcurrentMap<String, Object> getSharedData() { + return env.getSharedData(); + } + } + + @Override + public PhoenixMetaDataControllerEnvironment createEnvironment(Class<?> implClass, Coprocessor instance, + int priority, int sequence, Configuration conf) { + return new PhoenixMetaDataControllerEnvironment(env, instance, priority, sequence, conf); + } + + public void preGetTable(final String tenantId, final String tableName, final TableName physicalTableName) + throws IOException { + execOperation(new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() { + @Override + public void call(MetaDataEndpointObserver observer, + ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) throws IOException { + observer.preGetTable(ctx, tenantId, tableName, physicalTableName); + } + }); + } + + public void preCreateTable(final String tenantId, final String tableName, final TableName physicalTableName, + final TableName parentPhysicalTableName, final PTableType tableType, final Set<byte[]> 
familySet, final Set<TableName> indexes) + throws IOException { + execOperation(new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() { + @Override + public void call(MetaDataEndpointObserver observer, + ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) throws IOException { + observer.preCreateTable(ctx, tenantId, tableName, physicalTableName, parentPhysicalTableName, tableType, + familySet, indexes); + } + }); + } + + public void preDropTable(final String tenantId, final String tableName, final TableName physicalTableName, + final TableName parentPhysicalTableName, final PTableType tableType, final List<PTable> indexes) throws IOException { + execOperation(new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() { + @Override + public void call(MetaDataEndpointObserver observer, + ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) throws IOException { + observer.preDropTable(ctx, tenantId, tableName, physicalTableName, parentPhysicalTableName, tableType, indexes); + } + }); + } + + public void preAlterTable(final String tenantId, final String tableName, final TableName physicalTableName, + final TableName parentPhysicalTableName, final PTableType type) throws IOException { + execOperation(new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() { + @Override + public void call(MetaDataEndpointObserver observer, + ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) throws IOException { + observer.preAlterTable(ctx, tenantId, tableName, physicalTableName, parentPhysicalTableName, type); + } + }); + } + + public void preGetSchema(final String schemaName) throws IOException { + execOperation(new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() { + @Override + public void call(MetaDataEndpointObserver observer, + ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) throws IOException { + observer.preGetSchema(ctx, schemaName); + } + }); + } + + public void preCreateSchema(final String schemaName) 
throws IOException { + + execOperation(new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() { + @Override + public void call(MetaDataEndpointObserver observer, + ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) throws IOException { + observer.preCreateSchema(ctx, schemaName); + } + }); + } + + public void preDropSchema(final String schemaName) throws IOException { + execOperation(new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() { + @Override + public void call(MetaDataEndpointObserver observer, + ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) throws IOException { + observer.preDropSchema(ctx, schemaName); + } + }); + } + + public void preIndexUpdate(final String tenantId, final String indexName, final TableName physicalTableName, + final TableName parentPhysicalTableName, final PIndexState newState) throws IOException { + execOperation(new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() { + @Override + public void call(MetaDataEndpointObserver observer, + ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) throws IOException { + observer.preIndexUpdate(ctx, tenantId, indexName, physicalTableName, parentPhysicalTableName, newState); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index e51fd9f..2301c32 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -229,6 +229,7 @@ public enum SQLExceptionCode { return new TableAlreadyExistsException(info.getSchemaName(), info.getTableName()); } }), + TABLES_NOT_IN_SYNC(1140, "42M05", "Tables 
not in sync for some properties."), // Syntax error TYPE_NOT_SUPPORTED_FOR_OPERATOR(1014, "42Y01", "The operator does not support the operand type."), http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 4c29abe..369769e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -59,6 +59,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator; import org.apache.phoenix.iterate.SerialIterators; import org.apache.phoenix.iterate.SpoolingResultIterator; import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.KeyRange; @@ -67,6 +68,7 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.util.CostUtil; import org.apache.phoenix.util.ScanUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +114,33 @@ public class AggregatePlan extends BaseQueryPlan { public Expression getHaving() { return having; } - + + @Override + public Cost getCost() { + Long byteCount = null; + try { + byteCount = getEstimatedBytesToScan(); + } catch (SQLException e) { + // ignored. 
+ } + + if (byteCount == null) { + return Cost.UNKNOWN; + } + + int parallelLevel = CostUtil.estimateParallelLevel( + true, context.getConnection().getQueryServices()); + Cost cost = CostUtil.estimateAggregateCost(byteCount, + groupBy, aggregators.getEstimatedByteSize(), parallelLevel); + if (!orderBy.getOrderByExpressions().isEmpty()) { + double outputBytes = CostUtil.estimateAggregateOutputBytes( + byteCount, groupBy, aggregators.getEstimatedByteSize()); + Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel); + cost = cost.plus(orderByCost); + } + return cost; + } + @Override public List<KeyRange> getSplits() { if (splits == null) http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index c1ddd44..31f67b7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -63,6 +63,8 @@ import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; @@ -500,13 +502,24 @@ public abstract class BaseQueryPlan implements QueryPlan { if (context.getScanRanges() == ScanRanges.NOTHING) { return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + getTableRef().getTable().getName().getString())); } - + + // If cost-based optimizer is enabled, we need to initialize a dummy 
iterator to + // get the stats for computing costs. + boolean costBased = + context.getConnection().getQueryServices().getConfiguration().getBoolean( + QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED); + if (costBased) { + ResultIterator iterator = iterator(); + iterator.close(); + } // Optimize here when getting explain plan, as queries don't get optimized until after compilation QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this); ExplainPlan exp = plan instanceof BaseQueryPlan ? new ExplainPlan(getPlanSteps(plan.iterator())) : plan.getExplainPlan(); - this.estimatedRows = plan.getEstimatedRowsToScan(); - this.estimatedSize = plan.getEstimatedBytesToScan(); - this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp(); + if (!costBased) { // do not override estimates if they are used for cost calculation. + this.estimatedRows = plan.getEstimatedRowsToScan(); + this.estimatedSize = plan.getEstimatedBytesToScan(); + this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp(); + } return exp; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java index 8ef1f8d..a15ab35 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java @@ -56,12 +56,14 @@ import org.apache.phoenix.iterate.PeekingResultIterator; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.SequenceResultIterator; import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator; +import 
org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.CostUtil; import org.apache.phoenix.util.TupleUtil; import com.google.common.collect.Lists; @@ -87,6 +89,32 @@ public class ClientAggregatePlan extends ClientProcessingPlan { } @Override + public Cost getCost() { + Long byteCount = null; + try { + byteCount = getEstimatedBytesToScan(); + } catch (SQLException e) { + // ignored. + } + + if (byteCount == null) { + return Cost.UNKNOWN; + } + + int parallelLevel = CostUtil.estimateParallelLevel( + false, context.getConnection().getQueryServices()); + Cost cost = CostUtil.estimateAggregateCost(byteCount, + groupBy, clientAggregators.getEstimatedByteSize(), parallelLevel); + if (!orderBy.getOrderByExpressions().isEmpty()) { + double outputBytes = CostUtil.estimateAggregateOutputBytes( + byteCount, groupBy, clientAggregators.getEstimatedByteSize()); + Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel); + cost = cost.plus(orderByCost); + } + return super.getCost().plus(cost); + } + + @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { ResultIterator iterator = delegate.iterator(scanGrouper, scan); if (where != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java index 6bbc545..5799990 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java @@ -34,10 +34,12 @@ import org.apache.phoenix.iterate.OrderedResultIterator; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.SequenceResultIterator; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.CostUtil; import com.google.common.collect.Lists; @@ -50,6 +52,29 @@ public class ClientScanPlan extends ClientProcessingPlan { } @Override + public Cost getCost() { + Long byteCount = null; + try { + byteCount = getEstimatedBytesToScan(); + } catch (SQLException e) { + // ignored. + } + + if (byteCount == null) { + return Cost.UNKNOWN; + } + + Cost cost = new Cost(0, 0, byteCount); + int parallelLevel = CostUtil.estimateParallelLevel( + false, context.getConnection().getQueryServices()); + if (!orderBy.getOrderByExpressions().isEmpty()) { + Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel); + cost = cost.plus(orderByCost); + } + return super.getCost().plus(cost); + } + + @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { ResultIterator iterator = delegate.iterator(scanGrouper, scan); if (where != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java index ee81c36..270ad3d 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.PColumn;
@@ -200,4 +201,39 @@ public class CorrelatePlan extends DelegateQueryPlan {
         return null;
     }

+    /**
+     * Estimates the cost of this correlated plan.
+     * <p>
+     * The correlate step itself is costed as the left-hand side's estimated bytes to scan
+     * multiplied by the right-hand side's estimated rows to scan (third {@code Cost}
+     * argument); the costs of both child plans are then added to it.
+     *
+     * @return the combined cost, or {@code Cost.UNKNOWN} if either estimate is unavailable
+     */
+    @Override
+    public Cost getCost() {
+        Long lhsByteCount = null;
+        try {
+            lhsByteCount = delegate.getEstimatedBytesToScan();
+        } catch (SQLException e) {
+            // Best effort: leave the estimate null so the plan is reported as Cost.UNKNOWN.
+        }
+        Long rhsRowCount = null;
+        try {
+            rhsRowCount = rhs.getEstimatedRowsToScan();
+        } catch (SQLException e) {
+            // Best effort: leave the estimate null so the plan is reported as Cost.UNKNOWN.
+        }
+
+        if (lhsByteCount == null || rhsRowCount == null) {
+            return Cost.UNKNOWN;
+        }
+
+        // Multiply as double: bytes * rows can exceed Long.MAX_VALUE on large inputs, and a
+        // long product would wrap silently before being handed to Cost.
+        Cost cost = new Cost(0, 0, (double) lhsByteCount * rhsRowCount);
+        Cost lhsCost = delegate.getCost();
+        return cost.plus(lhsCost).plus(rhs.getCost());
+    }
+
 }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 3c62c5b..3da06db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import
org.apache.phoenix.jdbc.PhoenixStatement.Operation; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.TableRef; @@ -59,6 +60,11 @@ public abstract class DelegateQueryPlan implements QueryPlan { } @Override + public Cost getCost() { + return delegate.getCost(); + } + + @Override public TableRef getTableRef() { return delegate.getTableRef(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 2b90dcb..2d2ff4e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -62,6 +62,7 @@ import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SQLParser; @@ -290,6 +291,34 @@ public class HashJoinPlan extends DelegateQueryPlan { return statement; } + @Override + public Cost getCost() { + Long byteCount = null; + try { + byteCount = getEstimatedBytesToScan(); + } catch (SQLException e) { + // ignored. + } + + if (byteCount == null) { + return Cost.UNKNOWN; + } + + Cost cost = new Cost(0, 0, byteCount); + Cost lhsCost = delegate.getCost(); + if (keyRangeExpressions != null) { + // The selectivity of the dynamic rowkey filter. + // TODO replace the constant with an estimate value. 
+ double selectivity = 0.01; + lhsCost = lhsCost.multiplyBy(selectivity); + } + Cost rhsCost = Cost.ZERO; + for (SubPlan subPlan : subPlans) { + rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost()); + } + return cost.plus(lhsCost).plus(rhsCost); + } + protected interface SubPlan { public ServerCache execute(HashJoinPlan parent) throws SQLException; public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java index 86f59c5..1d1332d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java @@ -35,6 +35,7 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.SequenceResultIterator; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.TableRef; @@ -60,6 +61,11 @@ public class LiteralResultIterationPlan extends BaseQueryPlan { } @Override + public Cost getCost() { + return Cost.ZERO; + } + + @Override public List<KeyRange> getSplits() { return Collections.emptyList(); }