This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 4941aacbad PHOENIX-7646 New PhoenixStatement API to return old row
state in Atomic Updates (#2199)
4941aacbad is described below
commit 4941aacbadb5b49e1f51b26c20a840e25dc87431
Author: Viraj Jasani <[email protected]>
AuthorDate: Tue Jun 24 16:08:42 2025 -0700
PHOENIX-7646 New PhoenixStatement API to return old row state in Atomic
Updates (#2199)
---
.../org/apache/phoenix/compile/DeleteCompiler.java | 5 +-
.../org/apache/phoenix/execute/MutationState.java | 18 +-
.../phoenix/index/PhoenixIndexBuilderHelper.java | 1 +
.../phoenix/jdbc/PhoenixPreparedStatement.java | 26 ++-
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 37 +++-
.../phoenix/hbase/index/IndexRegionObserver.java | 51 ++++--
.../apache/phoenix/index/PhoenixIndexBuilder.java | 6 +-
.../java/org/apache/phoenix/end2end/Bson4IT.java | 200 ++++++++++++++++++++-
.../apache/phoenix/end2end/OnDuplicateKey2IT.java | 159 ++++++++++++++++
9 files changed, 479 insertions(+), 24 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index aa506da9db..272bbec3a2 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -621,8 +621,9 @@ public class DeleteCompiler {
final StatementContext context = dataPlan.getContext();
Scan scan = context.getScan();
scan.setAttribute(BaseScannerRegionObserverConstants.DELETE_AGG,
QueryConstants.TRUE);
- if (context.getScanRanges().getPointLookupCount() == 1 &&
- returnResult == MutationState.ReturnResult.ROW) {
+ if (context.getScanRanges().getPointLookupCount() == 1
+ && (returnResult ==
MutationState.ReturnResult.NEW_ROW_ON_SUCCESS
+ || returnResult ==
MutationState.ReturnResult.OLD_ROW_ALWAYS)) {
scan.setAttribute(BaseScannerRegionObserverConstants.SINGLE_ROW_DELETE,
QueryConstants.TRUE);
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index 26503508de..b92d699867 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -775,11 +775,16 @@ public class MutationState implements SQLCloseable {
}
}
if (this.returnResult != null) {
- if (this.returnResult == ReturnResult.ROW) {
+ if (this.returnResult == ReturnResult.NEW_ROW_ON_SUCCESS) {
for (Mutation mutation : rowMutations) {
mutation.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
PhoenixIndexBuilderHelper.RETURN_RESULT_ROW);
}
+ } else if (this.returnResult ==
ReturnResult.OLD_ROW_ALWAYS) {
+ for (Mutation mutation : rowMutations) {
+
mutation.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
+
PhoenixIndexBuilderHelper.RETURN_RESULT_OLD_ROW);
+ }
}
}
// The DeleteCompiler already generates the deletes for
indexes, so no need to do it again
@@ -802,9 +807,12 @@ public class MutationState implements SQLCloseable {
mutation.setAttribute(PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB,
onDupKeyBytes);
}
if (this.returnResult != null) {
- if (this.returnResult == ReturnResult.ROW) {
+ if (this.returnResult ==
ReturnResult.NEW_ROW_ON_SUCCESS) {
mutation.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
PhoenixIndexBuilderHelper.RETURN_RESULT_ROW);
+ } else if (this.returnResult ==
ReturnResult.OLD_ROW_ALWAYS) {
+
mutation.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
+
PhoenixIndexBuilderHelper.RETURN_RESULT_OLD_ROW);
}
}
}
@@ -1519,7 +1527,8 @@ public class MutationState implements SQLCloseable {
.decodeInt(cell.getValueArray(),
cell.getValueOffset(),
SortOrder.getDefault());
if (this.returnResult != null) {
- if (this.returnResult == ReturnResult.ROW)
{
+ if (this.returnResult ==
ReturnResult.NEW_ROW_ON_SUCCESS ||
+ this.returnResult ==
ReturnResult.OLD_ROW_ALWAYS) {
this.result = result;
}
}
@@ -2393,7 +2402,8 @@ public class MutationState implements SQLCloseable {
}
public enum ReturnResult {
- ROW
+ NEW_ROW_ON_SUCCESS,
+ OLD_ROW_ALWAYS
}
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
index 6ef1d9e38a..9b078abc63 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
@@ -41,6 +41,7 @@ public final class PhoenixIndexBuilderHelper {
public static final String RETURN_RESULT = "_RETURN_RESULT";
public static final byte[] RETURN_RESULT_ROW = new byte[]{0};
+ public static final byte[] RETURN_RESULT_OLD_ROW = new byte[]{1};
public static byte[] serializeOnDupKeyIgnore() {
return ON_DUP_KEY_IGNORE_BYTES;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index ecc961f0d2..4905a14153 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -36,7 +36,6 @@ import java.sql.RowId;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLXML;
-import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.ZoneOffset;
@@ -215,6 +214,28 @@ public class PhoenixPreparedStatement extends
PhoenixStatement implements Phoeni
}
}
+ /**
+ * Executes the given SQL statement similar to JDBC API executeUpdate()
but also returns the
+ * old row (before update) as Result object back to the client. This must
be used with
+ * auto-commit Connection. This makes the operation atomic.
+ * Return the old row (state before update) regardless of whether the
update is
+ * successful or not.
+ *
+ * @return The pair of int and ResultSet, where int represents value 1 for
successful row update
+ * and 0 for non-successful row update, and ResultSet represents the old
state of the row.
+ * @throws SQLException If the statement cannot be executed.
+ */
+ // Note: Do Not remove this, it is expected to be used by downstream
applications
+ public Pair<Integer, ResultSet> executeAtomicUpdateReturnOldRow() throws
SQLException {
+ if (!connection.getAutoCommit()) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED).build()
+ .buildException();
+ }
+ preExecuteUpdate();
+ return executeMutation(statement, createAuditQueryLogger(statement,
query),
+ MutationState.ReturnResult.OLD_ROW_ALWAYS);
+ }
+
/**
* Executes the given SQL statement similar to JDBC API executeUpdate()
but also returns the
* updated or non-updated row as Result object back to the client. This
must be used with
@@ -226,6 +247,7 @@ public class PhoenixPreparedStatement extends
PhoenixStatement implements Phoeni
* and 0 for non-successful row update, and ResultSet represents the state
of the row.
* @throws SQLException If the statement cannot be executed.
*/
+ // Note: Do Not remove this, it is expected to be used by downstream
applications
public Pair<Integer, ResultSet> executeAtomicUpdateReturnRow() throws
SQLException {
if (!connection.getAutoCommit()) {
throw new
SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED).build()
@@ -233,7 +255,7 @@ public class PhoenixPreparedStatement extends
PhoenixStatement implements Phoeni
}
preExecuteUpdate();
return executeMutation(statement, createAuditQueryLogger(statement,
query),
- MutationState.ReturnResult.ROW);
+ MutationState.ReturnResult.NEW_ROW_ON_SUCCESS);
}
public QueryPlan optimizeQuery() throws SQLException {
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 0dea1cef21..fa4cd22f5f 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -583,7 +583,7 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
protected int executeMutation(final CompilableStatement stmt,
final AuditQueryLogger queryLogger) throws
SQLException {
return executeMutation(stmt, true, queryLogger,
- isResultSetExpected(stmt) ? ReturnResult.ROW :
null).getFirst();
+ isResultSetExpected(stmt) ? ReturnResult.NEW_ROW_ON_SUCCESS :
null).getFirst();
}
Pair<Integer, ResultSet> executeMutation(final CompilableStatement stmt,
@@ -626,7 +626,8 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
isUpsert = stmt instanceof
ExecutableUpsertStatement;
isDelete = stmt instanceof
ExecutableDeleteStatement;
if (isDelete && connection.getAutoCommit() &&
- returnResult == ReturnResult.ROW) {
+ (returnResult ==
ReturnResult.NEW_ROW_ON_SUCCESS ||
+ returnResult ==
ReturnResult.OLD_ROW_ALWAYS)) {
// used only if single row deletion needs
to atomically
// return row that is deleted.
plan = ((ExecutableDeleteStatement)
stmt).compilePlan(
@@ -2489,6 +2490,7 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
* update and 0 for non-successful row update, and ResultSet represents
the state of the row.
* @throws SQLException If the statement cannot be executed.
*/
+ // Note: Do Not remove this, it is expected to be used by downstream
applications
public Pair<Integer, ResultSet> executeAtomicUpdateReturnRow(String sql)
throws SQLException {
if (!connection.getAutoCommit()) {
throw new
SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED).build()
@@ -2496,7 +2498,36 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
}
CompilableStatement stmt = preExecuteUpdate(sql);
Pair<Integer, ResultSet> result =
- executeMutation(stmt, createAuditQueryLogger(stmt, sql),
ReturnResult.ROW);
+ executeMutation(stmt, createAuditQueryLogger(stmt, sql),
+ ReturnResult.NEW_ROW_ON_SUCCESS);
+ flushIfNecessary();
+ return result;
+ }
+
+ /**
+ * Executes the given SQL statement similar to JDBC API executeUpdate()
but also returns the
+ * old row (before update) as Result object back to the client. This must
be used with
+ * auto-commit Connection. This makes the operation atomic.
+ * Return the old row (state before update) regardless of whether the
update is
+ * successful or not.
+ *
+ * @param sql The SQL DML statement, UPSERT or DELETE for Phoenix.
+ * @return The pair of int and ResultSet, where int represents value 1 for
successful row
+ * update and 0 for non-successful row update, and ResultSet represents
the old state of the
+ * row.
+ * @throws SQLException If the statement cannot be executed.
+ */
+ // Note: Do Not remove this, it is expected to be used by downstream
applications
+ public Pair<Integer, ResultSet> executeAtomicUpdateReturnOldRow(String sql)
+ throws SQLException {
+ if (!connection.getAutoCommit()) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED).build()
+ .buildException();
+ }
+ CompilableStatement stmt = preExecuteUpdate(sql);
+ Pair<Integer, ResultSet> result =
+ executeMutation(stmt, createAuditQueryLogger(stmt, sql),
+ ReturnResult.OLD_ROW_ALWAYS);
flushIfNecessary();
return result;
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index fed0bb274d..ab1c7ad95e 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -278,6 +278,9 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// true, will be null if there is no expression on this column,
otherwise false
// This is only initialized for single row atomic mutation.
private Map<ColumnReference, Pair<Cell, Boolean>>
currColumnCellExprMap;
+ // store old row cells into a map for OLD_ROW return result. This
preserves the original
+ // state of the row before any conditional updates are applied.
+ private Map<ColumnReference, Pair<Cell, Boolean>>
oldRowColumnCellExprMap;
// list containing the original mutations from the
MiniBatchOperationInProgress. Contains
// any annotations we were sent by the client, and can be used in
hooks that don't get
// passed MiniBatchOperationInProgress, like preWALAppend()
@@ -290,6 +293,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private boolean hasLocalIndex;
private boolean hasTransform;
private boolean returnResult;
+ private boolean returnOldRow;
private boolean hasConditionalTTL; // table has Conditional TTL
public BatchMutateContext() {
@@ -1316,6 +1320,11 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
Mutation m = miniBatchOp.getOperation(i);
if (this.builder.returnResult(m) && miniBatchOp.size() == 1) {
context.returnResult = true;
+ byte[] returnResult =
m.getAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT);
+ if (returnResult != null && Arrays.equals(returnResult,
+ PhoenixIndexBuilderHelper.RETURN_RESULT_OLD_ROW)) {
+ context.returnOldRow = true;
+ }
}
if (this.builder.hasConditionalTTL(m)) {
context.hasConditionalTTL = true;
@@ -1656,7 +1665,13 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
List<Cell> cells = new ArrayList<>();
cells.add(cell);
- addCellsIfResultReturned(miniBatchOp, context, cells);
+ if (!context.returnOldRow) {
+ addCellsIfResultReturned(miniBatchOp,
context.returnResult, cells,
+ context.currColumnCellExprMap, false);
+ } else {
+ addCellsIfResultReturned(miniBatchOp,
context.returnResult, cells,
+ context.oldRowColumnCellExprMap, true);
+ }
Result result = Result.create(cells);
miniBatchOp.setOperationStatus(0,
@@ -1682,28 +1697,32 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
/**
- * If the result needs to be returned for the given update operation,
identify the updated row
- * cells and add the input list of cells.
+ * If the result needs to be returned for the given update operation,
identify the appropriate
+ * row cells and add them to the input list of cells. The method can
return either the updated
+ * row cells (for ROW return type) or the original row cells (for OLD_ROW
return type).
*
* @param miniBatchOp Batch of mutations getting applied to region.
- * @param context The BatchMutateContext object shared during coproc hooks
execution as part of
- * the batch mutate life cycle.
+ * @param returnResult Whether the result should be returned to the client.
* @param cells The list of cells to be returned back to the client.
+ * @param currColumnCellExprMap The map containing column reference to
cell mappings. This
+ * can be either the current/updated state (for ROW) or the original state
(for OLD_ROW)
+ * depending on the return type requested.
*/
private static void
addCellsIfResultReturned(MiniBatchOperationInProgress<Mutation> miniBatchOp,
- BatchMutateContext context,
List<Cell> cells) {
- if (context.returnResult) {
- Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap =
- context.currColumnCellExprMap;
+ boolean returnResult,
List<Cell> cells,
+ Map<ColumnReference,
Pair<Cell, Boolean>>
+ currColumnCellExprMap,
+ boolean retainOldRow) {
+ if (returnResult) {
if (currColumnCellExprMap == null) {
return;
}
Mutation mutation = miniBatchOp.getOperation(0);
- if (mutation instanceof Put) {
+ if (mutation instanceof Put && !retainOldRow) {
updateColumnCellExprMap(mutation, currColumnCellExprMap);
}
Mutation[] mutations =
miniBatchOp.getOperationsFromCoprocessors(0);
- if (mutations != null) {
+ if (mutations != null && !retainOldRow) {
for (Mutation m : mutations) {
updateColumnCellExprMap(m, currColumnCellExprMap);
}
@@ -1892,6 +1911,13 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
Put currentDataRowState = dataRowState != null ? dataRowState.getFirst()
: null;
+ // Create separate map for old row data when OLD_ROW is requested
+ // This must be done before any conditional update logic to preserve
original state
+ if (context.returnResult && context.returnOldRow &&
currentDataRowState != null) {
+ context.oldRowColumnCellExprMap = new HashMap<>();
+ updateCurrColumnCellExpr(currentDataRowState,
context.oldRowColumnCellExprMap);
+ }
+
// if result needs to be returned but the DML does not have ON
DUPLICATE KEY present,
// perform the mutation and return the result.
if (opBytes == null) {
@@ -2074,6 +2100,9 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private static void updateCurrColumnCellExpr(Put put,
Map<ColumnReference,
Pair<Cell, Boolean>>
currColumnCellExprMap) {
+ if (put == null) {
+ return;
+ }
for (Map.Entry<byte[], List<Cell>> entry :
put.getFamilyCellMap().entrySet()) {
for (Cell cell : entry.getValue()) {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 473afbec2c..7d1219c57e 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -22,6 +22,7 @@ import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -278,7 +279,10 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder
{
@Override
public boolean returnResult(Mutation m) {
- return m.getAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT) != null;
+ byte[] returnResult =
m.getAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT);
+ return returnResult != null && (Arrays.equals(returnResult,
+ PhoenixIndexBuilderHelper.RETURN_RESULT_ROW) ||
Arrays.equals(returnResult,
+ PhoenixIndexBuilderHelper.RETURN_RESULT_OLD_ROW));
}
@Override
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
index 94f4fd1007..d6bfee7db2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
@@ -34,6 +34,7 @@ import java.util.Properties;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.schema.types.PDouble;
import org.bson.BsonArray;
import org.bson.BsonBinary;
@@ -51,7 +52,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.ExplainPlanAttributes;
-import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.util.PropertiesUtil;
@@ -59,6 +59,7 @@ import static
org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -674,6 +675,185 @@ public class Bson4IT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testConditionalUpsertReturnOldRow() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String ddl = "CREATE TABLE " + tableName
+ + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
+ + " CONSTRAINT pk PRIMARY KEY(PK1))";
+ conn.createStatement().execute(ddl);
+
+ String sample1 = getJsonString("json/sample_01.json");
+ String sample2 = getJsonString("json/sample_02.json");
+ String sample3 = getJsonString("json/sample_03.json");
+ BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+ BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+ BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3);
+
+ upsertRows(conn, tableName, bsonDocument1, bsonDocument2, bsonDocument3);
+ PreparedStatement stmt;
+
+ String conditionExpression =
+ "press = :press AND track[0].shot[2][0].city.standard[50] =
:softly";
+
+ BsonDocument conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument()
+ .append(":press", new BsonString("beat"))
+ .append(":softly", new BsonString("softly")));
+
+ String query = "SELECT * FROM " + tableName +
+ " WHERE PK1 = 'pk0001' AND C1 = '0002' AND NOT
BSON_CONDITION_EXPRESSION(COL, '"
+ + conditionDoc.toJson() + "')";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+
+ assertTrue(rs.next());
+ assertEquals("pk0001", rs.getString(1));
+ assertEquals("0002", rs.getString(2));
+ BsonDocument document1 = (BsonDocument) rs.getObject(3);
+ assertEquals(bsonDocument1, document1);
+
+ assertFalse(rs.next());
+
+ conditionExpression =
+ "press = :press AND track[0].shot[2][0].city.standard[5] =
:softly";
+
+ conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument()
+ .append(":press", new BsonString("beat"))
+ .append(":softly", new BsonString("softly")));
+
+ query = "SELECT * FROM " + tableName +
+ " WHERE PK1 = 'pk0001' AND C1 = '0002' AND
BSON_CONDITION_EXPRESSION(COL, '"
+ + conditionDoc.toJson() + "')";
+ rs = conn.createStatement().executeQuery(query);
+
+ assertTrue(rs.next());
+ assertEquals("pk0001", rs.getString(1));
+ assertEquals("0002", rs.getString(2));
+ document1 = (BsonDocument) rs.getObject(3);
+ assertEquals(bsonDocument1, document1);
+
+ assertFalse(rs.next());
+
+ BsonDocument updateExp = new BsonDocument()
+ .append("$SET", new BsonDocument()
+ .append("browserling",
+ new
BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095)))
+ .append("track[0].shot[2][0].city.standard[5]", new
BsonString("soft"))
+ .append("track[0].shot[2][0].city.problem[2]",
+ new
BsonString("track[0].shot[2][0].city.problem[2] + 529.435")))
+ .append("$UNSET", new BsonDocument()
+ .append("track[0].shot[2][0].city.flame", new
BsonNull()));
+
+ stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() +
"')"
+ + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE
COL END,"
+ + " C1 = ?");
+ stmt.setString(1, "pk0001");
+ stmt.setString(2, "0003");
+
+ assertReturnedOldRowResult(stmt, conn, tableName, "json/sample_01.json",
true);
+
+ updateExp = new BsonDocument()
+ .append("$ADD", new BsonDocument()
+ .append("new_samples",
+ new BsonDocument().append("$set",
+ new BsonArray(Arrays.asList(
+ new
BsonBinary(Bytes.toBytes("Sample10")),
+ new
BsonBinary(Bytes.toBytes("Sample12")),
+ new
BsonBinary(Bytes.toBytes("Sample13")),
+ new
BsonBinary(Bytes.toBytes("Sample14"))
+ )))))
+ .append("$DELETE_FROM_SET", new BsonDocument()
+ .append("new_samples",
+ new BsonDocument().append("$set",
+ new BsonArray(Arrays.asList(
+ new
BsonBinary(Bytes.toBytes("Sample02")),
+ new
BsonBinary(Bytes.toBytes("Sample03"))
+ )))))
+ .append("$SET", new BsonDocument()
+ .append("newrecord", ((BsonArray)
(document1.get("track"))).get(0)))
+ .append("$UNSET", new BsonDocument()
+ .append("rather[3].outline.halfway.so[2][2]", new
BsonNull()));
+
+ conditionExpression =
+ "field_not_exists(newrecord) AND
field_exists(rather[3].outline.halfway.so[2][2])";
+
+ conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument());
+
+ stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() +
"')"
+ + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE
COL END");
+
+ stmt.setString(1, "pk1010");
+
+ assertReturnedOldRowResult(stmt, conn, tableName, "json/sample_02.json",
true);
+
+ updateExp = new BsonDocument()
+ .append("$SET", new BsonDocument()
+ .append("result[1].location.state", new
BsonString("AK")))
+ .append("$UNSET", new BsonDocument()
+ .append("result[4].emails[1]", new BsonNull()));
+
+ conditionExpression =
+ "result[2].location.coordinates.latitude > :latitude OR "
+ + "(field_exists(result[1].location) AND
result[1].location.state != :state" +
+ " AND field_exists(result[4].emails[1]))";
+
+ conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument()
+ .append(":latitude", new BsonDouble(0))
+ .append(":state", new BsonString("AK")));
+
+ stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() +
"')"
+ + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE
COL END");
+
+ stmt.setString(1, "pk1011");
+
+ assertReturnedOldRowResult(stmt, conn, tableName, "json/sample_03.json",
true);
+
+ conditionExpression =
+ "press = :press AND track[0].shot[2][0].city.standard[5] =
:softly";
+
+ conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument()
+ .append(":press", new BsonString("incorrect_value"))
+ .append(":softly", new BsonString("incorrect_value")));
+
+ updateExp = new BsonDocument()
+ .append("$SET", new BsonDocument()
+ .append("new_field1",
+ new
BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095)))
+ .append("track[0].shot[2][0].city.standard[5]", new
BsonString(
+ "soft_new_val"))
+ .append("track[0].shot[2][0].city.problem[2]",
+ new
BsonString("track[0].shot[2][0].city.problem[2] + 123")));
+
+ stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() +
"')"
+ + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE
COL END");
+ stmt.setString(1, "pk0001");
+
+ assertReturnedOldRowResult(stmt, conn, tableName,
"json/sample_updated_01.json", false);
+
+ verifyRows(tableName, conn);
+ }
+ }
+
@Test
public void testBsonPk() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -861,6 +1041,24 @@ public class Bson4IT extends ParallelStatsDisabledIT {
assertEquals(RawBsonDocument.parse(getJsonString(jsonPath)),
resultSet.getObject(3));
}
+ private static void assertReturnedOldRowResult(PreparedStatement stmt,
+ Connection conn,
+ String tableName,
+ String jsonPath,
+ boolean success)
+ throws SQLException, IOException {
+ Pair<Integer, ResultSet> resultPair =
+
stmt.unwrap(PhoenixPreparedStatement.class).executeAtomicUpdateReturnOldRow();
+ assertEquals(success ? 1 : 0, (int) resultPair.getFirst());
+ ResultSet resultSet = resultPair.getSecond();
+ if (success) {
+ assertEquals(RawBsonDocument.parse(getJsonString(jsonPath)),
resultSet.getObject(3));
+ assertFalse(resultSet.next());
+ } else {
+ assertEquals(RawBsonDocument.parse(getJsonString(jsonPath)),
resultSet.getObject(3));
+ }
+ }
+
private static void validateExplainPlan(PreparedStatement ps, String
tableName, String scanType)
throws SQLException {
ExplainPlan plan =
ps.unwrap(PhoenixPreparedStatement.class).optimizeQuery().getExplainPlan();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
index d29fe6b4b9..de07c7a955 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
@@ -52,6 +52,7 @@ import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.ExplainPlanAttributes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
@@ -507,6 +508,164 @@ public class OnDuplicateKey2IT extends
ParallelStatsDisabledIT {
}
}
+ /**
+ * Validates that the returned row contains the original state before the
upsert operation.
+ * This method uses executeAtomicUpdateReturnOldRow() to test the OLD_ROW
functionality.
+ */
+ private static void validateReturnedRowBeforeUpsert(Connection conn,
+ String upsertSql,
+ String tableName,
+ Double col1,
+ String col2,
+ boolean success,
+ BsonDocument inputDoc,
+ BsonDocument
expectedDoc,
+ Integer col4)
+ throws SQLException {
+ int updateCount;
+ ResultSet resultSet;
+ if (inputDoc != null) {
+ PreparedStatement ps = conn.prepareStatement(upsertSql);
+ ps.setObject(1, inputDoc);
+ Pair<Integer, ResultSet> resultPair =
+
ps.unwrap(PhoenixPreparedStatement.class).executeAtomicUpdateReturnOldRow();
+ updateCount = resultPair.getFirst();
+ resultSet = resultPair.getSecond();
+ } else {
+ Statement stmt = conn.createStatement();
+ Pair<Integer, ResultSet> resultPair =
+
stmt.unwrap(PhoenixStatement.class).executeAtomicUpdateReturnOldRow(upsertSql);
+ updateCount = resultPair.getFirst();
+ resultSet = resultPair.getSecond();
+ }
+ boolean isOnDuplicateKey = upsertSql.toUpperCase().contains("ON
DUPLICATE KEY");
+ if (conn.getAutoCommit() && isOnDuplicateKey) {
+ assertEquals(success ? 1 : 0, updateCount);
+ if (resultSet != null) {
+ assertEquals("pk000", resultSet.getString(1));
+ assertEquals(-123.98, resultSet.getDouble(2), 0.0);
+ assertEquals("pk003", resultSet.getString(3));
+ assertEquals(col1, resultSet.getDouble(4), 0.0);
+ validateReturnedRowResult(col2, expectedDoc, col4, resultSet);
+ assertFalse(resultSet.next());
+ }
+ }
+ }
+
+ @Test
+ public void testReturnOldRowResult1() throws Exception {
+ Assume.assumeTrue("Set correct result to RegionActionResult on hbase
versions " +
+ "2.4.18+, 2.5.9+, and 2.6.0+",
isSetCorrectResultEnabledOnHBase());
+ // NOTE - Tuple result projection does not work well with local index
because
+ // this assertion can be false: CellUtil.matchingRows(kvs[0],
kvs[kvs.length-1])
+ // as the Tuple contains different rowkeys.
+ Assume.assumeTrue("ResultSet return does not work with local index",
+ !indexDDL.startsWith("create local index"));
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String sample1 = getJsonString("json/sample_01.json");
+ String sample2 = getJsonString("json/sample_02.json");
+ BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+ BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String tableName = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName
+ + "(PK1 VARCHAR, PK2 DOUBLE NOT NULL, PK3 VARCHAR,
COUNTER1 DOUBLE,"
+ + " COUNTER2 VARCHAR,"
+ + " COL3 BSON, COL4 INTEGER, CONSTRAINT pk PRIMARY
KEY(PK1, PK2, PK3))";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ validateAtomicUpsertReturnOldRow(tableName, conn, bsonDocument1,
bsonDocument2);
+
+ verifyIndexRow(conn, tableName, false);
+ }
+ }
+
+ private static void validateAtomicUpsertReturnOldRow(String tableName,
Connection conn,
+ BsonDocument
bsonDocument1,
+ BsonDocument
bsonDocument2)
+ throws SQLException {
+ String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3,
COUNTER1, COL3, COL4)"
+ + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON
DUPLICATE KEY " +
+ "IGNORE";
+ validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 0.0, null,
true,
+ bsonDocument1, null, null);
+
+ upsertSql =
+ "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1) "
+ + "VALUES('pk000', -123.98, 'pk003', 0) ON DUPLICATE
KEY IGNORE";
+ validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 1011.202,
null, false,
+ null, bsonDocument1, 123);
+
+ upsertSql =
+ "UPSERT INTO " + tableName
+ + " (PK1, PK2, PK3, COUNTER1, COUNTER2)
VALUES('pk000', -123.98, "
+ + "'pk003', 234, 'col2_000')";
+ validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 1011.202,
null, true,
+ null, bsonDocument1, 123);
+
+ upsertSql = "UPSERT INTO " + tableName
+ + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON
DUPLICATE KEY UPDATE "
+ + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 +
1999.99 ELSE COUNTER1"
+ + " END, "
+ + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001'
ELSE COUNTER2 "
+ + "END, "
+ + "COL3 = ?, "
+ + "COL4 = 234";
+ validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 234d,
"col2_000", true,
+ bsonDocument2, bsonDocument1, 123);
+
+ upsertSql = "UPSERT INTO " + tableName
+ + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON
DUPLICATE KEY UPDATE "
+ + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 +
1999.99 ELSE COUNTER1"
+ + " END,"
+ + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001'
ELSE COUNTER2 "
+ + "END";
+ validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 2233.99,
"col2_001", false
+ , null, bsonDocument2, 234);
+ }
+
+ @Test
+ public void testReturnOldRowResult2() throws Exception {
+ Assume.assumeTrue("Set correct result to RegionActionResult on hbase
versions " +
+ "2.4.18+, 2.5.9+, and 2.6.0+",
isSetCorrectResultEnabledOnHBase());
+ // NOTE - Tuple result projection does not work well with local index
because
+ // this assertion can be false: CellUtil.matchingRows(kvs[0],
kvs[kvs.length-1])
+ // as the Tuple contains different rowkeys.
+ Assume.assumeTrue("ResultSet return does not work with local index",
+ !indexDDL.startsWith("create local index"));
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String sample1 = getJsonString("json/sample_01.json");
+ String sample2 = getJsonString("json/sample_02.json");
+ BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+ BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String tableName = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName
+ + "(PK1 VARCHAR, PK2 DOUBLE NOT NULL, PK3 VARCHAR,
COUNTER1 DOUBLE,"
+ + " COUNTER2 VARCHAR,"
+ + " COL3 BSON, COL4 INTEGER, CONSTRAINT pk PRIMARY
KEY(PK1, PK2, PK3))";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3,
COUNTER1, COL3, COL4)"
+ + " VALUES('pk000', -123.98, 'pk003', 999.999, ?, 999) ON
DUPLICATE KEY " +
+ "IGNORE";
+ validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 0.0,
null, true,
+ bsonDocument1, null, null);
+
+ upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3,
COUNTER1, COL3, COL4)"
+ + " VALUES('pk000', -123.98, 'pk003', 888.888, ?, 888) ON
DUPLICATE KEY " +
+ "IGNORE";
+ validateReturnedRowBeforeUpsert(conn, upsertSql, tableName,
999.999, null, false,
+ bsonDocument2, bsonDocument1, 999);
+ }
+ }
+
@Test
public void testReturnRowResultForMultiPointLookup() throws Exception {
Assume.assumeTrue("Set correct result to RegionActionResult on hbase
versions " +