This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new a02a90520b PHOENIX-4555 Only mark view as updatable if rows cannot 
overlap with other updatable views (#1844)
a02a90520b is described below

commit a02a90520b37eccdfcd26e1dc439946eb4e5e01c
Author: Jing Yu <y...@salesforce.com>
AuthorDate: Wed Aug 14 12:21:33 2024 -0700

    PHOENIX-4555 Only mark view as updatable if rows cannot overlap with other 
updatable views (#1844)
---
 .../phoenix/compile/CreateTableCompiler.java       | 283 ++++++++-
 .../org/apache/phoenix/query/QueryServices.java    |   9 +
 .../end2end/UpdatableViewRestrictionsIT.java       | 682 +++++++++++++++++++++
 3 files changed, 973 insertions(+), 1 deletion(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
index bcc672b224..ccda4659c4 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@ -22,10 +22,15 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -54,9 +59,12 @@ import org.apache.phoenix.parse.PrimaryKeyConstraint;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionlessQueryServicesImpl;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
@@ -72,9 +80,18 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ViewUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAMESPACE_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES;
+import static 
org.apache.phoenix.query.QueryServices.DEFAULT_PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED;
 
 
 public class CreateTableCompiler {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CreateTableCompiler.class);
     private static final PDatum VARBINARY_DATUM = new VarbinaryDatum();
     private final PhoenixStatement statement;
     private final Operation operation;
@@ -97,6 +114,8 @@ public class CreateTableCompiler {
         String viewStatementToBe = null;
         byte[][] viewColumnConstantsToBe = null;
         BitSet isViewColumnReferencedToBe = null;
+        Set<PColumn> pkColumnsInWhere = new HashSet<>();
+        Set<PColumn> nonPkColumnsInWhere = new HashSet<>();
         byte[] rowKeyMatcher = ByteUtil.EMPTY_BYTE_ARRAY;
 
         // Check whether column families having local index column family 
suffix or not if present
@@ -172,9 +191,34 @@ public class CreateTableCompiler {
                     viewColumnConstantsToBe = new byte[nColumns][];
                     ViewWhereExpressionVisitor visitor = new 
ViewWhereExpressionVisitor(parentToBe, viewColumnConstantsToBe);
                     where.accept(visitor);
+
+                    viewTypeToBe = visitor.isUpdatable() ? ViewType.UPDATABLE 
: ViewType.READ_ONLY;
+                    boolean updatableViewRestrictionEnabled = 
connection.getQueryServices()
+                            
.getProps().getBoolean(PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED,
+                                    
DEFAULT_PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED);
+                    if (viewTypeToBe == ViewType.UPDATABLE && 
updatableViewRestrictionEnabled) {
+                        ViewWhereExpressionValidatorVisitor validatorVisitor =
+                                new 
ViewWhereExpressionValidatorVisitor(parentToBe,
+                                        pkColumnsInWhere, nonPkColumnsInWhere);
+                        where.accept(validatorVisitor);
+                        if (!(connection.getQueryServices()
+                                instanceof ConnectionlessQueryServicesImpl)) {
+                            try {
+                                viewTypeToBe = setViewTypeToBe(connection, 
parentToBe,
+                                        pkColumnsInWhere, nonPkColumnsInWhere);
+                                LOGGER.info("VIEW type is set to {}. View 
Statement: {}, " +
+                                                "View Name: {}, " +
+                                                "Parent Table/View Name: {}",
+                                        viewTypeToBe, viewStatementToBe,
+                                        create.getTableName(), 
parentToBe.getName());
+                            } catch (IOException e) {
+                                throw new SQLException(e);
+                            }
+                        }
+                    }
+
                     // If view is not updatable, viewColumnConstants should be 
empty. We will still
                     // inherit our parent viewConstants, but we have no 
additional ones.
-                    viewTypeToBe = visitor.isUpdatable() ? ViewType.UPDATABLE 
: ViewType.READ_ONLY;
                     if (viewTypeToBe != ViewType.UPDATABLE) {
                         viewColumnConstantsToBe = null;
                     }
@@ -221,6 +265,150 @@ public class CreateTableCompiler {
                 viewColumnConstants, isViewColumnReferenced, connection);
     }
 
+    /**
+     * Restrict view to be UPDATABLE if the view specification:
+     * 1. uses only the PK columns;
+     * 2. starts from the first PK column (ignore the prefix PK columns, 
TENANT_ID and/or
+     * _SALTED, if the parent table is multi-tenant and/or salted);
+     * 3. PK columns should be in the order they are defined;
+     * 4. uses the same set of PK columns as its sibling views' specification;
+     * Otherwise, mark the view as READ_ONLY.
+     *
+     * @param connection The client connection
+     * @param parentToBe To be parent for given view
+     * @param pkColumnsInWhere Set of primary key in where clause
+     * @param nonPkColumnsInWhere Set of non-primary key columns in where 
clause
+     * @throws IOException thrown if there is an error finding sibling views
+     * @throws SQLException
+     */
+    private ViewType setViewTypeToBe(final PhoenixConnection connection, final 
PTable parentToBe,
+                                     final Set<PColumn> pkColumnsInWhere,
+                                     final Set<PColumn> nonPkColumnsInWhere)
+            throws IOException, SQLException {
+        // 1. Check the view specification WHERE clause uses only the PK 
columns
+        if (!nonPkColumnsInWhere.isEmpty()) {
+            LOGGER.info("Setting the view type as READ_ONLY because the view 
statement contains " +
+                    "non-PK columns: {}", nonPkColumnsInWhere);
+            return ViewType.READ_ONLY;
+        }
+        if (pkColumnsInWhere.isEmpty()) {
+            return ViewType.UPDATABLE;
+        }
+
+        // 2. Check the WHERE clause starts from the first PK column (ignore 
the prefix PK
+        // columns, TENANT_ID and/or _SALTED, if the parent table is 
multi-tenant and/or salted)
+        List<Integer> tablePkPositions = new ArrayList<>();
+        List<Integer> viewPkPositions = new ArrayList<>();
+        List<PColumn> tablePkColumns = parentToBe.getPKColumns();
+        tablePkColumns.forEach(tablePkColumn ->
+                tablePkPositions.add(tablePkColumn.getPosition()));
+        pkColumnsInWhere.forEach(pkColumn -> 
viewPkPositions.add(pkColumn.getPosition()));
+        Collections.sort(viewPkPositions);
+        int tablePkStartIdx = 0;
+        if (parentToBe.isMultiTenant()) {
+            tablePkStartIdx++;
+        }
+        if (parentToBe.getBucketNum() != null) {
+            tablePkStartIdx++;
+        }
+        if (!Objects.equals(viewPkPositions.get(0), 
tablePkPositions.get(tablePkStartIdx))) {
+            LOGGER.info("Setting the view type as READ_ONLY because the view 
statement WHERE " +
+                    "clause does not start from the first PK column (ignore 
the prefix PKs " +
+                    "if the parent table is multi-tenant and/or salted). View 
PK Columns: " +
+                    "{}, Table PK Columns: {}", pkColumnsInWhere, 
tablePkColumns);
+            return ViewType.READ_ONLY;
+        }
+
+        // 3. Check PK columns are in the order they are defined
+        if (!isPkColumnsInOrder(viewPkPositions, tablePkPositions, 
tablePkStartIdx)) {
+            LOGGER.info("Setting the view type as READ_ONLY because the PK 
columns is not in the " +
+                    "order they are defined. View PK Columns: {}, Table PK 
Columns: {}",
+                    pkColumnsInWhere, tablePkColumns);
+            return ViewType.READ_ONLY;
+        }
+
+        // 4. Check the view specification has the same set of PK column(s) as 
its sibling view
+        byte[] parentTenantIdInBytes = parentToBe.getTenantId() != null
+                ? parentToBe.getTenantId().getBytes() : null;
+        byte[] parentSchemaNameInBytes = parentToBe.getSchemaName() != null
+                ? parentToBe.getSchemaName().getBytes() : null;
+        ConnectionQueryServices queryServices = connection.getQueryServices();
+        Configuration config = queryServices.getConfiguration();
+        byte[] systemChildLinkTable = 
SchemaUtil.isNamespaceMappingEnabled(null, config)
+                ? SYSTEM_CHILD_LINK_NAMESPACE_BYTES
+                : SYSTEM_CHILD_LINK_NAME_BYTES;
+        try (Table childLinkTable = 
queryServices.getTable(systemChildLinkTable)) {
+            List<PTable> legitimateSiblingViewList =
+                    ViewUtil.findAllDescendantViews(childLinkTable, config, 
parentTenantIdInBytes,
+                            parentSchemaNameInBytes, 
parentToBe.getTableName().getBytes(),
+                            HConstants.LATEST_TIMESTAMP, true).getFirst();
+            if (!legitimateSiblingViewList.isEmpty()) {
+                PTable siblingView = legitimateSiblingViewList.get(0);
+                Expression siblingViewWhere = getWhereFromView(connection, 
siblingView);
+                Set<PColumn> siblingViewPkColsInWhere = new HashSet<>();
+                if (siblingViewWhere != null) {
+                    ViewWhereExpressionValidatorVisitor 
siblingViewValidatorVisitor =
+                            new ViewWhereExpressionValidatorVisitor(parentToBe,
+                                    siblingViewPkColsInWhere, null);
+                    siblingViewWhere.accept(siblingViewValidatorVisitor);
+                }
+                if (!pkColumnsInWhere.equals(siblingViewPkColsInWhere)) {
+                    LOGGER.info("Setting the view type as READ_ONLY because 
its set of PK " +
+                                    "columns is different from its sibling 
view {}'s. View PK " +
+                                    "Columns: {}, Sibling View PK Columns: {}",
+                            siblingView.getName(), pkColumnsInWhere, 
siblingViewPkColsInWhere);
+                    return ViewType.READ_ONLY;
+                }
+            }
+        }
+        return ViewType.UPDATABLE;
+    }
+
+    /**
+     * Compiles the WHERE clause of the given view's statement into an Expression.
+     * @param connection The client connection
+     * @param view PTable of the view
+     * @return The compiled WHERE Expression, or null if the view has no view statement or its
+     *         view statement has no WHERE clause
+     * @throws SQLException if the view statement cannot be parsed, resolved or compiled
+     */
+    private Expression getWhereFromView(final PhoenixConnection connection, final PTable view)
+            throws SQLException {
+        String viewStatement = view.getViewStatement();
+        if (viewStatement == null) {
+            return null;
+        }
+        SelectStatement select = new SQLParser(viewStatement).parseQuery();
+        ColumnResolver resolver = FromCompiler.getResolverForQuery(select, connection);
+        StatementContext context =
+                new StatementContext(new PhoenixStatement(connection), resolver);
+        BitSet isViewColumnReferencedToBe = new BitSet(view.getColumns().size());
+        ExpressionCompiler expressionCompiler = new ColumnTrackingExpressionCompiler(context,
+                isViewColumnReferencedToBe);
+        ParseNode whereNode = select.getWhere();
+        if (whereNode == null) {
+            // A statement without a WHERE clause has nothing to compile; report it the same
+            // way as a missing view statement instead of failing with a NullPointerException.
+            return null;
+        }
+        return whereNode.accept(expressionCompiler);
+    }
+
+    /**
+     * Check if the primary key columns are in order (consecutive in position) 
as they are
+     * defined, providing their positions list
+     * @param viewPkPositions A positions list of view PK columns to be checked
+     * @param tablePkPositions The positions list of the table's PK columns to 
be compared
+     * @param tablePkStartIdx The start index of table PK position, depending 
on whether the
+     *                        table is multi-tenant and/or salted
+     * @return true if the PK columns are in order, otherwise false
+     */
+    private boolean isPkColumnsInOrder(final List<Integer> viewPkPositions,
+                                       final List<Integer> tablePkPositions,
+                                       final int tablePkStartIdx) {
+        for (int i = 1; i < viewPkPositions.size(); i++) {
+            if (!Objects.equals(
+                    viewPkPositions.get(i),
+                    tablePkPositions.get(tablePkStartIdx + i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     /**
      * If any of the parent table/view has indexes in the parent hierarchy, 
and the current
      * view under creation extends the primary key of the parent, throw error.
@@ -432,6 +620,99 @@ public class CreateTableCompiler {
         }
         
     }
+
+    /**
+     * Visitor for view's where expression, which updates primary key columns 
and non-primary key
+     * columns for validating if the view is updatable
+     */
+    public static class ViewWhereExpressionValidatorVisitor extends
+            StatelessTraverseNoExpressionVisitor<Boolean> {
+        private boolean isUpdatable = true;
+        private final PTable table;
+        private final Set<PColumn> pkColumns;
+        private final Set<PColumn> nonPKColumns;
+
+        public ViewWhereExpressionValidatorVisitor(PTable table, Set<PColumn> 
pkColumns,
+                                                    Set<PColumn> nonPKColumns) 
{
+            this.table = table;
+            this.pkColumns = pkColumns;
+            this.nonPKColumns = nonPKColumns;
+        }
+
+        public boolean isUpdatable() {
+            return isUpdatable;
+        }
+
+        @Override
+        public Boolean defaultReturn(Expression node, List<Boolean> l) {
+            // We only hit this if we're trying to traverse somewhere
+            // in which we don't have a visitLeave that returns non null
+            isUpdatable = false;
+            return null;
+        }
+
+        @Override
+        public Iterator<Expression> visitEnter(AndExpression node) {
+            return node.getChildren().iterator();
+        }
+
+        @Override
+        public Boolean visitLeave(AndExpression node, List<Boolean> l) {
+            return l.isEmpty() ? null : Boolean.TRUE;
+        }
+
+        @Override
+        public Iterator<Expression> visitEnter(ComparisonExpression node) {
+            if (node.getFilterOp() == CompareOperator.EQUAL
+                    && node.getChildren().get(1).isStateless()
+                    && node.getChildren().get(1).getDeterminism() == 
Determinism.ALWAYS) {
+                return Iterators.singletonIterator(node.getChildren().get(0));
+            }
+            return super.visitEnter(node);
+        }
+
+        @Override
+        public Boolean visitLeave(ComparisonExpression node, List<Boolean> l) {
+            return l.isEmpty() ? null : Boolean.TRUE;
+        }
+
+        @Override
+        public Iterator<Expression> visitEnter(IsNullExpression node) {
+            return node.isNegate() ? super.visitEnter(node) : 
node.getChildren().iterator();
+        }
+
+        @Override
+        public Boolean visitLeave(IsNullExpression node, List<Boolean> l) {
+            // Nothing to do as we've already set the position to an empty 
byte array
+            return l.isEmpty() ? null : Boolean.TRUE;
+        }
+
+        @Override
+        public Boolean visit(RowKeyColumnExpression node) {
+            pkColumns.add(table.getPKColumns().get(node.getPosition()));
+            return Boolean.TRUE;
+        }
+
+        @Override
+        public Boolean visit(KeyValueColumnExpression node) {
+            try {
+                if (nonPKColumns != null) {
+                    nonPKColumns.add(
+                            table.getColumnFamily(node.getColumnFamily())
+                                    
.getPColumnForColumnQualifier(node.getColumnQualifier()));
+                }
+            } catch (SQLException e) {
+                throw new RuntimeException(e); // Impossible
+            }
+            return Boolean.TRUE;
+        }
+
+        @Override
+        public Boolean visit(SingleCellColumnExpression node) {
+            return visit(node.getKeyValueExpression());
+        }
+    }
+
     private static class VarbinaryDatum implements PDatum {
 
         @Override
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 1a42cf5d3b..24c72521d8 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -472,6 +472,15 @@ public interface QueryServices extends SQLCloseable {
 
     boolean DEFAULT_DISABLE_VIEW_SUBTREE_VALIDATION = false;
 
+    /**
+     * Param to enable the updatable view restriction, which only marks a view as updatable
+     * if its rows cannot overlap with those of other updatable views.
+     */
+    String PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED =
+            "phoenix.updatable.view.restriction.enabled";
+
+    boolean DEFAULT_PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED = false;
+
     /**
      * Only used by tests: parameter to determine num of regionservers to be 
created by
      * MiniHBaseCluster.
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpdatableViewRestrictionsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpdatableViewRestrictionsIT.java
new file mode 100644
index 0000000000..b90ccb2d38
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpdatableViewRestrictionsIT.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.ReadOnlyTableException;
+import org.apache.phoenix.schema.TableProperty;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost.PHOENIX_META_DATA_COPROCESSOR_CONF_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for restrictions associated with updatable view.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class UpdatableViewRestrictionsIT extends SplitSystemCatalogIT {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(UpdatableViewRestrictionsIT.class);
+
+    /**
+     * Starts the test driver with ACLs enabled and a test metadata region observer installed,
+     * then splits SYSTEM.CATALOG if this call was the one that started the driver.
+     */
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        NUM_SLAVES_BASE = 6;
+        // Must be read before setUpTestDriver(); presumably a null driver means the
+        // mini-cluster has not been started yet.
+        boolean splitSystemCatalog = (driver == null);
+        // Sized for the three properties that are put below.
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
+        serverProps.put(QueryServices.PHOENIX_ACLS_ENABLED, "true");
+        serverProps.put(PHOENIX_META_DATA_COPROCESSOR_CONF_KEY,
+                ViewConcurrencyAndFailureIT.TestMetaDataRegionObserver.class
+                        .getName());
+        serverProps.put("hbase.coprocessor.abortonerror", "false");
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+                ReadOnlyProps.EMPTY_PROPS);
+        // Split SYSTEM.CATALOG once after the mini-cluster is started
+        if (splitSystemCatalog) {
+            // splitSystemCatalog is incompatible with the balancer chore
+            getUtility().getHBaseCluster().getMaster().balanceSwitch(false);
+            splitSystemCatalog();
+        }
+    }
+
+    /**
+     * Executes the given CREATE TABLE statement after appending the MULTI_TENANT and
+     * SALT_BUCKETS table properties found in tableProps.
+     * @param conn Connection used to run the DDL
+     * @param tableSQL CREATE TABLE statement without table properties
+     * @param tableProps Table properties; MULTI_TENANT is appended only when true and
+     *                   SALT_BUCKETS only when non-null
+     */
+    private void createTable(
+            Connection conn, String tableSQL, Map<String, Object> tableProps) throws Exception {
+        List<String> props = new ArrayList<>();
+        Boolean multitenant = (Boolean) TableProperty.MULTI_TENANT.getValue(tableProps);
+        if (multitenant != null && multitenant) {
+            props.add(TableProperty.MULTI_TENANT.getPropertyName() + "=" + multitenant);
+        }
+        Integer nSaltBuckets = (Integer) TableProperty.SALT_BUCKETS.getValue(tableProps);
+        if (nSaltBuckets != null) {
+            props.add(TableProperty.SALT_BUCKETS.getPropertyName() + "=" + nSaltBuckets);
+        }
+        String ddl = tableSQL + " " + String.join(", ", props);
+        LOGGER.debug("Creating table with SQL: {}", ddl);
+        // Close the statement as soon as the DDL has run instead of leaving it open.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(ddl);
+        }
+    }
+
+    /**
+     * Creates the test table with PK (k1, k2, k3) and one VARCHAR column s, optionally
+     * prefixed with a TENANT_ID PK column and optionally salted.
+     * @param conn Connection used to run the DDL
+     * @param tableName Name of the table to create
+     * @param multitenant Whether to add the TENANT_ID column and the MULTI_TENANT property
+     * @param nSaltBuckets Number of salt buckets, or null for an unsalted table
+     */
+    private void createTable(Connection conn, String tableName,
+                             boolean multitenant, Integer nSaltBuckets) throws Exception {
+        String tableSQL = "CREATE TABLE " + tableName + " ("
+                + (multitenant ? "TENANT_ID VARCHAR NOT NULL, " : "")
+                + "k1 INTEGER NOT NULL, k2 DECIMAL, k3 INTEGER NOT NULL, s VARCHAR "
+                + "CONSTRAINT pk PRIMARY KEY ("
+                + (multitenant ? "TENANT_ID, " : "")
+                + "k1, k2, k3))";
+        // A plain HashMap instead of double-brace initialization, which would create an
+        // anonymous subclass holding a reference to this test instance.
+        Map<String, Object> tableProps = new HashMap<>();
+        tableProps.put(TableProperty.MULTI_TENANT.getPropertyName(), multitenant);
+        tableProps.put(TableProperty.SALT_BUCKETS.getPropertyName(), nSaltBuckets);
+        createTable(conn, tableSQL, tableProps);
+    }
+
+    private void createTable(Connection conn, String tableName) throws 
Exception {
+        createTable(conn, tableName, false, null);
+    }
+
+    private Connection getTenantConnection(final String tenantId) throws 
Exception {
+        Properties tenantProps = new Properties();
+        tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        
tenantProps.setProperty(QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED,
 "true");
+        return DriverManager.getConnection(getUrl(), tenantProps);
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int 
expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    /**
+     * Test that the view type is READ_ONLY if there are non-PK columns in the WHERE clause.
+     */
+    @Test
+    public void testReadOnlyViewWithNonPkInWhere() throws Exception {
+        String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+        String fullViewName = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+
+        Properties props = new Properties();
+        props.setProperty(QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED, "true");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            createTable(conn, fullTableName);
+
+            // The statement is closed explicitly instead of relying on the connection.
+            try (Statement stmt = conn.createStatement()) {
+                String viewDDL = "CREATE VIEW " + fullViewName + " AS SELECT * FROM "
+                        + fullTableName + " WHERE s = 'a'";
+                stmt.execute(viewDDL);
+                try {
+                    stmt.execute("UPSERT INTO " + fullViewName + " VALUES(1, 2, 3, 'a')");
+                    fail("Expected ReadOnlyTableException when upserting into the view");
+                } catch (ReadOnlyTableException ignored) {
+                    // Expected: the view was created as READ_ONLY.
+                }
+            }
+        }
+    }
+
+    /**
+     * Test that the view type is READ_ONLY if PK columns in the WHERE clause are not in the
+     * order they are defined: primary key k2 is missing in this case.
+     */
+    @Test
+    public void testReadOnlyViewWithPkNotInOrderInWhere1() throws Exception {
+        String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+        String fullViewName = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+
+        Properties props = new Properties();
+        props.setProperty(QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED, "true");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            createTable(conn, fullTableName);
+
+            // The statement is closed explicitly instead of relying on the connection.
+            try (Statement stmt = conn.createStatement()) {
+                String viewDDL = "CREATE VIEW " + fullViewName + " AS SELECT * FROM "
+                        + fullTableName + " WHERE k1 = 1 AND k3 = 3";
+                stmt.execute(viewDDL);
+                try {
+                    stmt.execute("UPSERT INTO " + fullViewName + " VALUES(1, 2, 3, 'a')");
+                    fail("Expected ReadOnlyTableException when upserting into the view");
+                } catch (ReadOnlyTableException ignored) {
+                    // Expected: the view was created as READ_ONLY.
+                }
+            }
+        }
+    }
+
+    /**
+     * Test that the view type is READ_ONLY if PK columns in the WHERE clause are not in the
+     * order they are defined: primary key k3 is missing in this case.
+     */
+    @Test
+    public void testReadOnlyViewWithPkNotInOrderInWhere2() throws Exception {
+        String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+        String fullViewName = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+
+        Properties props = new Properties();
+        props.setProperty(QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED, "true");
+        // Both the connection and the statement are closed by try-with-resources.
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            try (Statement stmt = conn.createStatement()) {
+                // Here s is part of the PK, so (k1, k2, s) skips k3.
+                stmt.execute("CREATE TABLE " + fullTableName
+                        + " (k1 INTEGER NOT NULL, k2 DECIMAL, k3 INTEGER NOT NULL, s VARCHAR "
+                        + "CONSTRAINT pk PRIMARY KEY (k1, k2, k3, s))");
+
+                String viewDDL = "CREATE VIEW " + fullViewName + " AS SELECT * FROM "
+                        + fullTableName + " WHERE k1 = 1 AND k2 = 2 AND s = 'a'";
+                stmt.execute(viewDDL);
+                try {
+                    stmt.execute("UPSERT INTO " + fullViewName + " VALUES(1, 2, 3, 'a')");
+                    fail("Expected ReadOnlyTableException when upserting into the view");
+                } catch (ReadOnlyTableException ignored) {
+                    // Expected: the view was created as READ_ONLY.
+                }
+            }
+        }
+    }
+
+    /**
+     * Test that the view type is READ_ONLY if the set of PK columns in the WHERE clause are
+     * not same as its sibling view's: primary key k3 is redundant in this case.
+     */
+    @Test
+    public void testReadOnlyViewWithPkNotSameInWhere1() throws Exception {
+        String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+        String fullGlobalViewName = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
+        String fullViewName = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+        String fullSiblingViewName = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+
+        Properties props = new Properties();
+        props.setProperty(QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED, "true");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            createTable(conn, fullTableName);
+
+            // The statement is closed explicitly instead of relying on the connection.
+            try (Statement stmt = conn.createStatement()) {
+                String globalViewDDL = "CREATE VIEW " + fullGlobalViewName
+                        + " AS SELECT * FROM " + fullTableName + " WHERE k1 = 1";
+                stmt.execute(globalViewDDL);
+
+                // The sibling is created first and constrains k2 only.
+                String siblingViewDDL = "CREATE VIEW " + fullSiblingViewName
+                        + " AS SELECT * FROM " + fullGlobalViewName + " WHERE k2 = 1";
+                stmt.execute(siblingViewDDL);
+                String viewDDL = "CREATE VIEW " + fullViewName + " AS SELECT * FROM "
+                        + fullGlobalViewName + " WHERE k2 = 2 AND k3 = 109";
+                stmt.execute(viewDDL);
+                try {
+                    stmt.execute("UPSERT INTO " + fullViewName + " VALUES(1, 2, 109, 'a')");
+                    fail("Expected ReadOnlyTableException when upserting into the view");
+                } catch (ReadOnlyTableException ignored) {
+                    // Expected: the view was created as READ_ONLY.
+                }
+            }
+        }
+    }
+
+    /**
+     * Test that the view type is READ_ONLY if the set of PK columns in the WHERE clause are
+     * not same as its sibling view's: primary key k3 is missing in this case.
+     */
+    @Test
+    public void testReadOnlyViewWithPkNotSameInWhere2() throws Exception {
+        String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+        String fullGlobalViewName = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
+        String fullViewName = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+        String fullSiblingViewName = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+
+        Properties props = new Properties();
+        props.setProperty(QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED, "true");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            createTable(conn, fullTableName);
+
+            // The statement is closed explicitly instead of relying on the connection.
+            try (Statement stmt = conn.createStatement()) {
+                String globalViewDDL = "CREATE VIEW " + fullGlobalViewName
+                        + " AS SELECT * FROM " + fullTableName + " WHERE k1 = 1";
+                stmt.execute(globalViewDDL);
+
+                // The sibling is created first and constrains both k2 and k3.
+                String siblingViewDDL = "CREATE VIEW " + fullSiblingViewName
+                        + " AS SELECT * FROM " + fullGlobalViewName
+                        + " WHERE k2 = 1 AND k3 = 109";
+                stmt.execute(siblingViewDDL);
+                String viewDDL = "CREATE VIEW " + fullViewName + " AS SELECT * FROM "
+                        + fullGlobalViewName + " WHERE k2 = 2";
+                stmt.execute(viewDDL);
+                try {
+                    stmt.execute("UPSERT INTO " + fullViewName + " VALUES(1, 2, 109, 'a')");
+                    fail("Expected ReadOnlyTableException when upserting into the view");
+                } catch (ReadOnlyTableException ignored) {
+                    // Expected: the view was created as READ_ONLY.
+                }
+            }
+        }
+    }
+
+    /**
+     * Test that the view type is UPDATABLE if the view statement WHERE clause
+     * starts from the first PK column (ignore the prefix PK columns, 
TENANT_ID and/or _SALTED,
+     * if the parent table is multi-tenant and/or salted),
+     * and satisfies all other criteria.
+     * @param multitenant Whether the parent table is multi-tenant
+     * @param nSaltBuckets Number of salt buckets
+     * @throws Exception
+     */
+    private void testUpdatableViewStartFromFirstPK(boolean multitenant, 
Integer nSaltBuckets) throws Exception {
+        String fullTableName = SchemaUtil.getTableName(SCHEMA1, 
generateUniqueName());
+        Properties props = new Properties();
+        
props.setProperty(QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED, 
"true");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            createTable(conn, fullTableName, multitenant, nSaltBuckets);
+            String tenantId = null;
+            if (multitenant) {
+                tenantId = TENANT1;
+                try (Connection tenantConn = getTenantConnection(tenantId)) {
+                    createAndVerifyUpdatableView(fullTableName, tenantConn);
+                }
+            } else {
+                createAndVerifyUpdatableView(fullTableName, conn);
+            }
+            verifyNumberOfRows(fullTableName, tenantId, 1, conn);
+        }
+    }
+
+    /**
+     * Shared scenario for the "does not start from first PK" tests: the view
+     * type is READ_ONLY if the view statement WHERE clause does not start from
+     * the first PK column (ignoring the prefix PK columns, TENANT_ID and/or the
+     * salt column, if the parent table is multi-tenant and/or salted).
+     * <p>
+     * The view filters on k2 only; the subsequent UPSERT through it is expected
+     * to throw {@link ReadOnlyTableException}.
+     *
+     * @param multitenant Whether the parent table is multi-tenant
+     * @param nSaltBuckets Number of salt buckets
+     * @throws Exception if table/view creation fails unexpectedly
+     */
+    private void testReadOnlyViewNotStartFromFirstPK(boolean multitenant,
+            Integer nSaltBuckets) throws Exception {
+        final String parentTable = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+        final String viewName = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
+
+        final Properties connProps = new Properties();
+        connProps.setProperty(
+                QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED, "true");
+        try (Connection globalConn = DriverManager.getConnection(getUrl(), connProps);
+             Connection tenantConn = getTenantConnection(TENANT1)) {
+            globalConn.setAutoCommit(true);
+            tenantConn.setAutoCommit(true);
+
+            createTable(globalConn, parentTable, multitenant, nSaltBuckets);
+
+            // A multi-tenant parent is reached through the tenant connection.
+            final Connection viewConn = multitenant ? tenantConn : globalConn;
+            final Statement statement = viewConn.createStatement();
+            statement.execute("CREATE VIEW " + viewName + " AS SELECT * FROM " +
+                    parentTable + " WHERE k2 = 2");
+
+            boolean rejected = false;
+            try {
+                statement.execute(String.format("UPSERT INTO %s VALUES" +
+                        "(1, 2, 3, 's')", viewName));
+            } catch (ReadOnlyTableException expected) {
+                rejected = true;
+            }
+            if (!rejected) {
+                fail();
+            }
+        }
+    }
+
+    /**
+     * Test that the view type is READ_ONLY if the view statement WHERE clause 
does not start
+     * from the first PK column when the parent table is neither multi-tenant 
nor salted.
+     */
+    @Test
+    public void 
testReadOnlyViewOnNonMultitenantNonSaltedTableNotStartFromFirstPK() throws 
Exception {
+        testReadOnlyViewNotStartFromFirstPK(false, null);
+    }
+
+    /**
+     * Test that the view type is READ_ONLY if the view statement WHERE clause 
does not start
+     * from the first PK column when the parent table is multi-tenant and not 
salted (ignore the
+     * prefix PK column TENANT_ID).
+     */
+    @Test
+    public void 
testReadOnlyViewOnMultitenantNonSaltedTableNotStartFromFirstPK() throws 
Exception {
+        testReadOnlyViewNotStartFromFirstPK(true, null);
+    }
+
+    /**
+     * Test that the view type is READ_ONLY if the view statement WHERE clause 
does not start
+     * from the first PK column when the parent table is not multi-tenant but 
salted (ignore
+     * the prefix PK column _SALTED).
+     */
+    @Test
+    public void 
testReadOnlyViewOnNonMultitenantSaltedTableNotStartFromFirstPK() throws 
Exception {
+        testReadOnlyViewNotStartFromFirstPK(false, 3);
+    }
+
+    /**
+     * Test that the view type is READ_ONLY if the view statement WHERE clause 
does not start
+     * from the first PK column when the parent table is both multi-tenant and 
salted (ignore
+     * the prefix PK columns TENANT_ID and _SALTED).
+     */
+    @Test
+    public void testReadOnlyViewOnMultitenantSaltedTableNotStartFromFirstPK() 
throws Exception {
+        testReadOnlyViewNotStartFromFirstPK(true, 3);
+    }
+
+    /**
+     * Test that the view type is UPDATABLE if the view statement WHERE clause
+     * starts from the first PK column when the parent table is neither 
multi-tenant nor salted,
+     * and satisfies all other criteria.
+     */
+    @Test
+    public void 
testUpdatableViewOnNonMultitenantNonSaltedTableStartFromFirstPK() throws 
Exception {
+        testUpdatableViewStartFromFirstPK(false, null);
+    }
+
+    /**
+     * Test that the view type is UPDATABLE if the view statement WHERE clause
+     * starts from the first PK column when the parent table is multi-tenant 
and not salted
+     * (ignore the prefix PK column TENANT_ID),
+     * and satisfies all other criteria.
+     */
+    @Test
+    public void testUpdatableViewOnMultitenantNonSaltedTableStartFromFirstPK() 
throws Exception {
+        testUpdatableViewStartFromFirstPK(true, null);
+    }
+
+    /**
+     * Test that the view type is UPDATABLE if the view statement WHERE clause
+     * starts from the first PK column when the parent table is not 
multi-tenant but salted
+     * (ignore the prefix PK column _SALTED),
+     * and satisfies all other criteria.
+     */
+    @Test
+    public void testUpdatableViewOnNonMultitenantSaltedTableStartFromFirstPK() 
throws Exception {
+        testUpdatableViewStartFromFirstPK(false, 3);
+    }
+
+    /**
+     * Test that the view type is UPDATABLE if the view statement WHERE clause
+     * starts from the first PK column when the parent table is both 
multi-tenant and salted
+     * (ignore the prefix PK columns TENANT_ID and _SALTED),
+     * and satisfies all other criteria.
+     */
+    @Test
+    public void testUpdatableViewOnMultitenantSaltedTableStartFromFirstPK() 
throws Exception {
+        testUpdatableViewStartFromFirstPK(true, 3);
+    }
+
+    private void createAndVerifyUpdatableView(
+            String fullTableName, Connection conn) throws Exception {
+        String fullViewName = SchemaUtil.getTableName(SCHEMA2, 
generateUniqueName());
+        Statement stmt = conn.createStatement();
+
+        stmt.execute("CREATE VIEW " + fullViewName + " AS SELECT * FROM " + 
fullTableName +
+                " WHERE k1 = 1");
+        stmt.execute("UPSERT INTO " + fullViewName + " VALUES(1, 2, 109, 
'a')");
+        conn.commit();
+
+        ResultSet rs = stmt.executeQuery("SELECT k1, k2, k3 FROM " + 
fullViewName);
+
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals(2, rs.getInt(2));
+        assertEquals(109, rs.getInt(3));
+        assertFalse(rs.next());
+    }
+
+    /**
+     * Test that the view type is READ_ONLY when it's created on an updatable
+     * view and PK columns in the WHERE clause are not in the order they are
+     * defined: the parent view constrains k1 and the child view constrains k3,
+     * skipping k2.
+     */
+    @Test
+    public void testReadOnlyViewOnUpdatableView1() throws Exception {
+        String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+        String fullViewName = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
+        String fullChildViewName = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+
+        Properties props = new Properties();
+        props.setProperty(QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED, "true");
+        // try-with-resources releases the connection even when an assertion
+        // fails, matching the other tests in this class.
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            createTable(conn, fullTableName);
+
+            try (Statement stmt = conn.createStatement()) {
+                String viewDDL = "CREATE VIEW " + fullViewName +
+                        " AS SELECT * FROM " + fullTableName + " WHERE k1 = 1";
+                stmt.execute(viewDDL);
+
+                String childViewDDL = "CREATE VIEW " + fullChildViewName +
+                        " AS SELECT * FROM " + fullViewName + " WHERE k3 = 3";
+                stmt.execute(childViewDDL);
+
+                try {
+                    stmt.execute("UPSERT INTO " + fullChildViewName + " VALUES(1, 2, 3, 'a')");
+                    fail("UPSERT into a READ_ONLY view should throw ReadOnlyTableException");
+                } catch (ReadOnlyTableException ignored) {
+                    // expected: the child view must not be updatable
+                }
+            }
+        }
+    }
+
+    /**
+     * Test that the view type is READ_ONLY when it's created on an updatable
+     * view and there are non-PK columns in the WHERE clause: the child view
+     * constrains k2 and k3 but also the non-PK column s.
+     */
+    @Test
+    public void testReadOnlyViewOnUpdatableView2() throws Exception {
+        String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+        String fullViewName = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
+        String fullChildViewName = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+
+        Properties props = new Properties();
+        props.setProperty(QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED, "true");
+        // try-with-resources releases the connection even when an assertion
+        // fails, matching the other tests in this class.
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            createTable(conn, fullTableName);
+
+            try (Statement stmt = conn.createStatement()) {
+                String viewDDL = "CREATE VIEW " + fullViewName +
+                        " AS SELECT * FROM " + fullTableName + " WHERE k1 = 1";
+                stmt.execute(viewDDL);
+
+                String childViewDDL = "CREATE VIEW " + fullChildViewName +
+                        " AS SELECT * FROM " + fullViewName + " WHERE k2 = 2 AND k3 = 3 AND s = 'a'";
+                stmt.execute(childViewDDL);
+
+                try {
+                    stmt.execute("UPSERT INTO " + fullChildViewName + " VALUES(1, 2, 3, 'a')");
+                    fail("UPSERT into a READ_ONLY view should throw ReadOnlyTableException");
+                } catch (ReadOnlyTableException ignored) {
+                    // expected: the child view must not be updatable
+                }
+            }
+        }
+    }
+
+    /**
+     * Test that a view created on an updatable view stays updatable when its
+     * WHERE clause continues with k2 and k3 after the parent view's k1: an
+     * UPSERT through the child view succeeds and the row is read back from it.
+     */
+    @Test
+    public void testUpdatableViewOnUpdatableView() throws Exception {
+        String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+        String fullViewName = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
+        String fullChildViewName = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+
+        Properties props = new Properties();
+        props.setProperty(QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED, "true");
+        // try-with-resources releases the connection even when an assertion
+        // fails, matching the other tests in this class.
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            createTable(conn, fullTableName);
+
+            try (Statement stmt = conn.createStatement()) {
+                String viewDDL = "CREATE VIEW " + fullViewName +
+                        " AS SELECT * FROM " + fullTableName + " WHERE k1 = 1";
+                stmt.execute(viewDDL);
+
+                String childViewDDL = "CREATE VIEW " + fullChildViewName +
+                        " AS SELECT * FROM " + fullViewName + " WHERE k2 = 2 AND k3 = 3";
+                stmt.execute(childViewDDL);
+
+                stmt.execute("UPSERT INTO " + fullChildViewName + " VALUES(1, 2, 3, 'a')");
+
+                // The child view must expose exactly the row just written.
+                try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + fullChildViewName)) {
+                    assertTrue(rs.next());
+                    assertEquals(1, rs.getInt(1));
+                    assertEquals(2, rs.getInt(2));
+                    assertEquals(3, rs.getInt(3));
+                    assertEquals("a", rs.getString(4));
+                    assertFalse(rs.next());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testSiblingsUpdatableOnUpdatableView() throws Exception {
+        String fullTableName = SchemaUtil.getTableName(SCHEMA1, 
generateUniqueName());
+        String fullGlobalViewName = SchemaUtil.getTableName(SCHEMA2, 
generateUniqueName());
+        String fullViewName = SchemaUtil.getTableName(SCHEMA3, 
generateUniqueName());
+        String fullLeafViewName1 = SchemaUtil.getTableName(SCHEMA4, 
generateUniqueName());
+        String fullLeafViewName2 = SchemaUtil.getTableName(SCHEMA4, 
generateUniqueName());
+
+        Properties props = new Properties();
+        
props.setProperty(QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED, 
"true");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            createTable(conn, fullTableName);
+
+            Statement stmt = conn.createStatement();
+            String globalViewDDL = "CREATE VIEW " + fullGlobalViewName + " AS 
SELECT * FROM "
+                    + fullTableName + " WHERE k1 = 1";
+            stmt.execute(globalViewDDL);
+            String viewDDL =
+                    "CREATE VIEW " + fullViewName + " AS SELECT * FROM " + 
fullGlobalViewName
+                            + " WHERE k2 = 1";
+            stmt.execute(viewDDL);
+            String leafView1DDL = "CREATE VIEW " + fullLeafViewName1 + " AS 
SELECT * FROM "
+                    + fullViewName + " WHERE k3 = 101";
+            stmt.execute(leafView1DDL);
+            String leafView2DDL = "CREATE VIEW " + fullLeafViewName2 + " AS 
SELECT * FROM "
+                    + fullViewName + " WHERE k3 = 105";
+            stmt.execute(leafView2DDL);
+
+            for (int i = 0; i < 10; i++) {
+                stmt.execute("UPSERT INTO " + fullTableName
+                        + " VALUES(" + (i % 4) + "," + (i > 5 ? 2 : 1) + "," + 
(i + 100) + ")");
+            }
+            conn.commit();
+
+            ResultSet rs;
+            rs = stmt.executeQuery("SELECT count(*) FROM " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(10, rs.getInt(1));
+            rs = stmt.executeQuery("SELECT count(*) FROM " + 
fullGlobalViewName);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            rs = stmt.executeQuery("SELECT count(*) FROM " + fullViewName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+
+            rs = stmt.executeQuery("SELECT k1, k2, k3 FROM " + 
fullLeafViewName1);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals(1, rs.getInt(2));
+            assertEquals(101, rs.getInt(3));
+            assertFalse(rs.next());
+            rs = stmt.executeQuery("SELECT k1, k2, k3 FROM " + 
fullLeafViewName2);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals(1, rs.getInt(2));
+            assertEquals(105, rs.getInt(3));
+            assertFalse(rs.next());
+
+            stmt.execute("UPSERT INTO " + fullLeafViewName1 + " VALUES(1, 1, 
101, 'leaf1')");
+            stmt.execute("UPSERT INTO " + fullLeafViewName2 + " VALUES(1, 1, 
105, 'leaf2')");
+            conn.commit();
+            rs = stmt.executeQuery("SELECT s FROM " + fullLeafViewName1);
+            assertTrue(rs.next());
+            assertEquals("leaf1", rs.getString(1));
+            assertFalse(rs.next());
+            rs = stmt.executeQuery("SELECT s FROM " + fullLeafViewName2);
+            assertTrue(rs.next());
+            assertEquals("leaf2", rs.getString(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testUpdatableViewWithMultiTenantTable() throws Exception {
+        final String tableName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(SCHEMA1, 
tableName);
+        final String view01 = SchemaUtil.getTableName(SCHEMA2, "v01_" + 
tableName);
+        final String view02 = SchemaUtil.getTableName(SCHEMA3, "v02_" + 
tableName);
+        final String view03 = SchemaUtil.getTableName(SCHEMA4, "v03_" + 
tableName);
+
+        Properties props = new Properties();
+        
props.setProperty(QueryServices.PHOENIX_UPDATABLE_VIEW_RESTRICTION_ENABLED, 
"true");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            final Statement stmt = conn.createStatement();
+
+            stmt.execute("CREATE TABLE " + fullTableName
+                    + " (TENANT_ID VARCHAR NOT NULL, COL1 CHAR(10) NOT NULL, 
COL2 CHAR(5) NOT "
+                    + "NULL, COL3 VARCHAR, COL4 VARCHAR CONSTRAINT pk PRIMARY 
KEY(TENANT_ID, "
+                    + "COL1, COL2)) MULTI_TENANT = true");
+
+            try (Connection tenantConn = getTenantConnection(TENANT1)) {
+                tenantConn.setAutoCommit(true);
+                final Statement tenantStmt = tenantConn.createStatement();
+
+                stmt.execute("CREATE VIEW " + view01
+                        + " (VCOL1 CHAR(8) NOT NULL, COL5 VARCHAR CONSTRAINT 
pk PRIMARY KEY(VCOL1))"
+                        + " AS SELECT * FROM " + fullTableName + " WHERE COL1 
= 'col1'");
+                tenantStmt.execute("CREATE VIEW " + view02 + " AS SELECT * 
FROM " + view01
+                                + " WHERE COL2 = 'col2'");
+                tenantStmt.execute("CREATE VIEW " + view03 + " AS SELECT * 
FROM " + view02
+                        + " WHERE VCOL1 = 'vcol1'");
+
+                tenantStmt.execute(String.format("UPSERT INTO %s 
(VCOL1,COL3,COL4,COL5)" +
+                        " VALUES('vcol2', 'col3', 'col4', 'col5')", view02));
+                tenantStmt.execute(String.format("UPSERT INTO %s 
(COL3,COL4,COL5)" +
+                        " VALUES('col3', 'col4', 'col5')", view03));
+
+                verifyNumberOfRows(fullTableName, TENANT1, 2, conn);
+                ResultSet rs = tenantConn.createStatement().executeQuery(
+                        "SELECT COL1, COL2, VCOL1 FROM " + view02);
+                assertTrue(rs.next());
+                assertEquals("col1", rs.getString(1));
+                assertEquals("col2", rs.getString(2));
+                assertEquals("vcol1", rs.getString(3));
+                assertTrue(rs.next());
+                assertEquals("col1", rs.getString(1));
+                assertEquals("col2", rs.getString(2));
+                assertEquals("vcol2", rs.getString(3));
+                assertFalse(rs.next());
+
+                rs = tenantConn.createStatement().executeQuery(
+                        "SELECT COL1, COL2, VCOL1 FROM " + view03);
+                assertTrue(rs.next());
+                assertEquals("col1", rs.getString(1));
+                assertEquals("col2", rs.getString(2));
+                assertEquals("vcol1", rs.getString(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+}

Reply via email to