This is an automated email from the ASF dual-hosted git repository. chinmayskulkarni pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 788e753 PHOENIX-5673 : The mutation state is silently getting cleared on the execution of any DDL 788e753 is described below commit 788e7530b2224882df161e2353d3d8d9b7b6eec7 Author: Siddhi Mehta <sm26...@gmail.com> AuthorDate: Thu Feb 20 10:31:01 2020 -0800 PHOENIX-5673 : The mutation state is silently getting cleared on the execution of any DDL Signed-off-by: Chinmay Kulkarni <chinmayskulka...@apache.org> --- .../apache/phoenix/end2end/MutationStateIT.java | 52 ++++++++++++++++++++++ .../apache/phoenix/exception/SQLExceptionCode.java | 8 ++-- .../org/apache/phoenix/execute/MutationState.java | 10 +++-- .../org/apache/phoenix/jdbc/PhoenixStatement.java | 27 ++++++++++- .../org/apache/phoenix/query/QueryServices.java | 5 ++- .../apache/phoenix/query/QueryServicesOptions.java | 6 ++- .../apache/phoenix/execute/MutationStateTest.java | 42 +++++++++++++++-- .../apache/phoenix/index/IndexMaintainerTest.java | 1 + .../org/apache/phoenix/schema/MutationTest.java | 8 ++-- 9 files changed, 140 insertions(+), 19 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java index 4d70d0a..1c1ce1d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -18,6 +18,7 @@ package org.apache.phoenix.end2end; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -37,6 +38,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -47,12 +50,15 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.Repeat; import org.apache.phoenix.util.RunUntilFailure; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TestUtil; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; @RunWith(RunUntilFailure.class) @@ -498,4 +504,50 @@ public class MutationStateIT extends ParallelStatsDisabledIT { htable.close(); } } + + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); + + @Test + public void testDDLwithPendingMutations() throws Exception { + String tableName = generateUniqueName(); + ensureTableCreated(getUrl(), tableName, TestUtil.PTSDB_NAME, null, null, null); + Properties props = new Properties(); + props.setProperty(QueryServices.PENDING_MUTATIONS_DDL_THROW_ATTRIB, Boolean.toString(true)); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + // setting auto commit to false + conn.setAutoCommit(false); + + // Run upsert queries but do not commit + PreparedStatement stmt = + conn.prepareStatement("UPSERT INTO " + tableName + + " (inst,host,\"DATE\") VALUES(?,'b',CURRENT_DATE())"); + stmt.setString(1, "a"); + stmt.execute(); + // Create a ddl statement + String tableName2 = generateUniqueName(); + String ddl = "CREATE TABLE " + tableName2 + " (V BIGINT PRIMARY KEY, K BIGINT)"; + exceptionRule.expect(SQLException.class); + exceptionRule.expectMessage( + SQLExceptionCode.CANNOT_PERFORM_DDL_WITH_PENDING_MUTATIONS.getMessage()); + conn.createStatement().execute(ddl); + } + } + + @Test + public void testNoPendingMutationsOnDDL() throws Exception { + Properties props = new Properties(); + props.setProperty(QueryServices.PENDING_MUTATIONS_DDL_THROW_ATTRIB, Boolean.toString(true)); + String tableName = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String ddl = + "create table " + tableName + " ( id1 UNSIGNED_INT not null primary key," + + "appId1 VARCHAR)"; + conn.createStatement().execute(ddl); + // ensure table got created + Admin admin = driver.getConnectionQueryServices(getUrl(), props).getAdmin(); + assertNotNull(admin.getDescriptor(TableName.valueOf(tableName))); + assertNotNull(PhoenixRuntime.getTableNoCache(conn, tableName)); + } + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 74e9bdf..ad9be31 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -35,6 +35,8 @@ import org.apache.phoenix.schema.ConcurrentTableMutationException; import org.apache.phoenix.schema.FunctionAlreadyExistsException; import org.apache.phoenix.schema.FunctionNotFoundException; import org.apache.phoenix.schema.IndexNotFoundException; +import org.apache.phoenix.schema.MaxMutationSizeBytesExceededException; +import org.apache.phoenix.schema.MaxMutationSizeExceededException; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ReadOnlyTableException; @@ -47,8 +49,6 @@ import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TypeMismatchException; import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.schema.MaxMutationSizeBytesExceededException; -import org.apache.phoenix.schema.MaxMutationSizeExceededException; import org.apache.phoenix.util.MetaDataUtil; import com.google.common.collect.Maps; @@ -514,8 +514,10 @@ public enum SQLExceptionCode { "Cannot use a connection with SCN set to upsert data for " + "table with ROW_TIMESTAMP column."), CANNOT_UPSERT_WITH_SCN_FOR_TABLE_WITH_INDEXES(903,"43M14", - "Cannot use a connection with SCN set to upsert data for a table with indexes."); + "Cannot use a connection with SCN set to upsert data for a table with indexes."), + CANNOT_PERFORM_DDL_WITH_PENDING_MUTATIONS(904, "43M15", + "Cannot perform DDL with pending mutations. Commit or rollback mutations before performing DDL"); private final int errorCode; private final String sqlState; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 79c8c90..dbe58e0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -77,8 +77,9 @@ import org.apache.phoenix.monitoring.ReadMetricQueue; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.IllegalDataException; +import org.apache.phoenix.schema.MaxMutationSizeBytesExceededException; +import org.apache.phoenix.schema.MaxMutationSizeExceededException; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PIndexState; @@ -91,10 +92,9 @@ import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PTimestamp; -import org.apache.phoenix.schema.MaxMutationSizeBytesExceededException; -import org.apache.phoenix.schema.MaxMutationSizeExceededException; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; @@ -397,6 +397,10 @@ public class MutationState implements SQLCloseable { return sizeOffset + numRows; } + public int getNumRows() { + return numRows; + } + private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, Map<TableRef, MultiRowMutationState> dstMutations) { PTable table = tableRef.getTable(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 5312766..af4ae47 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -94,9 +94,9 @@ import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.log.QueryLogInfo; -import org.apache.phoenix.log.QueryStatus; import org.apache.phoenix.log.QueryLogger; import org.apache.phoenix.log.QueryLoggerUtil; +import org.apache.phoenix.log.QueryStatus; import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.AddColumnStatement; import org.apache.phoenix.parse.AddJarsStatement; @@ -114,6 +114,7 @@ import org.apache.phoenix.parse.CreateSchemaStatement; import org.apache.phoenix.parse.CreateSequenceStatement; import org.apache.phoenix.parse.CreateTableStatement; import org.apache.phoenix.parse.CursorName; +import org.apache.phoenix.parse.DMLStatement; import org.apache.phoenix.parse.DeclareCursorStatement; import org.apache.phoenix.parse.DeleteJarStatement; import org.apache.phoenix.parse.DeleteStatement; @@ -132,6 +133,7 @@ import org.apache.phoenix.parse.IndexKeyConstraint; import org.apache.phoenix.parse.LimitNode; import org.apache.phoenix.parse.ListJarsStatement; import org.apache.phoenix.parse.LiteralParseNode; +import org.apache.phoenix.parse.MutableStatement; import org.apache.phoenix.parse.NamedNode; import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.OffsetNode; @@ -409,6 +411,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator(); state.sendUncommitted(tableRefs); state.checkpointIfNeccessary(plan); + checkIfDDLStatementandMutationState(stmt, state); MutationState lastState = plan.execute(); state.join(lastState); if (connection.getAutoCommit()) { @@ -2171,4 +2174,26 @@ public class PhoenixStatement implements Statement, SQLCloseable { } } + /** + * Check if the statement is a DDL and if there are any uncommitted mutations Throw or log the + * message + */ + private void checkIfDDLStatementandMutationState(final CompilableStatement stmt, + MutationState state) throws SQLException { + boolean throwUncommittedMutation = + connection.getQueryServices().getProps().getBoolean( + QueryServices.PENDING_MUTATIONS_DDL_THROW_ATTRIB, + QueryServicesOptions.DEFAULT_PENDING_MUTATIONS_DDL_THROW); + if (stmt instanceof MutableStatement && !(stmt instanceof DMLStatement) + && state.getNumRows() > 0) { + if (throwUncommittedMutation) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CANNOT_PERFORM_DDL_WITH_PENDING_MUTATIONS).build() + .buildException(); + } else { + LOGGER.warn( + "There are Uncommitted mutations, which will be dropped on the execution of this DDL statement."); + } + } + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 00043f7..e2e67c7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -19,14 +19,14 @@ package org.apache.phoenix.query; import java.util.concurrent.ThreadPoolExecutor; -import net.jcip.annotations.Immutable; - import org.apache.phoenix.iterate.SpoolTooBigToDiskException; import org.apache.phoenix.memory.MemoryManager; import org.apache.phoenix.optimize.QueryOptimizer; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SQLCloseable; +import net.jcip.annotations.Immutable; + /** @@ -352,6 +352,7 @@ public interface QueryServices extends SQLCloseable { // QueryServicesOptions.DEFAULT_GUIDE_POSTS_CACHE_FACTORY_CLASS is used if this is not provided public static final String GUIDE_POSTS_CACHE_FACTORY_CLASS = "phoenix.guide.posts.cache.factory.class"; + public static final String PENDING_MUTATIONS_DDL_THROW_ATTRIB = "phoenix.pending.mutations.before.ddl.throw"; /** * Get executor service used for parallel scans */ diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index f7f8f06..4377735 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -75,14 +75,14 @@ import static org.apache.phoenix.query.QueryServices.RUN_RENEW_LEASE_FREQUENCY_I import static org.apache.phoenix.query.QueryServices.RUN_UPDATE_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE; -import static org.apache.phoenix.query.QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB; +import static org.apache.phoenix.query.QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY; +import static org.apache.phoenix.query.QueryServices.STATS_CACHE_THREAD_POOL_SIZE; import static org.apache.phoenix.query.QueryServices.STATS_COLLECTION_ENABLED; import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB; -import static org.apache.phoenix.query.QueryServices.STATS_CACHE_THREAD_POOL_SIZE; import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB; @@ -363,6 +363,8 @@ public class QueryServicesOptions { public static final String DEFAULT_GUIDE_POSTS_CACHE_FACTORY_CLASS = "org.apache.phoenix.query.DefaultGuidePostsCacheFactory"; public static final boolean DEFAULT_LONG_VIEW_INDEX_ENABLED = false; + + public static final boolean DEFAULT_PENDING_MUTATIONS_DDL_THROW = false; private final Configuration config; private QueryServicesOptions(Configuration config) { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java index 37fc13b..6b74858 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java @@ -17,30 +17,42 @@ */ package org.apache.phoenix.execute; -import com.google.common.collect.ImmutableList; - import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.SQLException; import java.util.Iterator; import java.util.List; +import java.util.Properties; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.types.PUnsignedInt; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.google.common.collect.ImmutableList; public class MutationStateTest { @@ -175,4 +187,28 @@ public class MutationStateTest { } } + + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); + + @Test + public void testPendingMutationsOnDDL() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.PENDING_MUTATIONS_DDL_THROW_ATTRIB, "true"); + try (Connection conn = DriverManager.getConnection(getUrl(), props); + PhoenixConnection pConnSpy = spy((PhoenixConnection) conn)) { + MutationState mutationState = mock(MutationState.class); + when(mutationState.getNumRows()).thenReturn(1); + + // Create a connection with mutation state and mock it + doReturn(mutationState).when(pConnSpy).getMutationState(); + exceptionRule.expect(SQLException.class); + exceptionRule.expectMessage( + SQLExceptionCode.CANNOT_PERFORM_DDL_WITH_PENDING_MUTATIONS.getMessage()); + + pConnSpy.createStatement().execute("create table MUTATION_TEST1" + + "( id1 UNSIGNED_INT not null primary key," + "appId1 VARCHAR)"); + } + + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java index 0204cd1..e2dbc32 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java @@ -152,6 +152,7 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { assertArrayEquals(dataRowKey, CellUtil.cloneRow(dataKeyValues.get(0))); } finally { try { + conn.rollback(); conn.createStatement().execute("DROP TABLE " + fullTableName); } finally { conn.close(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/MutationTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/MutationTest.java index e0f48c0..cc28fb7 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/schema/MutationTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/MutationTest.java @@ -45,8 +45,7 @@ public class MutationTest extends BaseConnectionlessQueryTest { } private void testDurability(boolean disableWAL) throws Exception { - Connection conn = DriverManager.getConnection(getUrl()); - try { + try(Connection conn = DriverManager.getConnection(getUrl())) { Durability expectedDurability = disableWAL ? Durability.SKIP_WAL : Durability.USE_DEFAULT; conn.setAutoCommit(false); conn.createStatement().execute("CREATE TABLE t1 (k integer not null primary key, a.k varchar, b.k varchar) " + (disableWAL ? "DISABLE_WAL=true" : "")); @@ -55,9 +54,8 @@ public class MutationTest extends BaseConnectionlessQueryTest { assertDurability(conn,expectedDurability); conn.createStatement().execute("DELETE FROM t1 WHERE k=1"); assertDurability(conn,expectedDurability); + conn.rollback(); conn.createStatement().execute("DROP TABLE t1"); - } finally { - conn.close(); } } @@ -126,4 +124,4 @@ public class MutationTest extends BaseConnectionlessQueryTest { } } -} +} \ No newline at end of file