This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 762d708906 PHOENIX-7543 Wrong result returned when query is served by
index and some columns are null (#2097)
762d708906 is described below
commit 762d708906ac460ba063c3739c40b432c45af7bd
Author: tkhurana <[email protected]>
AuthorDate: Fri Mar 28 10:27:37 2025 -0700
PHOENIX-7543 Wrong result returned when query is served by index and some
columns are null (#2097)
Add DeleteColumn cells to index rows when a column is set to null in an
upsert statement.
---
.../org/apache/phoenix/index/IndexMaintainer.java | 46 +++++
.../java/org/apache/phoenix/util/IndexUtil.java | 35 ++++
.../coprocessor/GlobalIndexRegionScanner.java | 59 +++---
.../phoenix/hbase/index/IndexRegionObserver.java | 47 +++--
.../apache/phoenix/index/GlobalIndexChecker.java | 41 -----
.../org/apache/phoenix/end2end/IndexToolIT.java | 12 ++
.../end2end/index/GlobalIndexCheckerIT.java | 198 ++++++++++++++++++++-
.../apache/phoenix/index/IndexMaintainerTest.java | 141 ++++++++++++---
8 files changed, 462 insertions(+), 117 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 24645b29a9..9b920af2a8 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
@@ -73,6 +74,7 @@ import
org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
import org.apache.phoenix.hbase.index.AbstractValueGetter;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -1462,6 +1464,50 @@ public class IndexMaintainer implements Writable,
Iterable<ColumnReference> {
return put;
}
+ /**
+ * For mutable covered indexes, an index update is a full update. However,
if some included
+ * columns are set to null in the upsert statement we need to write a
DeleteColumn cell
+ * to such columns.
+ * @param indexUpdate Put mutation updating index which includes at a
minimum the empty column
+ * @param ts The update timestamp
+ * @return Delete mutation with DeleteColumn cells for all covered columns
that are missing
+ * in the indexUpdate Put mutation
+ * @throws IOException
+ */
+ public Delete buildDeleteColumnMutation(Put indexUpdate, long ts) throws
IOException {
+ if (getIndexStorageScheme() ==
ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ // for single cell storage format no need to build a delete column
mutation
+ return null;
+ }
+ if (coveredColumnsMap == null || coveredColumnsMap.isEmpty()) {
+ // no covered columns in the index no need to build a delete
mutation
+ return null;
+ }
+ int colsSet = indexUpdate.getFamilyCellMap()
+ .values().stream().mapToInt(elem -> elem.size()).sum();
+ if (coveredColumnsMap.size() + 1 == colsSet) { // add 1 for the empty
column
+ // Index row update is always a full update except when some
columns are explicitly
+ // set to null. Do a quick size check to determine if some covered
columns are being
+ // set to null or not.
+ return null;
+ }
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexUpdate.getRow());
+ Delete delete = new Delete(rowKey.get());
+ for (Entry<ColumnReference, ColumnReference> coveredCol :
coveredColumnsMap.entrySet()) {
+ ColumnReference indexCol = coveredCol.getValue();
+ if (!indexUpdate.has(indexCol.getFamily(),
indexCol.getQualifier())) {
+ KeyValue kv =
GenericKeyValueBuilder.INSTANCE.buildDeleteColumns(
+ rowKey,
+ indexCol.getFamilyWritable(),
+ indexCol.getQualifierWritable(),
+ ts);
+ delete.add(kv);
+ }
+ }
+ assert !delete.isEmpty();
+ return delete;
+ }
+
public enum DeleteType {SINGLE_VERSION, ALL_VERSIONS};
private DeleteType getDeleteTypeOrNull(Collection<? extends Cell>
pendingUpdates) {
return getDeleteTypeOrNull(pendingUpdates, this.nDataCFs);
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 8aa2c33e92..01762600ab 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.util;
+import static org.apache.hadoop.hbase.Cell.Type.DeleteColumn;
+import static org.apache.hadoop.hbase.Cell.Type.DeleteFamily;
import static
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.LOCAL_INDEX_BUILD;
import static
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.LOCAL_INDEX_BUILD_PROTO;
import static
org.apache.phoenix.coprocessorclient.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.index.PhoenixIndexCodec;
@@ -905,6 +908,38 @@ public class IndexUtil {
return columns;
}
+ public static boolean isDeleteFamily(Mutation mutation) {
+ if (!(mutation instanceof Delete)) {
+ return false;
+ }
+ for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ if (cell.getType() == DeleteFamily) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public static boolean isDeleteColumn(Mutation mutation) {
+ if (!(mutation instanceof Delete)) {
+ return false;
+ }
+ for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ if (cell.getType() == DeleteColumn) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public static boolean isDeleteFamilyOrDeleteColumn(Mutation mutation) {
+ return isDeleteFamily(mutation) || isDeleteColumn(mutation);
+ }
+
public static class SimpleValueGetter implements ValueGetter {
final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
final Put put;
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 52ca74c1d5..6e10e4b093 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -93,6 +93,8 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import static org.apache.hadoop.hbase.Cell.Type.DeleteColumn;
+import static org.apache.hadoop.hbase.Cell.Type.DeleteFamily;
import static
org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
import static
org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID;
import static
org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING;
@@ -613,17 +615,6 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
}
};
- private boolean isDeleteFamily(Mutation mutation) {
- for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
- for (Cell cell : cells) {
- if (cell.getType() == Cell.Type.DeleteFamily) {
- return true;
- }
- }
- }
- return false;
- }
-
private void updateUnverifiedIndexRowCounters(Put actual, long expectedTs,
List<Mutation> indexRowsToBeDeleted,
IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
// Get the empty column of the given index row
@@ -738,7 +729,8 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
while (iterator.hasNext()) {
Mutation mutation = iterator.next();
if ((mutation instanceof Put && !isVerified((Put) mutation)) ||
- (mutation instanceof Delete && !isDeleteFamily(mutation)))
{
+ (mutation instanceof Delete
+ &&
!IndexUtil.isDeleteFamilyOrDeleteColumn(mutation))) {
iterator.remove();
} else {
if (((previous instanceof Put && mutation instanceof Put) ||
@@ -787,6 +779,11 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
* index rebuilds, the delete family markers are used to delete index rows
due to data table row deletes or
* data table row overwrites.
*
+ * Delete Column Markers
+ * Delete column markers are generated during read repair, regular table
updates and
+ * index rebuilds. The delete column markers are used for any included
column in the index
+ * which is set to null.
+ *
* Verification Algorithm
*
* IndexTool verification generates an expected list of index mutations
from the data table rows and uses this list
@@ -797,7 +794,9 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
*
* Every mutation will include a set of cells with the same timestamp
* Every mutation has a different timestamp
- * A delete mutation will include only delete family cells and it is for
deleting the entire row and its versions
+ * A delete mutation includes either delete family cells, which delete the
+ * entire row and its versions, or delete column cells, which are added for
+ * the included columns in the index that are set to null.
* Every put mutation is verified
*
* For both verification types, after the expected list of index mutations
is constructed for a given data table,
@@ -807,7 +806,8 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
* As in the construction for the expected list, the cells are grouped
into a put and a delete set. The put and
* delete sets for a given row are further grouped based on their
timestamps into put and delete mutations such that
* all the cells in a mutation have the same timestamp. The put and delete
mutations are then sorted within a single
- * list. Mutations in this list are sorted in ascending order of their
timestamp. This list is the actual list.
+ * list. Mutations in this list are sorted in descending order of their
timestamp.
+ * This list is the actual list.
*
* For the without-repair verification, unverified mutations and family
version delete markers are removed from
* the actual list and then the list is compared with the expected list.
@@ -872,7 +872,8 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
// Between an expected delete and put, there can be one or
more deletes due to
// concurrent mutations or data table write failures. Skip
all of them if any
// There cannot be any actual delete mutation between two
expected put mutations.
- while (getTimestamp(actual) >= getTimestamp(expected) &&
actual instanceof Delete) {
+ while (getTimestamp(actual) >= getTimestamp(expected)
+ && IndexUtil.isDeleteFamily(actual)) {
actualIndex++;
if (actualIndex == actualSize) {
break;
@@ -892,9 +893,13 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
continue;
}
} else { // expected instanceof Delete
- // Between put and delete, delete and delete, or before the
first delete, there can be other deletes.
- // Skip all of them if any
- while (getTimestamp(actual) > getTimestamp(expected) && actual
instanceof Delete) {
+ // Between put and delete, delete and delete, or before the
first delete, there can
+ // be other deletes. Skip all of them if any. This can happen
when there are
+ // unverified index rows on a deleted row and read-repair will
put a DeleteFamily
+ // marker. Those delete family markers will be visible until
compaction runs on the
+ // index table.
+ while (getTimestamp(actual) > getTimestamp(expected)
+ && IndexUtil.isDeleteFamily(actual)) {
actualIndex++;
if (actualIndex == actualSize) {
break;
@@ -904,8 +909,7 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
if (actualIndex == actualSize) {
break;
}
- if (getTimestamp(actual) == getTimestamp(expected) &&
- (actual instanceof Delete && isDeleteFamily(actual))) {
+ if (isMatchingMutation(expected, actual)) {
expectedIndex++;
actualIndex++;
continue;
@@ -1179,7 +1183,8 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
};
public static List<Mutation> getMutationsWithSameTS(Put put, Delete del) {
- // Reorder the mutations on the same row so that delete comes before
put when they have the same timestamp
+ // Reorder the mutations on the same row so that put comes before
delete when they
+ // have the same timestamp
return getMutationsWithSameTS(put, del, MUTATION_TS_COMPARATOR);
}
@@ -1277,7 +1282,7 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
* uncovered partial indexes.
* pendingMutations is a sorted list of data table mutations that are used
to replay index
* table mutations. This list is sorted in ascending order by the tuple of
row key, timestamp
- * and mutation type where delete comes after put.
+ * and mutation type where put comes before delete.
*/
public static List<Mutation>
prepareIndexMutationsForRebuild(IndexMaintainer indexMaintainer,
Put dataPut, Delete dataDel, byte[] encodedRegionName) throws
IOException {
@@ -1343,6 +1348,10 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
Put indexPut = prepareIndexPutForRebuild(indexMaintainer,
rowKeyPtr,
nextDataRowVG, ts, encodedRegionName);
indexMutations.add(indexPut);
+ Delete deleteColumn =
indexMaintainer.buildDeleteColumnMutation(indexPut, ts);
+ if (deleteColumn != null) {
+ indexMutations.add(deleteColumn);
+ }
// Delete the current index row if the new index key is
different than the current one
if (indexRowKeyForCurrentDataRow != null) {
if (Bytes.compareTo(indexPut.getRow(),
indexRowKeyForCurrentDataRow) != 0) {
@@ -1401,6 +1410,10 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
Put indexPut = prepareIndexPutForRebuild(indexMaintainer,
rowKeyPtr,
nextDataRowVG, ts, encodedRegionName);
indexMutations.add(indexPut);
+ Delete deleteColumn =
indexMaintainer.buildDeleteColumnMutation(indexPut, ts);
+ if (deleteColumn != null) {
+ indexMutations.add(deleteColumn);
+ }
// Delete the current index row if the new index key is
different than the current one
if (indexRowKeyForCurrentDataRow != null) {
if (Bytes.compareTo(indexPut.getRow(),
indexRowKeyForCurrentDataRow) != 0) {
@@ -1439,7 +1452,7 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
List<Mutation> mutationList = indexMutationMap.get(indexRowKey);
if (mutationList == null) {
if (!mostRecentDone) {
- if (mutation instanceof Put) {
+ if (mutation instanceof Put ||
IndexUtil.isDeleteColumn(mutation)) {
mostRecentIndexRowKeys.add(indexRowKey);
mostRecentDone = true;
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 72ef706566..95fbfd0ce6 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -207,21 +207,26 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
return lastContext;
}
}
-
- private static boolean ignoreIndexRebuildForTesting = false;
- private static boolean failPreIndexUpdatesForTesting = false;
- private static boolean failPostIndexUpdatesForTesting = false;
- private static boolean failDataTableUpdatesForTesting = false;
-
- public static void setIgnoreIndexRebuildForTesting(boolean ignore) {
ignoreIndexRebuildForTesting = ignore; }
-
- public static void setFailPreIndexUpdatesForTesting(boolean fail) {
failPreIndexUpdatesForTesting = fail; }
-
- public static void setFailPostIndexUpdatesForTesting(boolean fail) {
failPostIndexUpdatesForTesting = fail; }
-
- public static void setFailDataTableUpdatesForTesting(boolean fail) {
- failDataTableUpdatesForTesting = fail;
- }
+ private static boolean ignoreIndexRebuildForTesting = false;
+ private static boolean failPreIndexUpdatesForTesting = false;
+ private static boolean failPostIndexUpdatesForTesting = false;
+ private static boolean failDataTableUpdatesForTesting = false;
+ private static boolean ignoreWritingDeleteColumnsToIndex = false;
+ public static void setIgnoreIndexRebuildForTesting(boolean ignore) {
+ ignoreIndexRebuildForTesting = ignore;
+ }
+ public static void setFailPreIndexUpdatesForTesting(boolean fail) {
+ failPreIndexUpdatesForTesting = fail;
+ }
+ public static void setFailPostIndexUpdatesForTesting(boolean fail) {
+ failPostIndexUpdatesForTesting = fail;
+ }
+ public static void setFailDataTableUpdatesForTesting(boolean fail) {
+ failDataTableUpdatesForTesting = fail;
+ }
+ public static void setIgnoreWritingDeleteColumnsToIndex(boolean ignore) {
+ ignoreWritingDeleteColumnsToIndex = ignore;
+ }
public enum BatchMutatePhase {
PRE, POST, FAILED
@@ -1035,6 +1040,14 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
QueryConstants.UNVERIFIED_BYTES);
context.indexUpdates.put(hTableInterfaceReference,
new Pair<Mutation, byte[]>(indexPut,
rowKeyPtr.get()));
+ if (!ignoreWritingDeleteColumnsToIndex) {
+ Delete deleteColumn =
+
indexMaintainer.buildDeleteColumnMutation(indexPut, ts);
+ if (deleteColumn != null) {
+ context.indexUpdates.put(hTableInterfaceReference,
+ new Pair<Mutation, byte[]>(deleteColumn,
rowKeyPtr.get()));
+ }
+ }
// Delete the current index row if the new index key is
different from the
// current one and the index is not a CDC index
if (currentDataRowState != null) {
@@ -1113,7 +1126,9 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
if (m instanceof Put) {
// This will be done before the data table row is
updated (i.e., in the first write phase)
context.preIndexUpdates.put(hTableInterfaceReference,
m);
- } else {
+ } else if (IndexUtil.isDeleteFamily(m)) {
+ // DeleteColumn is always accompanied by a Put so no
need to make the index
+ // row unverified again. Only do this for DeleteFamily
// Set the status of the index row to "unverified"
Put unverifiedPut = new Put(m.getRow());
unverifiedPut.addColumn(
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 62b8677103..c2d97dbb97 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -513,51 +513,10 @@ public class GlobalIndexChecker extends
BaseScannerRegionObserver implements Reg
emptyCQ, 0, emptyCQ.length) == 0;
}
- /**
- * An index row is composed of cells with the same timestamp.
However, if there are multiple versions of an
- * index row, HBase can return an index row with cells from multiple
versions, and thus it can return cells
- * with different timestamps. This happens if the version of the row
we are reading does not have a value
- * (i.e., effectively has null value) for a column whereas an older
version has a value for the column.
- * In this case, we need to remove the older cells for correctness.
- */
- private void removeOlderCells(List<Cell> cellList) {
- Iterator<Cell> cellIterator = cellList.iterator();
- if (!cellIterator.hasNext()) {
- return;
- }
- Cell cell = cellIterator.next();
- long maxTs = cell.getTimestamp();
- long ts;
- boolean allTheSame = true;
- while (cellIterator.hasNext()) {
- cell = cellIterator.next();
- ts = cell.getTimestamp();
- if (ts != maxTs) {
- if (ts > maxTs) {
- maxTs = ts;
- }
- allTheSame = false;
- }
- }
- if (allTheSame) {
- return;
- }
- cellIterator = cellList.iterator();
- while (cellIterator.hasNext()) {
- cell = cellIterator.next();
- if (cell.getTimestamp() != maxTs) {
- cellIterator.remove();
- }
- }
- }
-
private boolean verifyRowAndRemoveEmptyColumn(List<Cell> cellList)
throws IOException {
if (indexMaintainer.isUncovered()) {
return true;
}
- if (!indexMaintainer.isImmutableRows()) {
- removeOlderCells(cellList);
- }
long cellListSize = cellList.size();
Cell cell = null;
if (cellListSize == 0) {
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 3291457164..7214ac6991 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.compile.ExplainPlan;
@@ -838,6 +839,17 @@ public class IndexToolIT extends BaseTest {
return
indexTool.getJob().getCounters().getGroup(PhoenixIndexToolJobCounters.class.getName());
}
+ public static void dumpMRJobCounters(IndexTool indexTool) throws
IOException {
+ CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+ dumpMRJobCounters(mrJobCounters);
+ }
+
+ public static void dumpMRJobCounters(CounterGroup mrJobCounters) {
+ for (Counter cntr : mrJobCounters) {
+ LOGGER.info(String.format("%s=%d", cntr.getName(),
cntr.getValue()));
+ }
+ }
+
private static List<String> getArgList (boolean useSnapshot, String
schemaName,
String dataTable, String
indxTable, String tenantId,
IndexTool.IndexVerifyType
verifyType, Long startTime,
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 878c25cd91..98359795db 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -32,11 +32,13 @@ import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCA
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
@@ -47,6 +49,8 @@ import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.end2end.IndexToolIT;
@@ -657,18 +661,36 @@ public class GlobalIndexCheckerIT extends BaseTest {
String indexTableName = generateUniqueName();
conn.createStatement().execute("CREATE INDEX " + indexTableName +
" on " +
dataTableName + " (val1) include (val2, val3)" +
this.indexDDLOptions);
- conn.createStatement().execute("upsert into " + dataTableName + "
(id, val1, val2) values ('a', 'ab', 'abcc')");
+ // For immutable tables updating columns to null is ignored
+ conn.createStatement().execute(
+ "upsert into " + dataTableName + " (id, val1, val2, val3)
" +
+ "values ('a', 'ab', 'abcc', null)");
conn.commit();
String selectSql = "SELECT * from " + dataTableName + " WHERE val1
= 'ab'";
// Verify that we will read from the index table
assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
- ResultSet rs = conn.createStatement().executeQuery(selectSql);
- assertTrue(rs.next());
- assertEquals("a", rs.getString(1));
- assertEquals("ab", rs.getString(2));
- assertEquals("abcc", rs.getString(3));
- assertEquals("abcd", rs.getString(4));
- assertFalse(rs.next());
+ try (ResultSet rs =
conn.createStatement().executeQuery(selectSql)) {
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals("ab", rs.getString(2));
+ assertEquals("abcc", rs.getString(3));
+ assertEquals("abcd", rs.getString(4));
+ assertFalse(rs.next());
+ }
+
+ // now read the same row from data table
+ selectSql = "SELECT * from " + dataTableName + " WHERE id = 'a'";
+ try (ResultSet rs =
conn.createStatement().executeQuery(selectSql)){
+ PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+ String explainPlan =
QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+ assertTrue(explainPlan.contains(dataTableName));
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals("ab", rs.getString(2));
+ assertEquals("abcc", rs.getString(3));
+ assertEquals("abcd", rs.getString(4));
+ assertFalse(rs.next());
+ }
}
}
@@ -1223,6 +1245,166 @@ public class GlobalIndexCheckerIT extends BaseTest {
}
}
+ @Test
+ public void testIndexRowWithNullIncludedColumnAndFilter() throws Exception
{
+ if (async) {
+ // No need to run the same test twice one for async = true and the
other for async = false
+ return;
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd',
'bcde')
+ populateTable(dataTableName);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " on
" +
+ dataTableName + " (val1) include (val2, val3)" +
indexDDLOptions);
+ conn.commit();
+ // update row ('a', 'ab', 'abc', 'abcd') -> ('a', 'ab', 'abc',
null)
+ conn.createStatement().execute(
+ "upsert into " + dataTableName + " (id, val3) values ('a',
null)");
+ conn.commit();
+
+ String dql = String.format(
+ "select id, val2 from %s where val1='ab' and val3='abcd'",
dataTableName);
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+ String explainPlan =
QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+ assertTrue(explainPlan.contains(indexName));
+ assertFalse(rs.next());
+ }
+
+ dql = String.format(
+ "select id, val2 from %s where val1='ab' and val3 is
null", dataTableName);
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+ String explainPlan =
QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+ assertTrue(explainPlan.contains(indexName));
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("val2"));
+ }
+
+ // update row ('a', 'ab', 'abc', null) -> ('a', 'ac', null, null)
+ conn.createStatement().execute(
+ "upsert into " + dataTableName + " values ('a', 'ac',
null, null)");
+ conn.commit();
+
+ dql = String.format(
+ "select id, val2 from %s where val1='ac' and val3 is
null", dataTableName);
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+ String explainPlan =
QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+ assertTrue(explainPlan.contains(indexName));
+ assertTrue(rs.next());
+ assertNull(rs.getString("val2"));
+ }
+ TestUtil.dumpTable(conn, TableName.valueOf(dataTableName));
+ TestUtil.dumpTable(conn, TableName.valueOf(indexName));
+ }
+ }
+
+ @Test
+ public void testIndexToolWithNullIncludedColumn() throws Exception {
+ if (async) {
+ // No need to run the same test twice one for async = true and the
other for async = false
+ return;
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd',
'bcde')
+ populateTable(dataTableName);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " on
" +
+ dataTableName + " (val1) include (val2, val3)" +
indexDDLOptions);
+ conn.commit();
+ IndexRegionObserver.setIgnoreWritingDeleteColumnsToIndex(true);
+ // update row ('a', 'ab', 'abc', 'abcd') -> ('a', 'ab', 'abc',
null)
+ conn.createStatement().execute(
+ "upsert into " + dataTableName + " (id, val3) values ('a',
null)");
+ conn.commit();
+ // insert a new partial row
+ conn.createStatement().execute("upsert into " + dataTableName +
+ " (id, val1, val2) values ('c', 'cd', 'cde')");
+ conn.commit();
+ IndexRegionObserver.setIgnoreWritingDeleteColumnsToIndex(false);
+ IndexTool it = IndexToolIT.runIndexTool(false, null,
dataTableName, indexName, null,
+ 0, IndexTool.IndexVerifyType.BEFORE);
+ CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(it);
+ IndexToolIT.dumpMRJobCounters(mrJobCounters);
+ try {
+ // single cell index doesn't have an issue with null values
+ assertEquals(encoded ? 0: 2, mrJobCounters.findCounter(
+
BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(encoded ? 0 : 2, mrJobCounters.findCounter(
+ REBUILT_INDEX_ROW_COUNT.name()).getValue());
+ } catch (AssertionError e) {
+ TestUtil.dumpTable(conn, TableName.valueOf(dataTableName));
+ TestUtil.dumpTable(conn, TableName.valueOf(indexName));
+ throw e;
+ }
+ }
+ }
+
+ @Test
+ public void testIndexToolWithMultipleDeleteFamilyMarkers() throws
Exception {
+ if (async) {
+ // No need to run the same test twice one for async = true and the
other for async = false
+ return;
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd',
'bcde')
+ populateTable(dataTableName);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " on
" +
+ dataTableName + " (val1) include (val2, val3)" +
indexDDLOptions);
+ conn.commit();
+ String delete = String.format("DELETE FROM %s where id = 'a'",
dataTableName);
+ conn.createStatement().execute(delete);
+ conn.commit();
+ // skip phase2, inserts an unverified row in index
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ String dml = "upsert into " + dataTableName +
+ " (id, val1, val3) values ('a', 'ab', ?)";
+ try (PreparedStatement ps = conn.prepareStatement(dml)) {
+ for (int i = 0; i < 5; ++i) {
+ ps.setString(1, "val3_ " + i);
+ ps.executeUpdate();
+ commitWithException(conn);
+ // trigger a read repair of the unverified row
+ // since the data table row has been deleted the read
repair will insert a
+ // delete family marker on the unverified index row
+ String dql = String.format(
+ "select id, val2, val3 from %s where val1='ab'",
dataTableName);
+ try (ResultSet rs =
conn.createStatement().executeQuery(dql)) {
+ PhoenixResultSet prs =
rs.unwrap(PhoenixResultSet.class);
+ String explainPlan =
QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+ assertTrue(explainPlan.contains(indexName));
+ assertFalse(rs.next());
+ }
+ }
+ }
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ // re-insert the previously deleted row 'a' as ('a', 'ab', null, null)
+ conn.createStatement().execute(
+ "upsert into " + dataTableName + " (id, val1, val3) values
('a', 'ab', null)");
+ conn.commit();
+ IndexTool it = IndexToolIT.runIndexTool(false, null,
dataTableName, indexName, null,
+ 0, IndexTool.IndexVerifyType.ONLY);
+ CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(it);
+ IndexToolIT.dumpMRJobCounters(mrJobCounters);
+ try {
+ assertEquals(2, mrJobCounters.findCounter(
+
BEFORE_REBUILD_VALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0, mrJobCounters.findCounter(
+ REBUILT_INDEX_ROW_COUNT.name()).getValue());
+ } catch (AssertionError e) {
+ TestUtil.dumpTable(conn, TableName.valueOf(dataTableName));
+ TestUtil.dumpTable(conn, TableName.valueOf(indexName));
+ throw e;
+ }
+ }
+ }
+
@Test
public void testViewIndexRowUpdate() throws Exception {
if (async) {
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
index 1a6d1dada4..0379568c74 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.index;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.sql.Connection;
@@ -37,6 +39,7 @@ import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -50,8 +53,11 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -63,7 +69,7 @@ import
org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
public class IndexMaintainerTest extends BaseConnectionlessQueryTest {
private static final String DEFAULT_SCHEMA_NAME = "";
private static final String DEFAULT_TABLE_NAME = "rkTest";
-
+
private void testIndexRowKeyBuilding(String dataColumns, String pk, String
indexColumns, Object[] values) throws Exception {
testIndexRowKeyBuilding(DEFAULT_SCHEMA_NAME, DEFAULT_TABLE_NAME,
dataColumns, pk, indexColumns, values, "", "", "");
}
@@ -88,10 +94,10 @@ public class IndexMaintainerTest extends
BaseConnectionlessQueryTest {
public byte[] getRowKey() {
return row;
}
-
+
};
}
-
+
private void testIndexRowKeyBuilding(String schemaName, String tableName,
String dataColumns, String pk, String indexColumns, Object[] values, String
includeColumns, String dataProps, String indexProps) throws Exception {
KeyValueBuilder builder = GenericKeyValueBuilder.INSTANCE;
testIndexRowKeyBuilding(schemaName, tableName, dataColumns, pk,
indexColumns, values, includeColumns, dataProps, indexProps, builder);
@@ -114,7 +120,7 @@ public class IndexMaintainerTest extends
BaseConnectionlessQueryTest {
List<IndexMaintainer> c1 = IndexMaintainer.deserialize(ptr,
builder, true);
assertEquals(1,c1.size());
IndexMaintainer im1 = c1.get(0);
-
+
StringBuilder buf = new StringBuilder("UPSERT INTO " +
fullTableName + " VALUES(");
for (int i = 0; i < values.length; i++) {
buf.append("?,");
@@ -136,7 +142,7 @@ public class IndexMaintainerTest extends
BaseConnectionlessQueryTest {
dataMutation.add(kv);
}
ValueGetter valueGetter = newValueGetter(row, valueMap);
-
+
List<Mutation> indexMutations =
IndexTestUtil.generateIndexData(index, table, dataMutation, ptr, builder);
assertEquals(1,indexMutations.size());
assertTrue(indexMutations.get(0) instanceof Put);
@@ -165,19 +171,19 @@ public class IndexMaintainerTest extends
BaseConnectionlessQueryTest {
public void testRowKeyVarOnlyIndex() throws Exception {
testIndexRowKeyBuilding("k1 VARCHAR, k2 DECIMAL", "k1,k2", "k2, k1",
new Object [] {"a",1.1});
}
-
+
@Test
public void testVarFixedndex() throws Exception {
testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, v VARCHAR",
"k1,k2", "k2, k1", new Object [] {"a",1.1});
}
-
-
+
+
@Test
public void testCompositeRowKeyVarFixedIndex() throws Exception {
// TODO: using 1.1 for INTEGER didn't give error
testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, v VARCHAR",
"k1,k2", "k2, k1", new Object [] {"a",1});
}
-
+
@Test
public void testCompositeRowKeyVarFixedAtEndIndex() throws Exception {
// Forces trailing zero in index key for fixed length
@@ -185,32 +191,32 @@ public class IndexMaintainerTest extends
BaseConnectionlessQueryTest {
testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, k3
VARCHAR, v VARCHAR", "k1,k2,k3", "k1, k3, k2", new Object [] {"a",i, "b"});
}
}
-
+
@Test
public void testSingleKeyValueIndex() throws Exception {
testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER, v VARCHAR", "k1",
"v", new Object [] {"a",1,"b"});
}
-
+
@Test
public void testMultiKeyValueIndex() throws Exception {
testIndexRowKeyBuilding("k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1
DECIMAL, v2 CHAR(2), v3 BIGINT", "k1, k2", "v2, k2, v1", new Object []
{"a",1,2.2,"bb"});
}
-
+
@Test
public void testMultiKeyValueCoveredIndex() throws Exception {
testIndexRowKeyBuilding("k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1
DECIMAL, v2 CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2", "v2, k2, v1", new
Object [] {"a",1,2.2,"bb"}, "v3, v4");
}
-
+
@Test
public void testSingleKeyValueDescIndex() throws Exception {
testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER, v VARCHAR", "k1", "v
DESC", new Object [] {"a",1,"b"});
}
-
+
@Test
public void testCompositeRowKeyVarFixedDescIndex() throws Exception {
testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, v VARCHAR",
"k1,k2", "k2 DESC, k1", new Object [] {"a",1});
}
-
+
@Test
public void testCompositeRowKeyTimeIndex() throws Exception {
long timeInMillis = System.currentTimeMillis();
@@ -219,7 +225,7 @@ public class IndexMaintainerTest extends
BaseConnectionlessQueryTest {
ts.setNanos((int) (timeInNanos % 1000000000));
testIndexRowKeyBuilding("ts1 DATE NOT NULL, ts2 TIME NOT NULL, ts3
TIMESTAMP NOT NULL", "ts1,ts2,ts3", "ts2, ts1", new Object [] {new
Date(timeInMillis), new Time(timeInMillis), ts});
}
-
+
@Test
public void testCompositeRowKeyBytesIndex() throws Exception {
long timeInMillis = System.currentTimeMillis();
@@ -228,79 +234,79 @@ public class IndexMaintainerTest extends
BaseConnectionlessQueryTest {
ts.setNanos((int) (timeInNanos % 1000000000));
testIndexRowKeyBuilding("b1 BINARY(3) NOT NULL, v VARCHAR", "b1,v",
"v, b1", new Object [] {new byte[] {41,42,43}, "foo"});
}
-
+
@Test
public void testCompositeDescRowKeyVarFixedDescIndex() throws Exception {
testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, v VARCHAR",
"k1, k2 DESC", "k2 DESC, k1", new Object [] {"a",1});
}
-
+
@Test
public void testCompositeDescRowKeyVarDescIndex() throws Exception {
testIndexRowKeyBuilding("k1 VARCHAR, k2 DECIMAL NOT NULL, v VARCHAR",
"k1, k2 DESC", "k2 DESC, k1", new Object [] {"a",1.1,"b"});
}
-
+
@Test
public void testCompositeDescRowKeyVarAscIndex() throws Exception {
testIndexRowKeyBuilding("k1 VARCHAR, k2 DECIMAL NOT NULL, v VARCHAR",
"k1, k2 DESC", "k2, k1", new Object [] {"a",1.1,"b"});
}
-
+
@Test
public void testCompositeDescRowKeyVarFixedDescSaltedIndex() throws
Exception {
testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, v VARCHAR",
"k1, k2 DESC", "k2 DESC, k1", new Object [] {"a",1}, "", "", "SALT_BUCKETS=4");
}
-
+
@Test
public void testCompositeDescRowKeyVarFixedDescSaltedIndexSaltedTable()
throws Exception {
testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, v VARCHAR",
"k1, k2 DESC", "k2 DESC, k1", new Object [] {"a",1}, "", "SALT_BUCKETS=3",
"SALT_BUCKETS=3");
}
-
+
@Test
public void testMultiKeyValueCoveredSaltedIndex() throws Exception {
testIndexRowKeyBuilding("k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1
DECIMAL, v2 CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2", "v2 DESC, k2 DESC, v1",
new Object [] {"a",1,2.2,"bb"}, "v3, v4", "", "SALT_BUCKETS=4");
}
-
+
@Test
public void tesIndexWithBigInt() throws Exception {
testIndexRowKeyBuilding(
"k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 BIGINT, v2
CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2",
"v1 DESC, k2 DESC", new Object[] { "a", 1, 2.2, "bb" });
}
-
+
@Test
public void tesIndexWithAscBoolean() throws Exception {
testIndexRowKeyBuilding(
"k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 BOOLEAN, v2
CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2",
"v1, k2 DESC", new Object[] { "a", 1, true, "bb" });
}
-
+
@Test
public void tesIndexWithAscNullBoolean() throws Exception {
testIndexRowKeyBuilding(
"k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 BOOLEAN, v2
CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2",
"v1, k2 DESC", new Object[] { "a", 1, null, "bb" });
}
-
+
@Test
public void tesIndexWithAscFalseBoolean() throws Exception {
testIndexRowKeyBuilding(
"k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 BOOLEAN, v2
CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2",
"v1, k2 DESC", new Object[] { "a", 1, false, "bb" });
}
-
+
@Test
public void tesIndexWithDescBoolean() throws Exception {
testIndexRowKeyBuilding(
"k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 BOOLEAN, v2
CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2",
"v1 DESC, k2 DESC", new Object[] { "a", 1, true, "bb" });
}
-
+
@Test
public void tesIndexWithDescFalseBoolean() throws Exception {
testIndexRowKeyBuilding(
"k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 BOOLEAN, v2
CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2",
"v1 DESC, k2 DESC", new Object[] { "a", 1, false, "bb" });
}
-
+
@Test
public void tesIndexedExpressionSerialization() throws Exception {
Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
@@ -322,4 +328,81 @@ public class IndexMaintainerTest extends
BaseConnectionlessQueryTest {
conn.close();
}
}
+
+ @Test
+ public void testDeleteColumnMutation() throws Exception {
+ String tableName = "T_" + generateUniqueName();
+ String indexName1 = "I_" + generateUniqueName();
+ String indexName2 = "I_" + generateUniqueName();
+ String ddl = String.format("create table %s (id varchar primary key, "
+
+ "col1 varchar, col2 varchar, col3 bigint)", tableName);
+ String index1 = String.format("create index %s on %s (col2) include
(col1) ",
+ indexName1, tableName);
+ String index2 = String.format("create index %s on %s (col2) include
(col1) " +
+ "COLUMN_ENCODED_BYTES=2,
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS",
+ indexName2, tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute(ddl);
+ conn.createStatement().execute(index1);
+ conn.createStatement().execute(index2);
+ PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+ PTable table = pconn.getTable(tableName);
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ table.getIndexMaintainers(ptr, pconn);
+ List<IndexMaintainer> ims = IndexMaintainer.deserialize(ptr,
+ GenericKeyValueBuilder.INSTANCE, true);
+ assertEquals(2, ims.size());
+ String dml = String.format("upsert into %s values ('a', 'ab',
'abc', 2)", tableName);
+ assertDeleteColumnMutation(tableName, dml, false, pconn, ims);
+ pconn.getMutationState().rollback();
+ dml = String.format("upsert into %s (id, col2) values ('a',
'ab')", tableName);
+ assertDeleteColumnMutation(tableName, dml, true, pconn, ims);
+ pconn.getMutationState().rollback();
+ }
+ }
+
+ private static void assertDeleteColumnMutation(String tableName,
+ String dml,
+ boolean isPartialUpdate,
+ PhoenixConnection pconn,
+ List<IndexMaintainer> ims)
throws Exception {
+ pconn.createStatement().execute(dml);
+ Iterator<Pair<byte[], List<Mutation>>> iterator =
+ pconn.getMutationState().toMutations();
+ while (iterator.hasNext()) {
+ Pair<byte[], List<Mutation>> mutationPair = iterator.next();
+ List<Mutation> batchMutations = mutationPair.getSecond();
+ assertEquals(1, batchMutations.size());
+ assertTrue(batchMutations.get(0) instanceof Put);
+ Put dataRow = (Put) batchMutations.get(0);
+ ValueGetter nextDataRowVG = new
IndexUtil.SimpleValueGetter(dataRow);
+ long ts = EnvironmentEdgeManager.currentTimeMillis();
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(dataRow.getRow());
+ for (IndexMaintainer im : ims) {
+ Put indexPut =
im.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+ nextDataRowVG, rowKey, ts, null, null, false, null);
+ if (indexPut == null) {
+ // No covered column. Just prepare an index row with the empty column
+ byte[] indexRowKey = im.buildRowKey(nextDataRowVG, rowKey,
+ null, null, ts, null);
+ indexPut = new Put(indexRowKey);
+ }
+ indexPut.addColumn(
+ im.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ im.getEmptyKeyValueQualifier(), ts,
+ QueryConstants.UNVERIFIED_BYTES);
+ Delete deleteCol = im.buildDeleteColumnMutation(indexPut, ts);
+ if (im.getIndexStorageScheme() ==
+
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ assertNull(deleteCol);
+ } else {
+ if (isPartialUpdate) {
+ assertNotNull(deleteCol);
+ } else {
+ assertNull(deleteCol);
+ }
+ }
+ }
+ }
+ }
}