This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 4941aacbad PHOENIX-7646 New PhoenixStatement API to return old row 
state in Atomic Updates (#2199)
4941aacbad is described below

commit 4941aacbadb5b49e1f51b26c20a840e25dc87431
Author: Viraj Jasani <[email protected]>
AuthorDate: Tue Jun 24 16:08:42 2025 -0700

    PHOENIX-7646 New PhoenixStatement API to return old row state in Atomic 
Updates (#2199)
---
 .../org/apache/phoenix/compile/DeleteCompiler.java |   5 +-
 .../org/apache/phoenix/execute/MutationState.java  |  18 +-
 .../phoenix/index/PhoenixIndexBuilderHelper.java   |   1 +
 .../phoenix/jdbc/PhoenixPreparedStatement.java     |  26 ++-
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |  37 +++-
 .../phoenix/hbase/index/IndexRegionObserver.java   |  51 ++++--
 .../apache/phoenix/index/PhoenixIndexBuilder.java  |   6 +-
 .../java/org/apache/phoenix/end2end/Bson4IT.java   | 200 ++++++++++++++++++++-
 .../apache/phoenix/end2end/OnDuplicateKey2IT.java  | 159 ++++++++++++++++
 9 files changed, 479 insertions(+), 24 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index aa506da9db..272bbec3a2 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -621,8 +621,9 @@ public class DeleteCompiler {
             final StatementContext context = dataPlan.getContext();
             Scan scan = context.getScan();
             scan.setAttribute(BaseScannerRegionObserverConstants.DELETE_AGG, 
QueryConstants.TRUE);
-            if (context.getScanRanges().getPointLookupCount() == 1 &&
-                    returnResult == MutationState.ReturnResult.ROW) {
+            if (context.getScanRanges().getPointLookupCount() == 1
+                    && (returnResult == 
MutationState.ReturnResult.NEW_ROW_ON_SUCCESS
+                    || returnResult == 
MutationState.ReturnResult.OLD_ROW_ALWAYS)) {
                 
scan.setAttribute(BaseScannerRegionObserverConstants.SINGLE_ROW_DELETE,
                         QueryConstants.TRUE);
             }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index 26503508de..b92d699867 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -775,11 +775,16 @@ public class MutationState implements SQLCloseable {
                     }
                 }
                 if (this.returnResult != null) {
-                    if (this.returnResult == ReturnResult.ROW) {
+                    if (this.returnResult == ReturnResult.NEW_ROW_ON_SUCCESS) {
                         for (Mutation mutation : rowMutations) {
                             
mutation.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
                                     
PhoenixIndexBuilderHelper.RETURN_RESULT_ROW);
                         }
+                    } else if (this.returnResult == 
ReturnResult.OLD_ROW_ALWAYS) {
+                        for (Mutation mutation : rowMutations) {
+                            
mutation.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
+                                    
PhoenixIndexBuilderHelper.RETURN_RESULT_OLD_ROW);
+                        }
                     }
                 }
                 // The DeleteCompiler already generates the deletes for 
indexes, so no need to do it again
@@ -802,9 +807,12 @@ public class MutationState implements SQLCloseable {
                         
mutation.setAttribute(PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB, 
onDupKeyBytes);
                     }
                     if (this.returnResult != null) {
-                        if (this.returnResult == ReturnResult.ROW) {
+                        if (this.returnResult == 
ReturnResult.NEW_ROW_ON_SUCCESS) {
                             
mutation.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
                                     
PhoenixIndexBuilderHelper.RETURN_RESULT_ROW);
+                        } else if (this.returnResult == 
ReturnResult.OLD_ROW_ALWAYS) {
+                            
mutation.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
+                                    
PhoenixIndexBuilderHelper.RETURN_RESULT_OLD_ROW);
                         }
                     }
                 }
@@ -1519,7 +1527,8 @@ public class MutationState implements SQLCloseable {
                                         .decodeInt(cell.getValueArray(), 
cell.getValueOffset(),
                                                 SortOrder.getDefault());
                                 if (this.returnResult != null) {
-                                    if (this.returnResult == ReturnResult.ROW) 
{
+                                    if (this.returnResult == 
ReturnResult.NEW_ROW_ON_SUCCESS ||
+                                            this.returnResult == 
ReturnResult.OLD_ROW_ALWAYS) {
                                         this.result = result;
                                     }
                                 }
@@ -2393,7 +2402,8 @@ public class MutationState implements SQLCloseable {
     }
 
     public enum ReturnResult {
-        ROW
+        NEW_ROW_ON_SUCCESS,
+        OLD_ROW_ALWAYS
     }
 
 }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
index 6ef1d9e38a..9b078abc63 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
@@ -41,6 +41,7 @@ public final class PhoenixIndexBuilderHelper {
 
     public static final String RETURN_RESULT = "_RETURN_RESULT";
     public static final byte[] RETURN_RESULT_ROW = new byte[]{0};
+    public static final byte[] RETURN_RESULT_OLD_ROW = new byte[]{1};
 
     public static byte[] serializeOnDupKeyIgnore() {
         return ON_DUP_KEY_IGNORE_BYTES;
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index ecc961f0d2..4905a14153 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -36,7 +36,6 @@ import java.sql.RowId;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLXML;
-import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.ZoneOffset;
@@ -215,6 +214,28 @@ public class PhoenixPreparedStatement extends 
PhoenixStatement implements Phoeni
         }
     }
 
+    /**
+     * Executes the given SQL statement similar to JDBC API executeUpdate() 
but also returns the
+     * old row (before update) as Result object back to the client. This must 
be used with
+     * auto-commit Connection. This makes the operation atomic.
+     * Return the old row (state before update) regardless of whether the 
update is
+     * successful or not.
+     *
+     * @return The pair of int and ResultSet, where int represents value 1 for 
successful row update
+     * and 0 for non-successful row update, and ResultSet represents the old 
state of the row.
+     * @throws SQLException If the statement cannot be executed.
+     */
+    // Note: Do Not remove this, it is expected to be used by downstream 
applications
+    public Pair<Integer, ResultSet> executeAtomicUpdateReturnOldRow() throws 
SQLException {
+        if (!connection.getAutoCommit()) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED).build()
+                    .buildException();
+        }
+        preExecuteUpdate();
+        return executeMutation(statement, createAuditQueryLogger(statement, 
query),
+                MutationState.ReturnResult.OLD_ROW_ALWAYS);
+    }
+
     /**
      * Executes the given SQL statement similar to JDBC API executeUpdate() 
but also returns the
      * updated or non-updated row as Result object back to the client. This 
must be used with
@@ -226,6 +247,7 @@ public class PhoenixPreparedStatement extends 
PhoenixStatement implements Phoeni
      * and 0 for non-successful row update, and ResultSet represents the state 
of the row.
      * @throws SQLException If the statement cannot be executed.
      */
+    // Note: Do Not remove this, it is expected to be used by downstream 
applications
     public Pair<Integer, ResultSet> executeAtomicUpdateReturnRow() throws 
SQLException {
         if (!connection.getAutoCommit()) {
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED).build()
@@ -233,7 +255,7 @@ public class PhoenixPreparedStatement extends 
PhoenixStatement implements Phoeni
         }
         preExecuteUpdate();
         return executeMutation(statement, createAuditQueryLogger(statement, 
query),
-                MutationState.ReturnResult.ROW);
+                MutationState.ReturnResult.NEW_ROW_ON_SUCCESS);
     }
 
     public QueryPlan optimizeQuery() throws SQLException {
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 0dea1cef21..fa4cd22f5f 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -583,7 +583,7 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
     protected int executeMutation(final CompilableStatement stmt,
                                   final AuditQueryLogger queryLogger) throws 
SQLException {
         return executeMutation(stmt, true, queryLogger,
-                isResultSetExpected(stmt) ? ReturnResult.ROW : 
null).getFirst();
+                isResultSetExpected(stmt) ? ReturnResult.NEW_ROW_ON_SUCCESS : 
null).getFirst();
     }
 
     Pair<Integer, ResultSet> executeMutation(final CompilableStatement stmt,
@@ -626,7 +626,8 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
                                 isUpsert = stmt instanceof 
ExecutableUpsertStatement;
                                 isDelete = stmt instanceof 
ExecutableDeleteStatement;
                                 if (isDelete && connection.getAutoCommit() &&
-                                        returnResult == ReturnResult.ROW) {
+                                        (returnResult == 
ReturnResult.NEW_ROW_ON_SUCCESS ||
+                                                returnResult == 
ReturnResult.OLD_ROW_ALWAYS)) {
                                     // used only if single row deletion needs 
to atomically
                                     // return row that is deleted.
                                     plan = ((ExecutableDeleteStatement) 
stmt).compilePlan(
@@ -2489,6 +2490,7 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
      * update and 0 for non-successful row update, and ResultSet represents 
the state of the row.
      * @throws SQLException If the statement cannot be executed.
      */
+    // Note: Do Not remove this, it is expected to be used by downstream 
applications
     public Pair<Integer, ResultSet> executeAtomicUpdateReturnRow(String sql) 
throws SQLException {
         if (!connection.getAutoCommit()) {
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED).build()
@@ -2496,7 +2498,36 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
         }
         CompilableStatement stmt = preExecuteUpdate(sql);
         Pair<Integer, ResultSet> result =
-                executeMutation(stmt, createAuditQueryLogger(stmt, sql), 
ReturnResult.ROW);
+                executeMutation(stmt, createAuditQueryLogger(stmt, sql),
+                        ReturnResult.NEW_ROW_ON_SUCCESS);
+        flushIfNecessary();
+        return result;
+    }
+
+    /**
+     * Executes the given SQL statement similar to JDBC API executeUpdate() 
but also returns the
+     * old row (before update) as Result object back to the client. This must 
be used with
+     * auto-commit Connection. This makes the operation atomic.
+     * Return the old row (state before update) regardless of whether the 
update is
+     * successful or not.
+     *
+     * @param sql The SQL DML statement, UPSERT or DELETE for Phoenix.
+     * @return The pair of int and ResultSet, where int represents value 1 for 
successful row
+     * update and 0 for non-successful row update, and ResultSet represents 
the old state of the
+     * row.
+     * @throws SQLException If the statement cannot be executed.
+     */
+    // Note: Do Not remove this, it is expected to be used by downstream 
applications
+    public Pair<Integer, ResultSet> executeAtomicUpdateReturnOldRow(String sql)
+            throws SQLException {
+        if (!connection.getAutoCommit()) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED).build()
+                    .buildException();
+        }
+        CompilableStatement stmt = preExecuteUpdate(sql);
+        Pair<Integer, ResultSet> result =
+                executeMutation(stmt, createAuditQueryLogger(stmt, sql),
+                        ReturnResult.OLD_ROW_ALWAYS);
         flushIfNecessary();
         return result;
     }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index fed0bb274d..ab1c7ad95e 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -278,6 +278,9 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         // true, will be null if there is no expression on this column, 
otherwise false
         // This is only initialized for single row atomic mutation.
         private Map<ColumnReference, Pair<Cell, Boolean>> 
currColumnCellExprMap;
+        // store old row cells into a map for OLD_ROW return result. This 
preserves the original
+        // state of the row before any conditional updates are applied.
+        private Map<ColumnReference, Pair<Cell, Boolean>> 
oldRowColumnCellExprMap;
         // list containing the original mutations from the 
MiniBatchOperationInProgress. Contains
         // any annotations we were sent by the client, and can be used in 
hooks that don't get
         // passed MiniBatchOperationInProgress, like preWALAppend()
@@ -290,6 +293,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         private boolean hasLocalIndex;
         private boolean hasTransform;
         private boolean returnResult;
+        private boolean returnOldRow;
         private boolean hasConditionalTTL; // table has Conditional TTL
 
         public BatchMutateContext() {
@@ -1316,6 +1320,11 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
             Mutation m = miniBatchOp.getOperation(i);
             if (this.builder.returnResult(m) && miniBatchOp.size() == 1) {
                 context.returnResult = true;
+                byte[] returnResult = 
m.getAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT);
+                if (returnResult != null && Arrays.equals(returnResult,
+                        PhoenixIndexBuilderHelper.RETURN_RESULT_OLD_ROW)) {
+                    context.returnOldRow = true;
+                }
             }
             if (this.builder.hasConditionalTTL(m)) {
                 context.hasConditionalTTL = true;
@@ -1656,7 +1665,13 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
                       List<Cell> cells = new ArrayList<>();
                       cells.add(cell);
 
-                      addCellsIfResultReturned(miniBatchOp, context, cells);
+                      if (!context.returnOldRow) {
+                          addCellsIfResultReturned(miniBatchOp, 
context.returnResult, cells,
+                                  context.currColumnCellExprMap, false);
+                      } else {
+                          addCellsIfResultReturned(miniBatchOp, 
context.returnResult, cells,
+                                  context.oldRowColumnCellExprMap, true);
+                      }
 
                       Result result = Result.create(cells);
                       miniBatchOp.setOperationStatus(0,
@@ -1682,28 +1697,32 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   }
 
     /**
-     * If the result needs to be returned for the given update operation, 
identify the updated row
-     * cells and add the input list of cells.
+     * If the result needs to be returned for the given update operation, 
identify the appropriate
+     * row cells and add them to the input list of cells. The method can 
return either the updated
+     * row cells (for ROW return type) or the original row cells (for OLD_ROW 
return type).
      *
      * @param miniBatchOp Batch of mutations getting applied to region.
-     * @param context The BatchMutateContext object shared during coproc hooks 
execution as part of
-     * the batch mutate life cycle.
+     * @param returnResult Whether the result should be returned to the client.
      * @param cells The list of cells to be returned back to the client.
+     * @param currColumnCellExprMap The map containing column reference to 
cell mappings. This
+     * can be either the current/updated state (for ROW) or the original state 
(for OLD_ROW)
+     * depending on the return type requested.
      */
     private static void 
addCellsIfResultReturned(MiniBatchOperationInProgress<Mutation> miniBatchOp,
-                                                 BatchMutateContext context, 
List<Cell> cells) {
-        if (context.returnResult) {
-            Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap =
-                    context.currColumnCellExprMap;
+                                                 boolean returnResult, 
List<Cell> cells,
+                                                 Map<ColumnReference, 
Pair<Cell, Boolean>>
+                                                         currColumnCellExprMap,
+                                                 boolean retainOldRow) {
+        if (returnResult) {
             if (currColumnCellExprMap == null) {
                 return;
             }
             Mutation mutation = miniBatchOp.getOperation(0);
-            if (mutation instanceof Put) {
+            if (mutation instanceof Put && !retainOldRow) {
                 updateColumnCellExprMap(mutation, currColumnCellExprMap);
             }
             Mutation[] mutations = 
miniBatchOp.getOperationsFromCoprocessors(0);
-            if (mutations != null) {
+            if (mutations != null && !retainOldRow) {
                 for (Mutation m : mutations) {
                     updateColumnCellExprMap(m, currColumnCellExprMap);
                 }
@@ -1892,6 +1911,13 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
       Put currentDataRowState = dataRowState != null ? dataRowState.getFirst() 
: null;
 
+        // Create separate map for old row data when OLD_ROW is requested
+        // This must be done before any conditional update logic to preserve 
original state
+        if (context.returnResult && context.returnOldRow && 
currentDataRowState != null) {
+            context.oldRowColumnCellExprMap = new HashMap<>();
+            updateCurrColumnCellExpr(currentDataRowState, 
context.oldRowColumnCellExprMap);
+        }
+
         // if result needs to be returned but the DML does not have ON 
DUPLICATE KEY present,
         // perform the mutation and return the result.
         if (opBytes == null) {
@@ -2074,6 +2100,9 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     private static void updateCurrColumnCellExpr(Put put,
                                                  Map<ColumnReference, 
Pair<Cell, Boolean>>
                                                          
currColumnCellExprMap) {
+        if (put == null) {
+            return;
+        }
         for (Map.Entry<byte[], List<Cell>> entry :
                 put.getFamilyCellMap().entrySet()) {
             for (Cell cell : entry.getValue()) {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 473afbec2c..7d1219c57e 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -22,6 +22,7 @@ import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -278,7 +279,10 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder 
{
 
     @Override
     public boolean returnResult(Mutation m) {
-        return m.getAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT) != null;
+        byte[] returnResult = 
m.getAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT);
+        return returnResult != null && (Arrays.equals(returnResult,
+                PhoenixIndexBuilderHelper.RETURN_RESULT_ROW) || 
Arrays.equals(returnResult,
+                PhoenixIndexBuilderHelper.RETURN_RESULT_OLD_ROW));
     }
 
     @Override
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
index 94f4fd1007..d6bfee7db2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
@@ -34,6 +34,7 @@ import java.util.Properties;
 
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.schema.types.PDouble;
 import org.bson.BsonArray;
 import org.bson.BsonBinary;
@@ -51,7 +52,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.ExplainPlanAttributes;
-import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.util.PropertiesUtil;
 
@@ -59,6 +59,7 @@ import static 
org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -674,6 +675,185 @@ public class Bson4IT extends ParallelStatsDisabledIT {
     }
   }
 
+  @Test
+  public void testConditionalUpsertReturnOldRow() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    String tableName = generateUniqueName();
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      conn.setAutoCommit(true);
+      String ddl = "CREATE TABLE " + tableName
+              + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
+              + " CONSTRAINT pk PRIMARY KEY(PK1))";
+      conn.createStatement().execute(ddl);
+
+      String sample1 = getJsonString("json/sample_01.json");
+      String sample2 = getJsonString("json/sample_02.json");
+      String sample3 = getJsonString("json/sample_03.json");
+      BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+      BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+      BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3);
+
+      upsertRows(conn, tableName, bsonDocument1, bsonDocument2, bsonDocument3);
+      PreparedStatement stmt;
+
+      String conditionExpression =
+              "press = :press AND track[0].shot[2][0].city.standard[50] = 
:softly";
+
+      BsonDocument conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument()
+              .append(":press", new BsonString("beat"))
+              .append(":softly", new BsonString("softly")));
+
+      String query = "SELECT * FROM " + tableName +
+              " WHERE PK1 = 'pk0001' AND C1 = '0002' AND NOT 
BSON_CONDITION_EXPRESSION(COL, '"
+              + conditionDoc.toJson() + "')";
+      ResultSet rs = conn.createStatement().executeQuery(query);
+
+      assertTrue(rs.next());
+      assertEquals("pk0001", rs.getString(1));
+      assertEquals("0002", rs.getString(2));
+      BsonDocument document1 = (BsonDocument) rs.getObject(3);
+      assertEquals(bsonDocument1, document1);
+
+      assertFalse(rs.next());
+
+      conditionExpression =
+              "press = :press AND track[0].shot[2][0].city.standard[5] = 
:softly";
+
+      conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument()
+              .append(":press", new BsonString("beat"))
+              .append(":softly", new BsonString("softly")));
+
+      query = "SELECT * FROM " + tableName +
+              " WHERE PK1 = 'pk0001' AND C1 = '0002' AND 
BSON_CONDITION_EXPRESSION(COL, '"
+              + conditionDoc.toJson() + "')";
+      rs = conn.createStatement().executeQuery(query);
+
+      assertTrue(rs.next());
+      assertEquals("pk0001", rs.getString(1));
+      assertEquals("0002", rs.getString(2));
+      document1 = (BsonDocument) rs.getObject(3);
+      assertEquals(bsonDocument1, document1);
+
+      assertFalse(rs.next());
+
+      BsonDocument updateExp = new BsonDocument()
+              .append("$SET", new BsonDocument()
+                      .append("browserling",
+                              new 
BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095)))
+                      .append("track[0].shot[2][0].city.standard[5]", new 
BsonString("soft"))
+                      .append("track[0].shot[2][0].city.problem[2]",
+                              new 
BsonString("track[0].shot[2][0].city.problem[2] + 529.435")))
+              .append("$UNSET", new BsonDocument()
+                      .append("track[0].shot[2][0].city.flame", new 
BsonNull()));
+
+      stmt = conn.prepareStatement("UPSERT INTO " + tableName
+              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + 
"')"
+              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE 
COL END,"
+              + " C1 = ?");
+      stmt.setString(1, "pk0001");
+      stmt.setString(2, "0003");
+
+      assertReturnedOldRowResult(stmt, conn, tableName, "json/sample_01.json", 
true);
+
+      updateExp = new BsonDocument()
+              .append("$ADD", new BsonDocument()
+                      .append("new_samples",
+                              new BsonDocument().append("$set",
+                                      new BsonArray(Arrays.asList(
+                                              new 
BsonBinary(Bytes.toBytes("Sample10")),
+                                              new 
BsonBinary(Bytes.toBytes("Sample12")),
+                                              new 
BsonBinary(Bytes.toBytes("Sample13")),
+                                              new 
BsonBinary(Bytes.toBytes("Sample14"))
+                                      )))))
+              .append("$DELETE_FROM_SET", new BsonDocument()
+                      .append("new_samples",
+                              new BsonDocument().append("$set",
+                                      new BsonArray(Arrays.asList(
+                                              new 
BsonBinary(Bytes.toBytes("Sample02")),
+                                              new 
BsonBinary(Bytes.toBytes("Sample03"))
+                                      )))))
+              .append("$SET", new BsonDocument()
+                      .append("newrecord", ((BsonArray) 
(document1.get("track"))).get(0)))
+              .append("$UNSET", new BsonDocument()
+                      .append("rather[3].outline.halfway.so[2][2]", new 
BsonNull()));
+
+      conditionExpression =
+              "field_not_exists(newrecord) AND 
field_exists(rather[3].outline.halfway.so[2][2])";
+
+      conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument());
+
+      stmt = conn.prepareStatement("UPSERT INTO " + tableName
+              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + 
"')"
+              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE 
COL END");
+
+      stmt.setString(1, "pk1010");
+
+      assertReturnedOldRowResult(stmt, conn, tableName, "json/sample_02.json", 
true);
+
+      updateExp = new BsonDocument()
+              .append("$SET", new BsonDocument()
+                      .append("result[1].location.state", new 
BsonString("AK")))
+              .append("$UNSET", new BsonDocument()
+                      .append("result[4].emails[1]", new BsonNull()));
+
+      conditionExpression =
+              "result[2].location.coordinates.latitude > :latitude OR "
+                      + "(field_exists(result[1].location) AND 
result[1].location.state != :state" +
+                      " AND field_exists(result[4].emails[1]))";
+
+      conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument()
+              .append(":latitude", new BsonDouble(0))
+              .append(":state", new BsonString("AK")));
+
+      stmt = conn.prepareStatement("UPSERT INTO " + tableName
+              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + 
"')"
+              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE 
COL END");
+
+      stmt.setString(1, "pk1011");
+
+      assertReturnedOldRowResult(stmt, conn, tableName, "json/sample_03.json", 
true);
+
+      conditionExpression =
+              "press = :press AND track[0].shot[2][0].city.standard[5] = 
:softly";
+
+      conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument()
+              .append(":press", new BsonString("incorrect_value"))
+              .append(":softly", new BsonString("incorrect_value")));
+
+      updateExp = new BsonDocument()
+              .append("$SET", new BsonDocument()
+                      .append("new_field1",
+                              new 
BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095)))
+                      .append("track[0].shot[2][0].city.standard[5]", new 
BsonString(
+                              "soft_new_val"))
+                      .append("track[0].shot[2][0].city.problem[2]",
+                              new 
BsonString("track[0].shot[2][0].city.problem[2] + 123")));
+
+      stmt = conn.prepareStatement("UPSERT INTO " + tableName
+              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + 
"')"
+              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE 
COL END");
+      stmt.setString(1, "pk0001");
+
+      assertReturnedOldRowResult(stmt, conn, tableName, 
"json/sample_updated_01.json", false);
+
+      verifyRows(tableName, conn);
+    }
+  }
+
   @Test
   public void testBsonPk() throws Exception {
     Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -861,6 +1041,24 @@ public class Bson4IT extends ParallelStatsDisabledIT {
     assertEquals(RawBsonDocument.parse(getJsonString(jsonPath)), 
resultSet.getObject(3));
   }
 
+  private static void assertReturnedOldRowResult(PreparedStatement stmt,
+                                                 Connection conn,
+                                                 String tableName,
+                                                 String jsonPath,
+                                                 boolean success)
+          throws SQLException, IOException {
+    Pair<Integer, ResultSet> resultPair =
+            
stmt.unwrap(PhoenixPreparedStatement.class).executeAtomicUpdateReturnOldRow();
+    assertEquals(success ? 1 : 0, (int) resultPair.getFirst());
+    ResultSet resultSet = resultPair.getSecond();
+    if (success) {
+      assertEquals(RawBsonDocument.parse(getJsonString(jsonPath)), 
resultSet.getObject(3));
+      assertFalse(resultSet.next());
+    } else {
+      assertEquals(RawBsonDocument.parse(getJsonString(jsonPath)), 
resultSet.getObject(3));
+    }
+  }
+
   private static void validateExplainPlan(PreparedStatement ps, String 
tableName, String scanType)
       throws SQLException {
     ExplainPlan plan = 
ps.unwrap(PhoenixPreparedStatement.class).optimizeQuery().getExplainPlan();
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
index d29fe6b4b9..de07c7a955 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
@@ -52,6 +52,7 @@ import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.ExplainPlanAttributes;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
@@ -507,6 +508,164 @@ public class OnDuplicateKey2IT extends 
ParallelStatsDisabledIT {
         }
     }
 
+    /**
+     * Validates that the returned row contains the original state before the 
upsert operation.
+     * This method uses executeAtomicUpdateReturnOldRow() to test the OLD_ROW 
functionality.
+     */
+    private static void validateReturnedRowBeforeUpsert(Connection conn,
+                                                       String upsertSql,
+                                                       String tableName,
+                                                       Double col1,
+                                                       String col2,
+                                                       boolean success,
+                                                       BsonDocument inputDoc,
+                                                       BsonDocument 
expectedDoc,
+                                                       Integer col4)
+            throws SQLException {
+        int updateCount;
+        ResultSet resultSet;
+        if (inputDoc != null) {
+            PreparedStatement ps = conn.prepareStatement(upsertSql);
+            ps.setObject(1, inputDoc);
+            Pair<Integer, ResultSet> resultPair =
+                    
ps.unwrap(PhoenixPreparedStatement.class).executeAtomicUpdateReturnOldRow();
+            updateCount = resultPair.getFirst();
+            resultSet = resultPair.getSecond();
+        } else {
+            Statement stmt = conn.createStatement();
+            Pair<Integer, ResultSet> resultPair =
+                    
stmt.unwrap(PhoenixStatement.class).executeAtomicUpdateReturnOldRow(upsertSql);
+            updateCount = resultPair.getFirst();
+            resultSet = resultPair.getSecond();
+        }
+        boolean isOnDuplicateKey = upsertSql.toUpperCase().contains("ON 
DUPLICATE KEY");
+        if (conn.getAutoCommit() && isOnDuplicateKey) {
+            assertEquals(success ? 1 : 0, updateCount);
+            if (resultSet != null) {
+                assertEquals("pk000", resultSet.getString(1));
+                assertEquals(-123.98, resultSet.getDouble(2), 0.0);
+                assertEquals("pk003", resultSet.getString(3));
+                assertEquals(col1, resultSet.getDouble(4), 0.0);
+                validateReturnedRowResult(col2, expectedDoc, col4, resultSet);
+                assertFalse(resultSet.next());
+            }
+        }
+    }
+
+    @Test
+    public void testReturnOldRowResult1() throws Exception {
+        Assume.assumeTrue("Set correct result to RegionActionResult on hbase 
versions " +
+                "2.4.18+, 2.5.9+, and 2.6.0+", 
isSetCorrectResultEnabledOnHBase());
+        // NOTE - Tuple result projection does not work well with local index 
because
+        // this assertion can be false: CellUtil.matchingRows(kvs[0], 
kvs[kvs.length-1])
+        // as the Tuple contains different rowkeys.
+        Assume.assumeTrue("ResultSet return does not work with local index",
+            !indexDDL.startsWith("create local index"));
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String sample1 = getJsonString("json/sample_01.json");
+        String sample2 = getJsonString("json/sample_02.json");
+        BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+        BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            String tableName = generateUniqueName();
+            String ddl = "CREATE TABLE " + tableName
+                    + "(PK1 VARCHAR, PK2 DOUBLE NOT NULL, PK3 VARCHAR, 
COUNTER1 DOUBLE,"
+                    + " COUNTER2 VARCHAR,"
+                    + " COL3 BSON, COL4 INTEGER, CONSTRAINT pk PRIMARY 
KEY(PK1, PK2, PK3))";
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+
+            validateAtomicUpsertReturnOldRow(tableName, conn, bsonDocument1, 
bsonDocument2);
+
+            verifyIndexRow(conn, tableName, false);
+        }
+    }
+
+    private static void validateAtomicUpsertReturnOldRow(String tableName, 
Connection conn,
+                                                         BsonDocument 
bsonDocument1,
+                                                         BsonDocument 
bsonDocument2)
+            throws SQLException {
+        String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3, 
COUNTER1, COL3, COL4)"
+                + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON 
DUPLICATE KEY " +
+                "IGNORE";
+        validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 0.0, null, 
true,
+                bsonDocument1, null, null);
+
+        upsertSql =
+                "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1) "
+                        + "VALUES('pk000', -123.98, 'pk003', 0) ON DUPLICATE 
KEY IGNORE";
+        validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 1011.202, 
null, false,
+                null, bsonDocument1, 123);
+
+        upsertSql =
+                "UPSERT INTO " + tableName
+                        + " (PK1, PK2, PK3, COUNTER1, COUNTER2) 
VALUES('pk000', -123.98, "
+                        + "'pk003', 234, 'col2_000')";
+        validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 1011.202, 
null, true,
+                null, bsonDocument1, 123);
+
+        upsertSql = "UPSERT INTO " + tableName
+                + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON 
DUPLICATE KEY UPDATE "
+                + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 + 
1999.99 ELSE COUNTER1"
+                + " END, "
+                + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' 
ELSE COUNTER2 "
+                + "END, "
+                + "COL3 = ?, "
+                + "COL4 = 234";
+        validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 234d, 
"col2_000", true,
+                bsonDocument2, bsonDocument1, 123);
+
+        upsertSql = "UPSERT INTO " + tableName
+                + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON 
DUPLICATE KEY UPDATE "
+                + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 + 
1999.99 ELSE COUNTER1"
+                + " END,"
+                + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' 
ELSE COUNTER2 "
+                + "END";
+        validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 2233.99, 
"col2_001", false
+                , null, bsonDocument2, 234);
+    }
+
+    @Test
+    public void testReturnOldRowResult2() throws Exception {
+        Assume.assumeTrue("Set correct result to RegionActionResult on hbase 
versions " +
+                "2.4.18+, 2.5.9+, and 2.6.0+", 
isSetCorrectResultEnabledOnHBase());
+        // NOTE - Tuple result projection does not work well with local index 
because
+        // this assertion can be false: CellUtil.matchingRows(kvs[0], 
kvs[kvs.length-1])
+        // as the Tuple contains different rowkeys.
+        Assume.assumeTrue("ResultSet return does not work with local index",
+            !indexDDL.startsWith("create local index"));
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String sample1 = getJsonString("json/sample_01.json");
+        String sample2 = getJsonString("json/sample_02.json");
+        BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+        BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            String tableName = generateUniqueName();
+            String ddl = "CREATE TABLE " + tableName
+                    + "(PK1 VARCHAR, PK2 DOUBLE NOT NULL, PK3 VARCHAR, 
COUNTER1 DOUBLE,"
+                    + " COUNTER2 VARCHAR,"
+                    + " COL3 BSON, COL4 INTEGER, CONSTRAINT pk PRIMARY 
KEY(PK1, PK2, PK3))";
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+
+            String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3, 
COUNTER1, COL3, COL4)"
+                    + " VALUES('pk000', -123.98, 'pk003', 999.999, ?, 999) ON 
DUPLICATE KEY " +
+                    "IGNORE";
+            validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 0.0, 
null, true,
+                    bsonDocument1, null, null);
+
+            upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3, 
COUNTER1, COL3, COL4)"
+                    + " VALUES('pk000', -123.98, 'pk003', 888.888, ?, 888) ON 
DUPLICATE KEY " +
+                    "IGNORE";
+            validateReturnedRowBeforeUpsert(conn, upsertSql, tableName, 
999.999, null, false,
+                    bsonDocument2, bsonDocument1, 999);
+        }
+    }
+
     @Test
     public void testReturnRowResultForMultiPointLookup() throws Exception {
         Assume.assumeTrue("Set correct result to RegionActionResult on hbase 
versions " +


Reply via email to