http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java index f5905ee..d36e0fe 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java @@ -89,7 +89,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT { private void testUpsertSelect(boolean createIndex) throws Exception { long ts = nextTimestamp(); String tenantId = getOrganizationId(); - initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl()); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1); String indexName = "IDX1"; @@ -210,7 +210,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT { public void testUpsertSelectEmptyPKColumn() throws Exception { long ts = nextTimestamp(); String tenantId = getOrganizationId(); - initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl()); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); ensureTableCreated(getUrl(), PTSDB_NAME, PTSDB_NAME, ts-1); Properties props = new Properties(); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 1)); // Execute at timestamp 1 @@ -386,7 +386,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT { private void testUpsertSelectForAgg(boolean autoCommit) throws Exception { long ts = nextTimestamp(); String tenantId = getOrganizationId(); - initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl()); + initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null); ensureTableCreated(getUrl(), PTSDB_NAME, PTSDB_NAME, ts-1); Properties props = new Properties(); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 1)); // Execute at timestamp 1 @@ -462,7 +462,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT { byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(2), PInteger.INSTANCE.toBytes(3), PInteger.INSTANCE.toBytes(4)}; long ts = nextTimestamp(); - ensureTableCreated(getUrl(),"IntKeyTest", "IntKeyTest", splits, ts-2); + ensureTableCreated(getUrl(),"IntKeyTest", "IntKeyTest", splits, ts-2, null); Properties props = new Properties(); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 1)); Connection conn = DriverManager.getConnection(getUrl(), props); @@ -602,7 +602,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT { byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(2), PInteger.INSTANCE.toBytes(3), PInteger.INSTANCE.toBytes(4)}; long ts = nextTimestamp(); - ensureTableCreated(getUrl(),"IntKeyTest", "IntKeyTest", splits,ts-2); + ensureTableCreated(getUrl(),"IntKeyTest", "IntKeyTest", splits,ts-2, null); Properties props = new Properties(); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 1)); Connection conn = DriverManager.getConnection(getUrl(), props);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java index 64935d2..11df167 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java @@ -35,15 +35,26 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; +import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.Properties; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; import org.junit.Test; @@ -52,7 +63,7 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT { @Test public void testGroupByWithLimitOverRowKey() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),TestUtil.PTSDB_NAME,TestUtil.PTSDB_NAME, null, ts-2); + ensureTableCreated(getUrl(),TestUtil.PTSDB_NAME,TestUtil.PTSDB_NAME, null, ts-2, null); Properties props = new Properties(); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10)); Connection conn = DriverManager.getConnection(getUrl(), props); @@ -85,7 +96,7 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT { public void testUpsertDateValues() throws Exception { long ts = nextTimestamp(); Date now = new Date(System.currentTimeMillis()); - ensureTableCreated(getUrl(),TestUtil.PTSDB_NAME,TestUtil.PTSDB_NAME,null, ts-2); + ensureTableCreated(getUrl(),TestUtil.PTSDB_NAME,TestUtil.PTSDB_NAME,null, ts-2, null); Properties props = new Properties(); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 1)); // Execute at timestamp 1 Connection conn = DriverManager.getConnection(getUrl(), props); @@ -114,7 +125,7 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT { @Test public void testUpsertValuesWithExpression() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),"IntKeyTest","IntKeyTest", null, ts-2); + ensureTableCreated(getUrl(),"IntKeyTest","IntKeyTest", null, ts-2, null); Properties props = new Properties(); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 1)); // Execute at timestamp 1 Connection conn = DriverManager.getConnection(getUrl(), props); @@ -847,7 +858,7 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT { assertEquals("KV2", rs.getString(2)); assertFalse(rs.next()); - // Verify now that the data was correctly added to the mutable index too. + // Verify now that the data was correctly added to the immutable index too. stmt = conn.prepareStatement("SELECT KV2 FROM " + tableName + " WHERE PK2 = ? AND KV1 = ?"); stmt.setDate(1, upsertedDate); stmt.setString(2, "KV1"); @@ -960,6 +971,38 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT { } } + public void testColumnQualifierForUpsertedValues() throws Exception { + String schemaName = "A"; + String tableName = "TEST"; + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String ddl = "create table " + fullTableName + + " (" + + " K varchar primary key," + + " CF1.V1 varchar, CF2.V2 VARCHAR, CF2.V3 VARCHAR)"; + try (Connection conn = getConnection(nextTimestamp())) { + conn.createStatement().execute(ddl); + } + String dml = "UPSERT INTO " + fullTableName + " VALUES (?, ?, ?, ?)"; + try (Connection conn = getConnection(nextTimestamp())) { + PreparedStatement stmt = conn.prepareStatement(dml); + stmt.setString(1, "KEY1"); + stmt.setString(2, "VALUE1"); + stmt.setString(3, "VALUE2"); + stmt.setString(4, "VALUE3"); + stmt.executeUpdate(); + conn.commit(); + } + // Issue a raw hbase scan and assert that key values have the expected column qualifiers. + try (Connection conn = getConnection(nextTimestamp())) { + HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)); + ResultScanner scanner = table.getScanner(new Scan()); + Result next = scanner.next(); + assertTrue(next.containsColumn(Bytes.toBytes("CF1"), PInteger.INSTANCE.toBytes(1))); + assertTrue(next.containsColumn(Bytes.toBytes("CF2"), PInteger.INSTANCE.toBytes(2))); + assertTrue(next.containsColumn(Bytes.toBytes("CF2"), PInteger.INSTANCE.toBytes(3))); + } + } + private static Connection getConnection(long ts) throws SQLException { Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java index 8b5a591..0b54e73 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java @@ -141,7 +141,7 @@ public class UserDefinedFunctionsIT extends BaseOwnClusterIT { .append(" throw new ParseException(\"Index cannot be negative :\" + index);\n") .append(" }\n") .append(" Expression arrayExpr = children.get(0);\n") - .append(" return PArrayDataType.positionAtArrayElement(tuple, ptr, index, arrayExpr, getDataType(),getMaxLength());\n") + .append(" return PArrayDataTypeDecoder.positionAtArrayElement(tuple, ptr, index, arrayExpr, getDataType(),getMaxLength());\n") .append(" }\n").toString(); private static String GETY_EVALUATE_METHOD = @@ -217,6 +217,7 @@ public class UserDefinedFunctionsIT extends BaseOwnClusterIT { .append("import org.apache.phoenix.schema.types.PVarchar;\n") .append("import org.apache.phoenix.util.StringUtil;\n") .append("import org.apache.phoenix.schema.types.PArrayDataType;\n") + .append("import org.apache.phoenix.schema.types.PArrayDataTypeDecoder;\n") .append("import org.apache.phoenix.parse.ParseException;\n") .append("public class "+className+" extends ScalarFunction{\n") .append(" public static final String NAME = \""+className+"\";\n") http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/end2end/VariableLengthPKIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/VariableLengthPKIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/VariableLengthPKIT.java index 6a62673..753f2c8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/VariableLengthPKIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/VariableLengthPKIT.java @@ -58,7 +58,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { } protected static void initGroupByRowKeyColumns(long ts) throws Exception { - ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2); + ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null); // Insert all rows at ts String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; @@ -85,7 +85,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { } protected static void initTableValues(byte[][] splits, long ts) throws Exception { - ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, splits, ts-2); + ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, splits, ts-2, null); // Insert all rows at ts String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; @@ -106,7 +106,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { stmt.setBigDecimal(4, new BigDecimal(.5)); stmt.execute(); - ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, splits, ts-2); + ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, splits, ts-2, null); conn.setAutoCommit(false); // Insert all rows at ts @@ -431,7 +431,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testNullValueEqualityScan() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2); + ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null); // Insert all rows at ts String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; @@ -459,7 +459,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testVarLengthPKColScan() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2); + ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null); String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; // Insert at timestamp 0 Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -489,7 +489,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testEscapedQuoteScan() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(), PTSDB_NAME, PTSDB_NAME, null, ts-2); + ensureTableCreated(getUrl(), PTSDB_NAME, PTSDB_NAME, null, ts-2, null); String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; // Insert at timestamp 0 Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -527,7 +527,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { } private static void initPtsdbTableValues(long ts) throws Exception { - ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2); + ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null); String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; // Insert at timestamp 0 Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -560,7 +560,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { } private static void initPtsdbTableValues2(long ts, Date d) throws Exception { - ensureTableCreated(getUrl(),PTSDB2_NAME, PTSDB2_NAME, null, ts-2); + ensureTableCreated(getUrl(),PTSDB2_NAME, PTSDB2_NAME, null, ts-2, null); String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; // Insert at timestamp 0 Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -696,7 +696,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testBatchUpsert() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),PTSDB2_NAME, PTSDB2_NAME, null, ts-2); + ensureTableCreated(getUrl(),PTSDB2_NAME, PTSDB2_NAME, null, ts-2, null); Date d = new Date(ts); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); @@ -874,7 +874,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testMissingPKColumn() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2); + ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null); String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; // Insert at timestamp 0 Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -894,7 +894,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testNoKVColumn() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2); + ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2, null); String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; // Insert at timestamp 0 Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -914,7 +914,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { // Broken, since we don't know if insert vs update. @Test public void testMissingKVColumn() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2); + ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2, null); String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; // Insert at timestamp 0 Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -942,7 +942,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testTooShortKVColumn() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2); + ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2, null); String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; // Insert at timestamp 0 Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -978,7 +978,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testTooShortPKColumn() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2); + ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2, null); String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; // Insert at timestamp 0 Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -1014,7 +1014,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testTooLongPKColumn() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2); + ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2, null); String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; // Insert at timestamp 0 Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -1051,7 +1051,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testTooLongKVColumn() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2); + ensureTableCreated(getUrl(),BTABLE_NAME, BTABLE_NAME, null, ts-2, null); String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; // Insert at timestamp 0 Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -1481,7 +1481,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testLikeOnColumn() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2); + ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null); // Insert all rows at ts String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; @@ -1598,7 +1598,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testILikeOnColumn() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2); + ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null); // Insert all rows at ts String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; @@ -1730,7 +1730,7 @@ public class VariableLengthPKIT extends BaseClientManagedTimeIT { @Test public void testIsNullInPK() throws Exception { long ts = nextTimestamp(); - ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2); + ensureTableCreated(getUrl(),PTSDB_NAME, PTSDB_NAME, null, ts-2, null); // Insert all rows at ts String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropMetadataIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropMetadataIT.java index 4e7d06a..3d0ba8a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropMetadataIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropMetadataIT.java @@ -43,11 +43,13 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.ColumnNotFoundException; +import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; @@ -175,6 +177,7 @@ public class DropMetadataIT extends ParallelStatsDisabledIT { // verify that the local index physical table was *not* dropped conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(localIndexTablePhysicalName.getBytes()); + PTable localIndex2 = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, localIndexTableName2)); // there should be a single row belonging to localIndexTableName2 Scan scan = new Scan(); @@ -184,7 +187,7 @@ public class DropMetadataIT extends ParallelStatsDisabledIT { Result result = results.next(); assertNotNull(result); assertNotNull("localIndexTableName2 row is missing", result.getValue(QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES, - IndexUtil.getIndexColumnName(QueryConstants.DEFAULT_COLUMN_FAMILY, "V1").getBytes())); + localIndex2.getColumnForColumnName(IndexUtil.getIndexColumnName(QueryConstants.DEFAULT_COLUMN_FAMILY, "V1")).getColumnQualifierBytes())); assertNull(results.next()); } } @@ -295,9 +298,11 @@ public class DropMetadataIT extends ParallelStatsDisabledIT { ResultScanner results = table.getScanner(scan); Result result = results.next(); assertNotNull(result); - // there should be a single row belonging to " + viewIndex2 + " - assertNotNull( viewIndex2 + " row is missing", result.getValue(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - IndexUtil.getIndexColumnName(QueryConstants.DEFAULT_COLUMN_FAMILY, "V4").getBytes())); + PTable viewIndexPTable = pconn.getTable(new PTableKey(pconn.getTenantId(), viewIndex2)); + PColumn column = viewIndexPTable.getColumnForColumnName(IndexUtil.getIndexColumnName(QueryConstants.DEFAULT_COLUMN_FAMILY, "V4")); + byte[] cq = column.getColumnQualifierBytes(); + // there should be a single row belonging to VIEWINDEX2 + assertNotNull(viewIndex2 + " row is missing", result.getValue(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, cq)); assertNull(results.next()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java index bc301fa..06802b6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java @@ -52,6 +52,7 @@ import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; @@ -70,6 +71,7 @@ import com.google.common.collect.Maps; public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { private final boolean localIndex; + private final boolean columnEncoded; private final String tableDDLOptions; private volatile boolean stopThreads = false; @@ -78,9 +80,15 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { private static String INDEX_DDL; public static final AtomicInteger NUM_ROWS = new AtomicInteger(0); - public ImmutableIndexIT(boolean localIndex, boolean transactional) { - this.localIndex = localIndex; + public ImmutableIndexIT(boolean localIndex, boolean transactional, boolean columnEncoded) { StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true"); + this.localIndex = localIndex; + this.columnEncoded = columnEncoded; + if (!columnEncoded) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("COLUMN_ENCODED_BYTES=0,IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); + } if (transactional) { optionBuilder.append(", TRANSACTIONAL=true"); } @@ -98,11 +106,13 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } - @Parameters(name="ImmutableIndexIT_localIndex={0},transactional={1}") // name is used by failsafe as file name in reports + @Parameters(name="ImmutableIndexIT_localIndex={0},transactional={1},columnEncoded={2}") // name is used by failsafe as file name in reports public static Collection<Boolean[]> data() { return Arrays.asList(new Boolean[][] { - { false, false }, { false, true }, - { true, false }, { true, true } }); + { false, false, false }, { false, false, true }, + { false, true, false }, { false, true, true }, + { true, false, false }, { true, false, true }, + { true, true, false }, { true, true, true } }); } @Test http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java index 383452f..2395b02 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java @@ -410,7 +410,12 @@ public class IndexExpressionIT extends ParallelStatsDisabledIT { rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexTableName); assertTrue(rs.next()); assertEquals(2, rs.getInt(1)); - + + String sql = "SELECT LONG_COL1 from " + fullDataTableName + " WHERE LONG_COL2 = 2"; + rs = conn.createStatement().executeQuery(sql); + assertTrue(rs.next()); + assertFalse(rs.next()); + String dml = "DELETE from " + fullDataTableName + " WHERE long_col2 = 2"; assertEquals(1, conn.createStatement().executeUpdate(dml)); conn.commit(); @@ -861,8 +866,10 @@ public class IndexExpressionIT extends ParallelStatsDisabledIT { conn.setAutoCommit(false); // make sure that the tables are empty, but reachable - conn.createStatement().execute( - "CREATE TABLE " + dataTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + conn.createStatement().execute( + "CREATE TABLE " + dataTableName + + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + + (!mutable ? " IMMUTABLE_ROWS=true" : "")); query = "SELECT * FROM " + dataTableName ; rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); @@ -1235,7 +1242,16 @@ public class IndexExpressionIT extends ParallelStatsDisabledIT { } @Test - public void testViewUsesTableIndex() throws Exception { + public void testViewUsesMutableTableIndex() throws Exception { + helpTestViewUsesTableIndex(false); + } + + @Test + public void testViewUsesImmutableTableIndex() throws Exception { + helpTestViewUsesTableIndex(true); + } + + private void helpTestViewUsesTableIndex(boolean immutable) throws Exception { Connection conn = DriverManager.getConnection(getUrl()); try { @@ -1244,7 +1260,7 @@ public class IndexExpressionIT extends ParallelStatsDisabledIT { String viewName = generateUniqueName(); String indexName2 = generateUniqueName(); ResultSet rs; - String ddl = "CREATE TABLE " + dataTableName + " (k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, s1 VARCHAR, s2 VARCHAR, s3 VARCHAR, s4 VARCHAR CONSTRAINT pk PRIMARY KEY (k1, k2))"; + String ddl = "CREATE TABLE " + dataTableName + " (k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, s1 VARCHAR, s2 VARCHAR, s3 VARCHAR, s4 VARCHAR CONSTRAINT pk PRIMARY KEY (k1, k2)) " + (immutable ? "IMMUTABLE_ROWS = true" : ""); conn.createStatement().execute(ddl); conn.createStatement().execute("CREATE INDEX " + indexName1 + " ON " + dataTableName + "(k2, s2, s3, s1)"); conn.createStatement().execute("CREATE INDEX " + indexName2 + " ON " + dataTableName + "(k2, s2||'_'||s3, s1, s4)"); @@ -1341,7 +1357,7 @@ public class IndexExpressionIT extends ParallelStatsDisabledIT { try { conn.createStatement().execute( "CREATE TABLE " + dataTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR) " - + (mutable ? "IMMUTABLE_ROWS=true" : "")); + + (!mutable ? "IMMUTABLE_ROWS=true" : "")); String query = "SELECT * FROM " + dataTableName; ResultSet rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java index 410dca5..b76d61d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java @@ -61,8 +61,8 @@ import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.query.BaseTest; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.util.DateUtil; @@ -85,27 +85,39 @@ public class IndexIT extends ParallelStatsDisabledIT { private final boolean mutable; private final String tableDDLOptions; - - public IndexIT(boolean localIndex, boolean mutable, boolean transactional) { + public IndexIT(boolean localIndex, boolean mutable, boolean transactional, boolean columnEncoded) { this.localIndex = localIndex; this.transactional = transactional; this.mutable = mutable; StringBuilder optionBuilder = new StringBuilder(); - if (!mutable) - optionBuilder.append(" IMMUTABLE_ROWS=true "); + if (!columnEncoded) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("COLUMN_ENCODED_BYTES=0"); + } + if (!mutable) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("IMMUTABLE_ROWS=true"); + if (!columnEncoded) { + optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); + } + } if (transactional) { - if (!(optionBuilder.length()==0)) + if (optionBuilder.length()!=0) optionBuilder.append(","); optionBuilder.append(" TRANSACTIONAL=true "); } this.tableDDLOptions = optionBuilder.toString(); } - @Parameters(name="IndexIT_localIndex={0},mutable={1},transactional={2}") // name is used by failsafe as file name in reports + @Parameters(name="IndexIT_localIndex={0},mutable={1},transactional={2},columnEncoded={3}") // name is used by failsafe as file name in reports public static Collection<Boolean[]> data() { return Arrays.asList(new Boolean[][] { - { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true }, - { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true } + { false, false, false, false }, { false, false, false, true }, { false, false, true, false }, { false, false, true, true }, + { false, true, false, false }, { false, true, false, true }, { false, true, true, false }, { false, true, true, true }, + { true, false, false, false }, { true, false, false, true }, { true, false, true, false }, { true, false, true, true }, + { true, true, false, false }, { true, true, false, true }, { true, true, true, false }, { true, true, true, true } }); } @@ -780,7 +792,7 @@ public class IndexIT extends ParallelStatsDisabledIT { conn.createStatement().execute( "CREATE TABLE " + testTable + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " - + (!tableDDLOptions.isEmpty() ? tableDDLOptions : "") + "SPLIT ON ('b')"); + + (!tableDDLOptions.isEmpty() ? tableDDLOptions : "") + " SPLIT ON ('b')"); query = "SELECT * FROM " + testTable; rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); @@ -808,23 +820,23 @@ public class IndexIT extends ParallelStatsDisabledIT { stmt.execute(); conn.commit(); - // make sure the index is working as expected - query = "SELECT * FROM " + fullIndexName; + query = "SELECT /*+ NO_INDEX */ * FROM " + testTable; rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); - assertEquals("x", rs.getString(1)); - assertEquals("1", rs.getString(2)); - assertEquals("a", rs.getString(3)); + assertEquals("a", rs.getString(1)); + assertEquals("x", rs.getString(2)); + assertEquals("1", rs.getString(3)); assertTrue(rs.next()); - assertEquals("y", rs.getString(1)); - assertEquals("2", rs.getString(2)); - assertEquals("b", rs.getString(3)); + assertEquals("b", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("2", rs.getString(3)); assertTrue(rs.next()); - assertEquals("z", rs.getString(1)); - assertEquals("3", rs.getString(2)); - assertEquals("c", rs.getString(3)); + assertEquals("c", rs.getString(1)); + assertEquals("z", rs.getString(2)); + assertEquals("3", rs.getString(3)); assertFalse(rs.next()); - + + // make sure the index is working as expected query = "SELECT * FROM " + testTable; rs = conn.createStatement().executeQuery("EXPLAIN " + query); if (localIndex) { @@ -897,7 +909,7 @@ public class IndexIT extends ParallelStatsDisabledIT { } else { assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullIndexName + " ['1']", QueryUtil.getExplainPlan(rs)); } - + rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); assertEquals("a",rs.getString(1)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java index e854f23..fb9776e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; @@ -48,6 +47,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SchemaUtil; @@ -115,7 +115,7 @@ public class IndexTestUtil { while ((hasValue = dataRowKeySchema.next(ptr, i, maxOffset)) != null) { if (hasValue) { PColumn dataColumn = dataPKColumns.get(i); - PColumn indexColumn = indexTable.getColumn(IndexUtil.getIndexColumnName(dataColumn)); + PColumn indexColumn = indexTable.getColumnForColumnName(IndexUtil.getIndexColumnName(dataColumn)); coerceDataValueToIndexValue(dataColumn, indexColumn, ptr); indexValues[indexColumn.getPosition()-indexOffset] = ptr.copyBytes(); } @@ -135,10 +135,11 @@ public class IndexTestUtil { for (Cell kv : entry.getValue()) { @SuppressWarnings("deprecation") byte[] cq = kv.getQualifier(); - if (Bytes.compareTo(QueryConstants.EMPTY_COLUMN_BYTES, cq) != 0) { + byte[] emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(dataTable).getFirst(); + if (Bytes.compareTo(emptyKVQualifier, cq) != 0) { try { - PColumn dataColumn = family.getColumn(cq); - PColumn indexColumn = indexTable.getColumn(IndexUtil.getIndexColumnName(family.getName().getString(), dataColumn.getName().getString())); + PColumn dataColumn = family.getPColumnForColumnQualifier(cq); + PColumn indexColumn = indexTable.getColumnForColumnName(IndexUtil.getIndexColumnName(family.getName().getString(), dataColumn.getName().getString())); ptr.set(kv.getValueArray(),kv.getValueOffset(),kv.getValueLength()); coerceDataValueToIndexValue(dataColumn, indexColumn, ptr); indexValues[indexPKColumns.indexOf(indexColumn)-indexOffset] = ptr.copyBytes(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index e9205c9..e612f49 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -284,6 +284,8 @@ public class MutableIndexFailureIT extends BaseTest { // verify index table has correct data validateDataWithIndex(conn, fullTableName, fullIndexName); validateDataWithIndex(conn, secondTableName, secondFullIndexName); + } finally { + FAIL_WRITE = false; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java index de639e0..56e5bf4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java @@ -53,6 +53,7 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -66,12 +67,17 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { protected final boolean localIndex; private final String tableDDLOptions; - public MutableIndexIT(boolean localIndex, boolean transactional) { + public MutableIndexIT(boolean localIndex, boolean transactional, boolean columnEncoded) { this.localIndex = localIndex; StringBuilder optionBuilder = new StringBuilder(); if (transactional) { optionBuilder.append("TRANSACTIONAL=true"); } + if (!columnEncoded) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("COLUMN_ENCODED_BYTES=0"); + } this.tableDDLOptions = optionBuilder.toString(); } @@ -86,11 +92,13 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { return getConnection(props); } - @Parameters(name="MutableIndexIT_localIndex={0},transactional={1}") // name is used by failsafe as file name in reports + @Parameters(name="MutableIndexIT_localIndex={0},transactional={1},columnEncoded={2}") // name is used by failsafe as file name in reports public static Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { - { false, false }, { false, true }, { true, false }, { true, true } - }); + return Arrays.asList(new Boolean[][] { + { false, false, false }, { false, false, true }, + { false, true, false }, { false, true, true }, + { true, false, false }, { true, false, true }, + { true, true, false }, { true, true, true } }); } @Test @@ -612,11 +620,13 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { } @Test + @Ignore //TODO remove after PHOENIX-3585 is fixed public void testSplitDuringIndexScan() throws Exception { testSplitDuringIndexScan(false); } @Test + @Ignore //TODO remove after PHOENIX-3585 is fixed public void testSplitDuringIndexReverseScan() throws Exception { testSplitDuringIndexScan(true); } @@ -675,6 +685,7 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { } @Test + @Ignore //TODO remove after PHOENIX-3585 is fixed public void testIndexHalfStoreFileReader() throws Exception { Connection conn1 = getConnection(); HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java index 29f3758..5ae11bf 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java @@ -58,7 +58,7 @@ public class SaltedTableIT extends BaseClientManagedTimeIT { // 4abc123jkl444 try { // Upsert with no column specifies. - ensureTableCreated(getUrl(), TABLE_WITH_SALTING, TABLE_WITH_SALTING, splits, ts-2); + ensureTableCreated(getUrl(), TABLE_WITH_SALTING, TABLE_WITH_SALTING, splits, ts-2, null); String query = "UPSERT INTO " + TABLE_WITH_SALTING + " VALUES(?,?,?,?,?)"; PreparedStatement stmt = conn.prepareStatement(query); stmt.setInt(1, 1); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java new file mode 100644 index 0000000..badf39b --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.tx; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.Date; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.TestUtil; +import org.apache.tephra.TxConstants; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Lists; + +@RunWith(Parameterized.class) +public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { + + private final String tableDDLOptions; + + public ParameterizedTransactionIT(boolean mutable, boolean columnEncoded) { + StringBuilder optionBuilder = new StringBuilder("TRANSACTIONAL=true"); + if (!columnEncoded) { + optionBuilder.append(",COLUMN_ENCODED_BYTES=0"); + } + if (!mutable) { + optionBuilder.append(",IMMUTABLE_ROWS=true"); + if (!columnEncoded) { + optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); + } + } + this.tableDDLOptions = optionBuilder.toString(); + } + + @Parameters(name="TransactionIT_mutable={0},columnEncoded={1}") // name is used by failsafe as file name in reports + public static Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { + {false, false }, {false, true }, {true, false }, { true, true }, + }); + } + + @Test + public void testReadOwnWrites() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + String selectSql = "SELECT * FROM "+ fullTableName; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + conn.setAutoCommit(false); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertFalse(rs.next()); + + String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + // upsert two rows + TestUtil.setRowKeyColumns(stmt, 1); + stmt.execute(); + TestUtil.setRowKeyColumns(stmt, 2); + stmt.execute(); + + // verify rows can be read even though commit has not been called + rs = conn.createStatement().executeQuery(selectSql); + TestUtil.validateRowKeyColumns(rs, 1); + TestUtil.validateRowKeyColumns(rs, 2); + assertFalse(rs.next()); + + conn.commit(); + + // verify rows can be read after commit + rs = conn.createStatement().executeQuery(selectSql); + TestUtil.validateRowKeyColumns(rs, 1); + TestUtil.validateRowKeyColumns(rs, 2); + assertFalse(rs.next()); + } + } + + @Test + public void testTxnClosedCorrecty() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + String selectSql = "SELECT * FROM "+fullTableName; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + conn.setAutoCommit(false); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertFalse(rs.next()); + + String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + // upsert two rows + TestUtil.setRowKeyColumns(stmt, 1); + stmt.execute(); + TestUtil.setRowKeyColumns(stmt, 2); + stmt.execute(); + + // verify rows can be read even though commit has not been called + rs = conn.createStatement().executeQuery(selectSql); + TestUtil.validateRowKeyColumns(rs, 1); + TestUtil.validateRowKeyColumns(rs, 2); + // Long currentTx = rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp(); + assertFalse(rs.next()); + + conn.close(); + // start new connection + // conn.createStatement().executeQuery(selectSql); + // assertFalse("This transaction should not be on the invalid transactions", + // txManager.getCurrentState().getInvalid().contains(currentTx)); + } + } + + @Test + public void testAutoCommitQuerySingleTable() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + conn.setAutoCommit(true); + // verify no rows returned + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName); + assertFalse(rs.next()); + } + } + + @Test + public void testAutoCommitQueryMultiTables() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + conn.setAutoCommit(true); + // verify no rows returned + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName + " x JOIN " + fullTableName + " y ON (x.long_pk = y.int_pk)"); + assertFalse(rs.next()); + } + } + + @Test + public void testSelfJoin() throws Exception { + String t1 = generateUniqueName(); + String t2 = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + t1 + " (varchar_pk VARCHAR NOT NULL primary key, a.varchar_col1 VARCHAR, b.varchar_col2 VARCHAR)" + tableDDLOptions); + conn.createStatement().execute("create table " + t2 + " (varchar_pk VARCHAR NOT NULL primary key, a.varchar_col1 VARCHAR, b.varchar_col1 VARCHAR)" + tableDDLOptions); + // verify no rows returned + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + t1 + " x JOIN " + t1 + " y ON (x.varchar_pk = y.a.varchar_col1)"); + assertFalse(rs.next()); + rs = conn.createStatement().executeQuery("SELECT * FROM " + t2 + " x JOIN " + t2 + " y ON (x.varchar_pk = y.a.varchar_col1)"); + assertFalse(rs.next()); + } + } + + private void testRowConflicts(String fullTableName) throws Exception { + try (Connection conn1 = DriverManager.getConnection(getUrl()); + Connection conn2 = DriverManager.getConnection(getUrl())) { + conn1.setAutoCommit(false); + conn2.setAutoCommit(false); + String selectSql = "SELECT * FROM "+fullTableName; + conn1.setAutoCommit(false); + ResultSet rs = conn1.createStatement().executeQuery(selectSql); + boolean immutableRows = conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, fullTableName)).isImmutableRows(); + assertFalse(rs.next()); + // upsert row using conn1 + String upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn1.prepareStatement(upsertSql); + TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 10); + stmt.execute(); + // upsert row using conn2 + upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, ?, ?, ?)"; + stmt = conn2.prepareStatement(upsertSql); + TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 11); + stmt.execute(); + + conn1.commit(); + //second commit should fail + try { + conn2.commit(); + if (!immutableRows) fail(); + } + catch (SQLException e) { + if (immutableRows) fail(); + assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); + } + } + } + + @Test + public void testRowConflictDetected() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + testRowConflicts(fullTableName); + } + + @Test + public void testNoConflictDetectionForImmutableRows() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET IMMUTABLE_ROWS=true"); + testRowConflicts(fullTableName); + } + + @Test + public void testNonTxToTxTable() throws Exception { + String nonTxTableName = generateUniqueName(); + + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptions); + conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (1)"); + conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (2, 'a')"); + conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (3, 'b')"); + conn.commit(); + + String index = generateUniqueName(); + conn.createStatement().execute("CREATE INDEX " + index + " ON " + nonTxTableName + "(v)"); + // Reset empty column value to an empty value like it is pre-transactions + HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName)); + List<Put>puts = Lists.newArrayList(new Put(PInteger.INSTANCE.toBytes(1)), new Put(PInteger.INSTANCE.toBytes(2)), new Put(PInteger.INSTANCE.toBytes(3))); + for (Put put : puts) { + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY); + } + htable.put(puts); + + conn.createStatement().execute("ALTER TABLE " + nonTxTableName + " SET TRANSACTIONAL=true"); + + htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName)); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(index)); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + + conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (4, 'c')"); + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM " + nonTxTableName + " WHERE v IS NULL"); + assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, nonTxTableName)).isTransactional()); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertFalse(rs.next()); + conn.commit(); + + conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (5, 'd')"); + rs = conn.createStatement().executeQuery("SELECT k FROM " + nonTxTableName); + assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, index)).isTransactional()); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(2,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(4,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(5,rs.getInt(1)); + assertFalse(rs.next()); + conn.rollback(); + + rs = conn.createStatement().executeQuery("SELECT k FROM " + nonTxTableName); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(2,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(3,rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(4,rs.getInt(1)); + assertFalse(rs.next()); + } + + @Ignore + @Test + public void testNonTxToTxTableFailure() throws Exception { + String nonTxTableName = generateUniqueName(); + + Connection conn = DriverManager.getConnection(getUrl()); + // Put table in SYSTEM schema to prevent attempts to update the cache after we disable SYSTEM.CATALOG + conn.createStatement().execute("CREATE TABLE SYSTEM." + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptions); + conn.createStatement().execute("UPSERT INTO SYSTEM." + nonTxTableName + " VALUES (1)"); + conn.commit(); + // Reset empty column value to an empty value like it is pre-transactions + HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName)); + Put put = new Put(PInteger.INSTANCE.toBytes(1)); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY); + htable.put(put); + + HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + try { + // This will succeed initially in updating the HBase metadata, but then will fail when + // the SYSTEM.CATALOG table is attempted to be updated, exercising the code to restore + // the coprocessors back to the non transactional ones. + conn.createStatement().execute("ALTER TABLE SYSTEM." + nonTxTableName + " SET TRANSACTIONAL=true"); + fail(); + } catch (SQLException e) { + assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled")); + } finally { + admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + admin.close(); + } + + ResultSet rs = conn.createStatement().executeQuery("SELECT k FROM SYSTEM." + nonTxTableName + " WHERE v IS NULL"); + assertTrue(rs.next()); + assertEquals(1,rs.getInt(1)); + assertFalse(rs.next()); + + htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName)); + assertFalse(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices(). + getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)). + getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions()); + } + + @Test + public void testCreateTableToBeTransactional() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + String t1 = generateUniqueName(); + String t2 = generateUniqueName(); + String ddl = "CREATE TABLE " + t1 + " (k varchar primary key) " + tableDDLOptions; + conn.createStatement().execute(ddl); + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + PTable table = pconn.getTable(new PTableKey(null, t1)); + HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); + assertTrue(table.isTransactional()); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + + try { + ddl = "ALTER TABLE " + t1 + " SET transactional=false"; + conn.createStatement().execute(ddl); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); + } + + HBaseAdmin admin = pconn.getQueryServices().getAdmin(); + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(t2)); + desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)); + admin.createTable(desc); + ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true"; + conn.createStatement().execute(ddl); + assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf(t2)).getValue(TxConstants.READ_NON_TX_DATA)); + + // Should be ok, as HBase metadata should match existing metadata. + ddl = "CREATE TABLE IF NOT EXISTS " + t1 + " (k varchar primary key)"; + try { + conn.createStatement().execute(ddl); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); + } + ddl += " transactional=true"; + conn.createStatement().execute(ddl); + table = pconn.getTable(new PTableKey(null, t1)); + htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); + assertTrue(table.isTransactional()); + assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + } + + @Test + public void testCurrentDate() throws Exception { + String transTableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; + String selectSql = "SELECT current_date() FROM "+fullTableName; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions); + conn.setAutoCommit(false); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertFalse(rs.next()); + + String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + // upsert two rows + TestUtil.setRowKeyColumns(stmt, 1); + stmt.execute(); + conn.commit(); + + rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + Date date1 = rs.getDate(1); + assertFalse(rs.next()); + + Thread.sleep(1000); + + rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + Date date2 = rs.getDate(1); + assertFalse(rs.next()); + assertTrue("current_date() should change while executing multiple statements", date2.getTime() > date1.getTime()); + } + } + + + @Test + public void testParallelUpsertSelect() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512)); + props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); + props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + String fullTableName1 = generateUniqueName(); + String fullTableName2 = generateUniqueName(); + String sequenceName = "S_" + generateUniqueName(); + conn.createStatement().execute("CREATE SEQUENCE " + sequenceName); + conn.createStatement().execute("CREATE TABLE " + fullTableName1 + " (pk INTEGER PRIMARY KEY, val INTEGER) SALT_BUCKETS=4" + + (!tableDDLOptions.isEmpty()? "," : "") + tableDDLOptions); + conn.createStatement().execute("CREATE TABLE " + fullTableName2 + " (pk INTEGER PRIMARY KEY, val INTEGER)" + tableDDLOptions); + + for (int i = 0; i < 100; i++) { + conn.createStatement().execute("UPSERT INTO " + fullTableName1 + " VALUES (NEXT VALUE FOR " + sequenceName + ", " + (i%10) + ")"); + } + conn.commit(); + conn.setAutoCommit(true); + int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName2 + " SELECT pk, val FROM " + fullTableName1); + assertEquals(100,upsertCount); + conn.close(); + } + + @Test + public void testInflightPartialEval() throws SQLException { + + try (Connection conn = DriverManager.getConnection(getUrl())) { + String transactTableName = generateUniqueName(); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions); + + + try (Connection conn1 = DriverManager.getConnection(getUrl()); Connection conn2 = DriverManager.getConnection(getUrl())) { + conn1.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES ('a','b','x')"); + // Select to force uncommitted data to be written + ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + transactTableName); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("b", rs.getString(2)); + assertFalse(rs.next()); + + conn2.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES ('a','c','x')"); + // Select to force uncommitted data to be written + rs = conn2.createStatement().executeQuery("SELECT * FROM " + transactTableName ); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("c", rs.getString(2)); + assertFalse(rs.next()); + + // If the AndExpression were to see the uncommitted row from conn2, the filter would + // filter the row out early and no longer continue to evaluate other cells due to + // the way partial evaluation holds state. + rs = conn1.createStatement().executeQuery("SELECT * FROM " + transactTableName + " WHERE v1 != 'c' AND v2 = 'x'"); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("b", rs.getString(2)); + assertFalse(rs.next()); + + // Same as above for conn1 data + rs = conn2.createStatement().executeQuery("SELECT * FROM " + transactTableName + " WHERE v1 != 'b' AND v2 = 'x'"); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("c", rs.getString(2)); + assertFalse(rs.next()); + } + + } + } + +}