This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.2 by this push:
     new 2bf3e41e25 PHOENIX-6714 Return update status from Conditional Upserts 
(#1922)
2bf3e41e25 is described below

commit 2bf3e41e25327db19715972751bff190a6adb67c
Author: Jing Yu <y...@salesforce.com>
AuthorDate: Tue Jul 2 13:51:24 2024 -0700

    PHOENIX-6714 Return update status from Conditional Upserts (#1922)
---
 .../BaseScannerRegionObserverConstants.java        |   2 +
 .../org/apache/phoenix/execute/MutationState.java  |  34 +-
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |  11 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   | 189 ++++--
 .../apache/phoenix/end2end/OnDuplicateKey2IT.java  | 646 +++++++++++++++++++++
 .../apache/phoenix/end2end/OnDuplicateKeyIT.java   |  27 +-
 .../index/ReplicationWithWALAnnotationIT.java      |   2 +-
 7 files changed, 864 insertions(+), 47 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
index f0fac9c278..1f475088dc 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
@@ -62,6 +62,8 @@ public class BaseScannerRegionObserverConstants {
     public static final String UPSERT_SELECT_EXPRS = "_UpsertSelectExprs";
     public static final String DELETE_CQ = "_DeleteCQ";
     public static final String DELETE_CF = "_DeleteCF";
+    public static final String UPSERT_STATUS_CQ = "_UpsertStatusCQ";
+    public static final String UPSERT_CF = "_UpsertCF";
     public static final String EMPTY_CF = "_EmptyCF";
     public static final String EMPTY_COLUMN_QUALIFIER = 
"_EmptyColumnQualifier";
     public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex";
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index 31e4d0bac0..d6c118c4bb 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.execute;
 
+import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.UPSERT_CF;
+import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.UPSERT_STATUS_CQ;
 import static 
org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
 import static 
org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
 import static 
org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_FAILURE_SQL_COUNTER;
@@ -53,11 +55,13 @@ import java.util.Set;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -102,9 +106,11 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableRef;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
@@ -157,6 +163,7 @@ public class MutationState implements SQLCloseable {
 
     private long sizeOffset;
     private int numRows = 0;
+    private int numUpdatedRowsForAutoCommit = 0;
     private long estimatedSize = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
@@ -458,6 +465,10 @@ public class MutationState implements SQLCloseable {
         return sizeOffset + numRows;
     }
 
+    public int getNumUpdatedRowsForAutoCommit() {
+        return numUpdatedRowsForAutoCommit;
+    }
+
     public int getNumRows() {
         return numRows;
     }
@@ -1342,6 +1353,7 @@ public class MutationState implements SQLCloseable {
                 Table hTable = 
connection.getQueryServices().getTable(htableName);
                 List<Mutation> currentMutationBatch = null;
                 boolean areAllBatchesSuccessful = false;
+                Object[] resultObjects = null;
 
                 try {
                     if (table.isTransactional()) {
@@ -1366,17 +1378,21 @@ public class MutationState implements SQLCloseable {
                     while (itrListMutation.hasNext()) {
                         final List<Mutation> mutationBatch = 
itrListMutation.next();
                         currentMutationBatch = mutationBatch;
+                        if (connection.getAutoCommit() && mutationBatch.size() 
== 1) {
+                            resultObjects = new Object[mutationBatch.size()];
+                        }
                         if (shouldRetryIndexedMutation) {
                             // if there was an index write failure, retry the 
mutation in a loop
                             final Table finalHTable = hTable;
                             final ImmutableBytesWritable finalindexMetaDataPtr 
=
                                     indexMetaDataPtr;
                             final PTable finalPTable = table;
+                            final Object[] finalResultObjects = resultObjects;
                             
PhoenixIndexFailurePolicyHelper.doBatchWithRetries(new MutateCommand() {
                                 @Override
                                 public void doMutation() throws IOException {
                                     try {
-                                        finalHTable.batch(mutationBatch, null);
+                                        finalHTable.batch(mutationBatch, 
finalResultObjects);
                                     } catch (InterruptedException e) {
                                         Thread.currentThread().interrupt();
                                         throw new IOException(e);
@@ -1415,8 +1431,22 @@ public class MutationState implements SQLCloseable {
                             }, iwe, connection, 
connection.getQueryServices().getProps());
                             shouldRetryIndexedMutation = false;
                         } else {
-                            hTable.batch(mutationBatch, null);
+                            hTable.batch(mutationBatch, resultObjects);
+                        }
+
+                        if (resultObjects != null) {
+                            Result result = (Result) resultObjects[0];
+                            if (result != null && !result.isEmpty()) {
+                                Cell cell = result.getColumnLatestCell(
+                                        Bytes.toBytes(UPSERT_CF), 
Bytes.toBytes(UPSERT_STATUS_CQ));
+                                numUpdatedRowsForAutoCommit = 
PInteger.INSTANCE.getCodec()
+                                        .decodeInt(cell.getValueArray(), 
cell.getValueOffset(),
+                                                SortOrder.getDefault());
+                            } else {
+                                numUpdatedRowsForAutoCommit = 1;
+                            }
                         }
+
                         // remove each batch from the list once it gets applied
                         // so when failures happens for any batch we only start
                         // from that batch only instead of doing duplicate 
reply of already
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 2b47e7d50b..6d9ab7fe13 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -568,13 +568,18 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
                                 checkIfDDLStatementandMutationState(stmt, 
state);
                                 MutationState lastState = plan.execute();
                                 state.join(lastState);
+                                // Unfortunately, JDBC uses an int for update 
count, so we
+                                // just max out at Integer.MAX_VALUE
+                                int lastUpdateCount = (int) 
Math.min(Integer.MAX_VALUE,
+                                        lastState.getUpdateCount());
                                 if (connection.getAutoCommit()) {
                                     connection.commit();
+                                    if (isAtomicUpsert) {
+                                        lastUpdateCount = 
connection.getMutationState()
+                                                
.getNumUpdatedRowsForAutoCommit();
+                                    }
                                 }
                                 setLastQueryPlan(null);
-                                // Unfortunately, JDBC uses an int for update 
count, so we
-                                // just max out at Integer.MAX_VALUE
-                                int lastUpdateCount = (int) 
Math.min(Integer.MAX_VALUE, lastState.getUpdateCount());
                                 setLastUpdateCount(lastUpdateCount);
                                 setLastUpdateOperation(stmt.getOperation());
                                 setLastUpdateTable(tableName == null ? 
TABLE_UNKNOWN : tableName);
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index d34efb2673..aa0edd4720 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -18,6 +18,9 @@
 package org.apache.phoenix.hbase.index;
 
 
+import static org.apache.hadoop.hbase.HConstants.OperationStatusCode.SUCCESS;
+import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.UPSERT_CF;
+import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.UPSERT_STATUS_CQ;
 import static 
org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
 
 import java.io.ByteArrayInputStream;
@@ -40,7 +43,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import 
org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
@@ -53,7 +58,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -120,8 +124,10 @@ import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ClientUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerIndexUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -135,6 +141,7 @@ import java.util.concurrent.TimeUnit;
 import static 
org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
 import static 
org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn;
 import static 
org.apache.phoenix.index.PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
 /**
  * Do all the work of managing index updates from a single coprocessor. All 
Puts/Delets are passed
@@ -147,8 +154,8 @@ import static 
org.apache.phoenix.index.PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRI
 public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IndexRegionObserver.class);
-    private static final OperationStatus IGNORE = new 
OperationStatus(OperationStatusCode.SUCCESS);
-    private static final OperationStatus NOWRITE = new 
OperationStatus(OperationStatusCode.SUCCESS);
+    private static final OperationStatus IGNORE = new OperationStatus(SUCCESS);
+    private static final OperationStatus NOWRITE = new 
OperationStatus(SUCCESS);
     public static final String PHOENIX_APPEND_METADATA_TO_WAL = 
"phoenix.append.metadata.to.wal";
     public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false;
     /**
@@ -513,7 +520,6 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
           Mutation m = miniBatchOp.getOperation(i);
           if (this.builder.isAtomicOp(m)) {
               miniBatchOp.setOperationStatus(i, IGNORE);
-              continue;
           }
       }
   }
@@ -521,9 +527,6 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> 
miniBatchOp,
           BatchMutateContext context) {
       for (int i = 0; i < miniBatchOp.size(); i++) {
-          if (miniBatchOp.getOperationStatus(i) == IGNORE) {
-              continue;
-          }
           Mutation m = miniBatchOp.getOperation(i);
           if (this.builder.isAtomicOp(m) || this.builder.isEnabled(m)) {
               ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
@@ -573,8 +576,19 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
               if (!mutations.isEmpty()) {
                   addOnDupMutationsToBatch(miniBatchOp, i, mutations);
               } else {
-                  // empty list of generated mutations implies ON DUPLICATE 
KEY IGNORE
-                  miniBatchOp.setOperationStatus(i, IGNORE);
+                  // an empty list of generated mutations implies either
+                  // 1) ON DUPLICATE KEY IGNORE and the row already exists, OR
+                  // 2) ON DUPLICATE KEY UPDATE where every CASE expression took its
+                  // ELSE-clause and the new value equals the old value (the empty
+                  // cell timestamp will NOT be updated)
+                  byte[] retVal = PInteger.INSTANCE.toBytes(0);
+                  Cell cell = PhoenixKeyValueUtil.newKeyValue(m.getRow(), 
Bytes.toBytes(UPSERT_CF),
+                          Bytes.toBytes(UPSERT_STATUS_CQ), 0, retVal, 0, 
retVal.length);
+                  // put Result in OperationStatus for returning update status 
from conditional
+                  // upserts, where 0 represents the row is not updated
+                  Result result = Result.create(new 
ArrayList<>(Arrays.asList(cell)));
+                  miniBatchOp.setOperationStatus(i,
+                          new OperationStatus(SUCCESS, result));
               }
           }
       }
@@ -602,7 +616,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
             // unfortunately, we really should ask if the raw mutation (rather 
than the combined mutation)
             // should be indexed, which means we need to expose another method 
on the builder. Such is the
             // way optimization go though.
-            if (miniBatchOp.getOperationStatus(i) != IGNORE && 
this.builder.isEnabled(m)) {
+            if (!isAtomicOperationComplete(miniBatchOp.getOperationStatus(i)) 
&& this.builder.isEnabled(m)) {
                 ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
                 MultiMutation stored = context.multiMutationMap.get(row);
                 if (stored == null) {
@@ -625,7 +639,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     public static void setTimestamps(MiniBatchOperationInProgress<Mutation> 
miniBatchOp,
                                      IndexBuildManager builder, long ts) 
throws IOException {
         for (Integer i = 0; i < miniBatchOp.size(); i++) {
-            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+            if (isAtomicOperationComplete(miniBatchOp.getOperationStatus(i))) {
                 continue;
             }
             Mutation m = miniBatchOp.getOperation(i);
@@ -725,7 +739,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     private void 
applyPendingPutMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
                                           BatchMutateContext context, long 
now) throws IOException {
         for (Integer i = 0; i < miniBatchOp.size(); i++) {
-            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+            if (isAtomicOperationComplete(miniBatchOp.getOperationStatus(i))) {
                 continue;
             }
             Mutation m = miniBatchOp.getOperation(i);
@@ -824,7 +838,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
             }
         }
         for (int i = 0; i < miniBatchOp.size(); i++) {
-            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+            if (isAtomicOperationComplete(miniBatchOp.getOperationStatus(i))) {
                 continue;
             }
             Mutation m = miniBatchOp.getOperation(i);
@@ -1113,9 +1127,6 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     private void identifyMutationTypes(MiniBatchOperationInProgress<Mutation> 
miniBatchOp,
                                               BatchMutateContext context) {
         for (int i = 0; i < miniBatchOp.size(); i++) {
-            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
-                continue;
-            }
             Mutation m = miniBatchOp.getOperation(i);
             if (this.builder.isAtomicOp(m)) {
                 context.hasAtomic = true;
@@ -1286,8 +1297,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     private void 
releaseLocksForOnDupIgnoreMutations(MiniBatchOperationInProgress<Mutation> 
miniBatchOp,
                                                      BatchMutateContext 
context) {
         for (int i = 0; i < miniBatchOp.size(); i++) {
-            // status of all ON DUPLICATE KEY IGNORE mutations is updated to 
IGNORE
-            if (miniBatchOp.getOperationStatus(i) != IGNORE) {
+            if (!isAtomicOperationComplete(miniBatchOp.getOperationStatus(i))) 
{
                 continue;
             }
             Mutation m = miniBatchOp.getOperation(i);
@@ -1372,6 +1382,17 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       try {
           if (success) {
               context.currentPhase = BatchMutatePhase.POST;
+              if(context.hasAtomic && miniBatchOp.size() == 1) {
+                  if 
(!isAtomicOperationComplete(miniBatchOp.getOperationStatus(0))) {
+                      byte[] retVal = PInteger.INSTANCE.toBytes(1);
+                      Cell cell = PhoenixKeyValueUtil.newKeyValue(
+                              miniBatchOp.getOperation(0).getRow(), 
Bytes.toBytes(UPSERT_CF),
+                              Bytes.toBytes(UPSERT_STATUS_CQ), 0, retVal, 0, 
retVal.length);
+                      Result result = Result.create(new 
ArrayList<>(Arrays.asList(cell)));
+                      miniBatchOp.setOperationStatus(0,
+                              new OperationStatus(SUCCESS, result));
+                  }
+              }
           } else {
               context.currentPhase = BatchMutatePhase.FAILED;
           }
@@ -1500,9 +1521,12 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
      * to correctly support concurrent index mutations we need to always read 
the latest
      * data table row from memory.
      * It takes in an atomic Put mutation and generates a list of Put and 
Delete mutations.
-     * The list will be empty in the case of ON DUPLICATE KEY IGNORE and the 
row already exists.
-     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put 
mutation and optionally
-     * one Delete mutation (with DeleteColumn type cells for all columns set 
to null).
+     * The mutation list will be empty in two cases:
+     * 1) ON DUPLICATE KEY IGNORE and the row already exists;
+     * 2) ON DUPLICATE KEY UPDATE where every CASE expression takes its ELSE-clause
+     * and the new value is the same as the old value.
+     * Otherwise, we will generate one Put mutation and optionally one Delete mutation
+     * (with DeleteColumn type cells for all columns set to null).
      */
   private List<Mutation> generateOnDupMutations(BatchMutateContext context, 
Put atomicPut) throws IOException {
       List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
@@ -1549,6 +1573,12 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       // read the column values requested in the get from the current data row
       List<Cell> cells = IndexUtil.readColumnsFromRow(currentDataRowState, 
colsReadInExpr);
 
+      // Map each current cell by its ColumnReference (column family + qualifier) to a
+      // pair of the cell and a Boolean flag. The flag is true if the column's expression
+      // is a CaseExpression that evaluated to its ELSE-clause, false for any other
+      // expression, and null if there is no expression on this column.
+      Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap = new 
HashMap<>();
+
       if (currentDataRowState == null) { // row doesn't exist
           if (skipFirstOp) {
               if (operations.size() <= 1 && repeat <= 1) {
@@ -1567,6 +1597,16 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       } else {
           // Base current state off of existing row
           flattenedCells = cells;
+          // store all current cells from currentDataRowState
+          for (Map.Entry<byte[], List<Cell>> entry :
+                  currentDataRowState.getFamilyCellMap().entrySet()) {
+              for (Cell cell : new ArrayList<>(entry.getValue())) {
+                  byte[] family = CellUtil.cloneFamily(cell);
+                  byte[] qualifier = CellUtil.cloneQualifier(cell);
+                  ColumnReference colRef = new ColumnReference(family, 
qualifier);
+                  currColumnCellExprMap.put(colRef, new Pair<>(cell, null));
+              }
+          }
       }
 
       MultiKeyValueTuple tuple = new MultiKeyValueTuple(flattenedCells);
@@ -1588,7 +1628,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
               int adjust = table.getBucketNum() == null ? 1 : 2;
               for (int i = 0; i < expressions.size(); i++) {
                   Expression expression = expressions.get(i);
-                  ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                  ptr.set(EMPTY_BYTE_ARRAY);
                   expression.evaluate(tuple, ptr);
                   PColumn column = table.getColumns().get(i + adjust);
                   Object value = expression.getDataType().toObject(ptr, 
column.getSortOrder());
@@ -1604,6 +1644,22 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
                       column.getSortOrder(), table.rowKeyOrderOptimizable());
                   byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
                   row.setValue(column, bytes);
+
+                  // If the column exists in currColumnCellExprMap, set the boolean value in
+                  // the map to true if the expression is a CaseExpression that evaluated to
+                  // its ELSE-clause, and to false otherwise
+                  ColumnReference colRef = new 
ColumnReference(column.getFamilyName().getBytes(),
+                          column.getColumnQualifierBytes());
+                  if (currColumnCellExprMap.containsKey(colRef)) {
+                      Pair<Cell, Boolean> valuePair = 
currColumnCellExprMap.get(colRef);
+                      if (expression instanceof CaseExpression
+                              && ((CaseExpression) 
expression).evaluateIndexOf(tuple, ptr)
+                              == expression.getChildren().size() - 1) {
+                          valuePair.setSecond(true);
+                      } else {
+                          valuePair.setSecond(false);
+                      }
+                  }
               }
               List<Cell> updatedCells = 
Lists.newArrayListWithExpectedSize(estimatedSize);
               List<Mutation> newMutations = row.toRowMutations();
@@ -1612,34 +1668,52 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
               }
               // update the cells to the latest values calculated above
               flattenedCells = mergeCells(flattenedCells, updatedCells);
+              // we need to retrieve empty cell later on which relies on 
binary search
+              flattenedCells.sort(CellComparator.getInstance());
               tuple.setKeyValues(flattenedCells);
           }
           // Repeat only applies to first statement
           repeat = 1;
       }
 
+      put = new Put(rowKey);
+      delete = new Delete(rowKey);
+      transferAttributes(atomicPut, put);
+      transferAttributes(atomicPut, delete);
       for (int i = 0; i < tuple.size(); i++) {
           Cell cell = tuple.getValue(i);
           if (cell.getType() == Cell.Type.Put) {
-              if (put == null) {
-                  put = new Put(rowKey);
-                  transferAttributes(atomicPut, put);
-                  mutations.add(put);
+              if (checkCellNeedUpdate(cell, currColumnCellExprMap)) {
+                  put.add(cell);
               }
-              put.add(cell);
           } else {
-              if (delete == null) {
-                  delete = new Delete(rowKey);
-                  transferAttributes(atomicPut, delete);
-                  mutations.add(delete);
-              }
               delete.add(cell);
           }
       }
+
+      if (!put.isEmpty() || !delete.isEmpty()) {
+          PTable table = operations.get(0).getFirst();
+          addEmptyKVCellToPut(put, tuple, table);
+      }
+
+      if (!put.isEmpty()) {
+          mutations.add(put);
+      }
+      if (!delete.isEmpty()) {
+          mutations.add(delete);
+      }
+
       return mutations;
   }
 
-
+    private void addEmptyKVCellToPut(Put put, MultiKeyValueTuple tuple, PTable 
table) throws IOException {
+        byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+        byte[] emptyCQ = 
EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+        Cell emptyKVCell = tuple.getValue(emptyCF, emptyCQ);
+        if (emptyKVCell != null) {
+            put.add(emptyKVCell);
+        }
+    }
 
     private static List<Cell> flattenCells(Mutation m) {
         List<Cell> flattenedCells = new ArrayList<>();
@@ -1653,6 +1727,44 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         }
     }
 
+    /**
+     * Checks whether a cell needs to be written, based on the current row's cells.
+     * A cell is skipped when its column already exists in the current row and
+     * either there is no expression on that column, or its CASE expression took the
+     * ELSE-clause and the new value equals the old value. Otherwise it is written.
+     *
+     * @param cell the cell with the new value to be checked
+     * @param colCellExprMap map from column reference to the current cell and its flag
+     * @return true if the cell needs to be updated, false otherwise
+     */
+    private boolean checkCellNeedUpdate(Cell cell,
+                                        Map<ColumnReference, Pair<Cell, 
Boolean>> colCellExprMap) {
+        byte[] family = CellUtil.cloneFamily(cell);
+        byte[] qualifier = CellUtil.cloneQualifier(cell);
+        ColumnReference colRef = new ColumnReference(family, qualifier);
+
+        // if cell not exist in the map, meaning that they are new and need 
update
+        if (colCellExprMap.isEmpty() || !colCellExprMap.containsKey(colRef)) {
+            return true;
+        }
+
+        Pair<Cell, Boolean> valuePair = colCellExprMap.get(colRef);
+        Boolean isInCaseExpressionElseClause = valuePair.getSecond();
+        if (isInCaseExpressionElseClause == null) {
+            return false;
+        }
+        if (!isInCaseExpressionElseClause) {
+            return true;
+        }
+        Cell oldCell = valuePair.getFirst();
+        ImmutableBytesPtr newValuePtr = new 
ImmutableBytesPtr(cell.getValueArray(),
+                cell.getValueOffset(), cell.getValueLength());
+        ImmutableBytesPtr oldValuePtr = new 
ImmutableBytesPtr(oldCell.getValueArray(),
+                oldCell.getValueOffset(), oldCell.getValueLength());
+        return !Bytes.equals(oldValuePtr.get(), oldValuePtr.getOffset(), 
oldValuePtr.getLength(),
+                newValuePtr.get(), newValuePtr.getOffset(), 
newValuePtr.getLength());
+    }
+
     /**
      * ensure that the generated mutations have all the attributes like schema
      */
@@ -1700,4 +1812,15 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     public static Map<String, byte[]> getAttributeValuesFromWALKey(WALKey key) 
{
         return new HashMap<String, byte[]>(key.getExtendedAttributes());
     }
+
+    /**
+     * Determines whether the atomic operation is complete based on the operation status.
+     * By default HBase returns a null Result for successful Put and Delete mutations;
+     * only Increment and Append mutations return a non-null Result by default.
+     * @param status the operation status.
+     * @return true if the atomic operation is completed, false otherwise.
+     */
+    public static boolean isAtomicOperationComplete(OperationStatus status) {
+        return status.getOperationStatusCode() == SUCCESS && 
status.getResult() != null;
+    }
 }
\ No newline at end of file
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
new file mode 100644
index 0000000000..f7d72b868e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class OnDuplicateKey2IT extends ParallelStatsDisabledIT {
+    private final String indexDDL;
+    private final String tableDDLOptions;
+
+    // Index DDL templates, one per parameterized run. Both %s placeholders are filled with
+    // the table name by createIndex(); the empty string means the run creates no index.
+    private static final String[] INDEX_DDLS =
+            new String[] {
+                    "",
+                    "create local index %s_IDX on %s(counter1) include 
(counter2)",
+                    "create local index %s_IDX on %s(counter1, counter2)",
+                    "create index %s_IDX on %s(counter1) include (counter2)",
+                    "create index %s_IDX on %s(counter1, counter2)",
+                    "create uncovered index %s_IDX on %s(counter1)",
+                    "create uncovered index %s_IDX on %s(counter1, counter2)"};
+
+    /**
+     * Stores the parameters of one parameterized run.
+     *
+     * @param indexDDL index DDL template for this run (empty string for no index)
+     * @param columnEncoded when false, tableDDLOptions is set to "COLUMN_ENCODED_BYTES=0";
+     *                      when true, tableDDLOptions is left empty
+     */
+    public OnDuplicateKey2IT(String indexDDL, boolean columnEncoded) {
+        this.indexDDL = indexDDL;
+        this.tableDDLOptions = columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0";
+    }
+
+    @Parameters(name="OnDuplicateKey2IT_{index},columnEncoded={1}")
+    public static synchronized Collection<Object> data() {
+        List<Object> testCases = Lists.newArrayList();
+        for (String indexDDL : INDEX_DDLS) {
+            for (boolean columnEncoded : new boolean[]{ false, true }) {
+                testCases.add(new Object[] { indexDDL, columnEncoded });
+            }
+        }
+        return testCases;
+    }
+
+    /**
+     * Creates the index selected for this parameterized run on the given table.
+     * Does nothing when the run has no index DDL (null or empty template).
+     *
+     * @param conn connection used to execute the index DDL
+     * @param tableName table name substituted into both placeholders of the DDL template
+     * @throws SQLException if executing the DDL fails
+     */
+    private void createIndex(Connection conn, String tableName) throws SQLException {
+        if (indexDDL == null || indexDDL.isEmpty()) {
+            return;
+        }
+        String ddl = String.format(indexDDL, tableName, tableName);
+        // Close the statement as soon as the DDL has run instead of leaving it open.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(ddl);
+        }
+    }
+
+    @Test
+    public void testIgnoreReturnValue() throws Exception {
+        Assume.assumeTrue("Set correct result to RegionActionResult on hbase 
versions " +
+                "2.4.18+, 2.5.9+, and 2.6.0+", 
isSetCorrectResultEnabledOnHBase());
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        String tableName = generateUniqueName();
+        String ddl = " create table " + tableName + "(pk varchar primary key, 
counter1 bigint, counter2 bigint)";
+        conn.createStatement().execute(ddl);
+        createIndex(conn, tableName);
+        conn.createStatement().execute("UPSERT INTO " + tableName + " 
VALUES('a',10)");
+
+        int actualReturnValue = conn.createStatement().executeUpdate(
+                "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY 
IGNORE");
+        assertEquals(0, actualReturnValue);
+
+        conn.close();
+    }
+
+    /**
+     * Runs one ON DUPLICATE KEY UPDATE statement that mixes CASE-based and unconditional
+     * assignments and checks the update count (1), the resulting column values, and which
+     * cell timestamps moved. In the timestamp lists index 0 is the empty cell and, per the
+     * assertions below, indexes 2 and 5 are counter2 and counter5: those two keep their
+     * timestamps, while indexes 0, 1, 3 and 4 must advance.
+     */
+    @Test
+    public void testColumnsTimestampUpdateWithAllCombinations() throws 
Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+
+            String ddl = "create table " + tableName + "(pk varchar primary 
key, " +
+                    "counter1 integer, counter2 integer, counter3 smallint, 
counter4 bigint, " +
+                    "counter5 varchar)" + tableDDLOptions;
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+            String dml = String.format("UPSERT INTO %s VALUES('abc', 0, 10, 
100, 1000, 'NONE')",
+                    tableName);
+            int actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            String dql = "SELECT * from " + tableName;
+            ResultSet rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+
+            List<Long> oldTimestamps = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+
+            dml = "UPSERT INTO " + tableName + " VALUES ('abc', 0, 10) ON 
DUPLICATE KEY UPDATE " +
+                    // conditional update with different value
+                    "counter1 = CASE WHEN counter1 < 1 THEN counter1 + 1 ELSE 
counter1 END, " +
+                    // conditional update with same value in ELSE clause (will 
not update timestamp)
+                    "counter2 = CASE WHEN counter2 < 10 THEN counter2 + 1 ELSE 
counter2 END, " +
+                    // intentional update with different value
+                    "counter3 = counter3 + 100, " +
+                    // intentional update with same value
+                    "counter4 = counter4";
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(10, rs.getInt("counter2"));
+            assertEquals(200, rs.getInt("counter3"));
+            assertEquals(1000, rs.getInt("counter4"));
+            assertEquals("NONE", rs.getString("counter5"));
+            assertFalse(rs.next());
+
+            List<Long> newTimestamps = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+
+            assertEquals(6, oldTimestamps.size());
+            assertEquals(6, newTimestamps.size());
+            assertEquals(oldTimestamps.get(2), newTimestamps.get(2)); // 
counter2 NOT updated
+            assertEquals(oldTimestamps.get(5), newTimestamps.get(5)); // 
counter5 NOT updated
+            assertTrue(oldTimestamps.get(0) < newTimestamps.get(0)
+                    && oldTimestamps.get(1) < newTimestamps.get(1)
+                    && oldTimestamps.get(3) < newTimestamps.get(3)
+                    && oldTimestamps.get(4) < newTimestamps.get(4)); // other 
columns updated
+        }
+    }
+
+    /**
+     * Exercises a single CASE-based assignment to counter1 through four consecutive
+     * ON DUPLICATE KEY UPDATE executions and checks, for each, the returned update count
+     * (1, 0, 1, 1) and whether the empty-cell (index 0) and counter1 (index 1) timestamps
+     * advanced. The test is skipped unless isSetCorrectResultEnabledOnHBase() returns true.
+     */
+    @Test
+    public void testColumnsTimestampUpdateWithOneConditionalUpdate() throws 
Exception {
+        Assume.assumeTrue("Set correct result to RegionActionResult on hbase 
versions " +
+                "2.4.18+, 2.5.9+, and 2.6.0+", 
isSetCorrectResultEnabledOnHBase());
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+
+            String ddl = "create table " + tableName +
+                    "(pk varchar primary key, counter1 bigint, counter2 
bigint)" + tableDDLOptions;
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+
+            String dml;
+            dml = String.format("UPSERT INTO %s VALUES('abc', 0, 100)", 
tableName);
+            int actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            String dql = "SELECT * from " + tableName;
+            ResultSet rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+
+            List<Long> timestampList0 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+
+            // Case 1: timestamps update with different value in 
WHEN-THEN-clause
+            dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES 
('abc', 0, 10) " +
+                            "ON DUPLICATE KEY UPDATE " +
+                            "counter1 = CASE WHEN counter1 < 1 THEN counter1 + 
1 ELSE counter1 END",
+                    tableName);
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(100, rs.getInt("counter2"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList1 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+            assertTrue(timestampList1.get(0) > timestampList0.get(0)
+                    && timestampList1.get(1) > timestampList0.get(1));
+
+            // Case 2: timestamps NOT update with same value in ELSE-clause
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(0, actualReturnValue);
+
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(100, rs.getInt("counter2"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList2 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+            assertEquals(timestampList1.get(0), timestampList2.get(0)); // 
empty column NOT updated
+            assertEquals(timestampList1.get(1), timestampList2.get(1)); // 
counter1 NOT updated
+
+            // Case 3: timestamps update with different value in ELSE-clause
+            dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES 
('abc', 0, 10) " +
+                            "ON DUPLICATE KEY UPDATE " +
+                            "counter1 = CASE WHEN counter1 < 1 THEN counter1 
ELSE counter1 + 1 END",
+                    tableName);
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(2, rs.getInt("counter1"));
+            assertEquals(100, rs.getInt("counter2"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList3 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+            assertTrue(timestampList3.get(0) > timestampList2.get(0)
+                    && timestampList3.get(1) > timestampList2.get(1));
+
+            // Case 4: timestamps update with same value in WHEN-THEN-clause
+            // NOTE(review): counter1 is 2 at this point, so "counter1 < 1" is false and this
+            // re-run takes the ELSE branch (counter1 + 1) again; counter1 is not asserted
+            // below. Confirm which path Case 4 is meant to cover.
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(100, rs.getInt("counter2"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList4 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+            assertTrue(timestampList4.get(0) > timestampList3.get(0)
+                    && timestampList4.get(1) > timestampList3.get(1));
+        }
+    }
+
+    /**
+     * Exercises a CASE expression that assigns literal values to counter1. When the
+     * WHEN-THEN branch writes the value already stored, the update count is 1 and the
+     * empty-cell and counter1 timestamps advance while counter2's stays put. When the ELSE
+     * branch yields the stored value, the update count is 0 and no timestamp changes.
+     * The test is skipped unless isSetCorrectResultEnabledOnHBase() returns true.
+     */
+    @Test
+    public void testColumnsTimestampUpdateWithOneConditionalValuesUpdate() 
throws Exception {
+        Assume.assumeTrue("Set correct result to RegionActionResult on hbase 
versions " +
+                "2.4.18+, 2.5.9+, and 2.6.0+", 
isSetCorrectResultEnabledOnHBase());
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+
+            String ddl = "create table " + tableName +
+                    "(pk varchar primary key, counter1 integer, counter2 
integer)" + tableDDLOptions;
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+
+            String dml = String.format("UPSERT INTO %s VALUES('abc', 1, 100)", 
tableName);
+            int actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            List<Long> timestampList0 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+
+            // Case 1: timestamps update with same value in WHEN-THEN-clause
+            dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES 
('abc', 0, 10) " +
+                    "ON DUPLICATE KEY UPDATE " +
+                    "counter1 = CASE WHEN counter2 <= 100 THEN 1 ELSE 0 END", 
tableName);
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            String dql = "SELECT * from " + tableName;
+            ResultSet rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(100, rs.getInt("counter2"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList1 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+
+            assertTrue(timestampList0.get(0) < timestampList1.get(0)
+                    && timestampList0.get(1) < timestampList1.get(1)); // 
counter1 updated
+            assertEquals(timestampList0.get(2), timestampList1.get(2)); // 
counter2 NOT updated
+
+            // Case 2: timestamps NOT update with same value in ELSE-clause
+            dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES 
('abc', 0, 10) " +
+                    "ON DUPLICATE KEY UPDATE " +
+                    "counter1 = CASE WHEN counter2 > 100 THEN 0 ELSE 1 END", 
tableName);
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(0, actualReturnValue);
+
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(100, rs.getInt("counter2"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList2 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+
+            assertEquals(timestampList1.get(0), timestampList2.get(0));
+            assertEquals(timestampList1.get(1), timestampList2.get(1));
+            assertEquals(timestampList1.get(2), timestampList2.get(2));
+        }
+    }
+
+    /**
+     * Executes the same three-column CASE-based ON DUPLICATE KEY UPDATE statement three
+     * times. Run 1 returns 1 and advances every timestamp. Run 2 returns 1 and advances all
+     * but counter1's. Run 3 returns 0 and leaves all timestamps, including the empty
+     * cell's, untouched. The test is skipped unless isSetCorrectResultEnabledOnHBase()
+     * returns true.
+     */
+    @Test
+    public void testColumnsTimestampUpdateWithMultipleConditionalUpdate() 
throws Exception {
+        Assume.assumeTrue("Set correct result to RegionActionResult on hbase 
versions " +
+                "2.4.18+, 2.5.9+, and 2.6.0+", 
isSetCorrectResultEnabledOnHBase());
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            String ddl = "create table " + tableName +
+                    "(pk varchar primary key, counter1 integer, counter2 
integer, approval " +
+                    "varchar)" + tableDDLOptions;
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+
+            String dml;
+            dml = String.format("UPSERT INTO %s VALUES('abc', 0, 9, 'NONE')", 
tableName);
+            int actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            List<Long> timestampList0 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+
+            // Case 1: all columns timestamps updated
+            dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES 
('abc', 0, 10) " +
+                    "ON DUPLICATE KEY UPDATE " +
+                    "counter1 = CASE WHEN counter1 < 1 THEN 1 ELSE counter1 
END," +
+                    "counter2 = CASE WHEN counter2 < 11 THEN counter2 + 1 ELSE 
counter2 END," +
+                    "approval = CASE WHEN counter2 < 10 THEN 'NONE' " +
+                    "WHEN counter2 < 11 THEN 'MANAGER_APPROVAL' " +
+                    "ELSE approval END", tableName);
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            String dql = "SELECT * from " + tableName;
+            ResultSet rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(10, rs.getInt("counter2"));
+            assertEquals("NONE", rs.getString("approval"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList1 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+            assertTrue(timestampList1.get(0) > timestampList0.get(0)
+                    && timestampList1.get(1) > timestampList0.get(1)
+                    && timestampList1.get(2) > timestampList0.get(2)
+                    && timestampList1.get(3) > timestampList0.get(3));
+
+            // Case 2: timestamps of counter2 and approval updated
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(11, rs.getInt("counter2"));
+            assertEquals("MANAGER_APPROVAL", rs.getString("approval"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList2 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+            assertEquals(timestampList1.get(1), timestampList2.get(1)); // 
counter1 NOT updated
+            assertTrue(timestampList2.get(0) > timestampList1.get(0)
+                    && timestampList2.get(2) > timestampList1.get(2)
+                    && timestampList2.get(3) > timestampList1.get(3));
+
+            // Case 3: all timestamps NOT updated, including empty column
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(0, actualReturnValue);
+
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(11, rs.getInt("counter2"));
+            assertEquals("MANAGER_APPROVAL", rs.getString("approval"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList3 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+            assertEquals(timestampList2.get(0), timestampList3.get(0));
+            assertEquals(timestampList2.get(1), timestampList3.get(1));
+            assertEquals(timestampList2.get(2), timestampList3.get(2));
+            assertEquals(timestampList2.get(3), timestampList3.get(3));
+        }
+    }
+
+    /**
+     * Checks ON DUPLICATE KEY UPDATE with plain (non-CASE) assignments. Every execution
+     * returns 1 and advances the timestamps of the empty cell and of each assigned column,
+     * even when the assigned value equals the stored one; columns that are not assigned
+     * keep their timestamps.
+     */
+    @Test
+    public void testColumnsTimestampUpdateWithIntentionalUpdate() throws 
Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+
+            String ddl = "create table " + tableName +
+                    "(pk varchar primary key, counter1 bigint, counter2 
bigint)" + tableDDLOptions;
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+
+            String dml;
+            dml = String.format("UPSERT INTO %s VALUES('abc', 0, 100)", 
tableName);
+            int actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            List<Long> timestampList0 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+
+            // Case 1: different value of one column
+            dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES 
('abc', 0, 10) " +
+                            "ON DUPLICATE KEY UPDATE counter1 = counter1 + 1",
+                    tableName);
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            String dql = "SELECT * from " + tableName;
+            ResultSet rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(100, rs.getInt("counter2"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList1 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+            assertEquals(timestampList0.get(2), timestampList1.get(2)); // 
counter2 NOT updated
+            assertTrue(timestampList1.get(0) > timestampList0.get(0)
+                    && timestampList1.get(1) > timestampList0.get(1)); // 
updated columns
+
+            // Case 2: same value of one column will also be updated
+            dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES 
('abc', 0, 10) " +
+                            "ON DUPLICATE KEY UPDATE counter1 = counter1",
+                    tableName);
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(100, rs.getInt("counter2"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList2 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+            assertEquals(timestampList2.get(2), timestampList1.get(2)); // 
counter2 NOT updated
+            assertTrue(timestampList2.get(0) > timestampList1.get(0)
+                    && timestampList2.get(1) > timestampList1.get(1));
+
+            // Case 3: same value of one column, different of the other
+            dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES 
('abc', 0, 10) " +
+                            "ON DUPLICATE KEY UPDATE counter1 = counter1, 
counter2 = counter2 + 1",
+                    tableName);
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(101, rs.getInt("counter2"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList3 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+            assertTrue(timestampList3.get(0) > timestampList2.get(0)
+                    && timestampList3.get(1) > timestampList2.get(1)
+                    && timestampList3.get(2) > timestampList2.get(2)); // 
counter2
+
+            // Case 4: same values of all columns will also be updated
+            dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES 
('abc', 0, 10) " +
+                            "ON DUPLICATE KEY UPDATE counter1 = counter1, 
counter2 = counter2",
+                    tableName);
+            actualReturnValue = conn.createStatement().executeUpdate(dml);
+            assertEquals(1, actualReturnValue);
+
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(101, rs.getInt("counter2"));
+            assertFalse(rs.next());
+
+            List<Long> timestampList4 = getAllColumnsLatestCellTimestamp(conn, 
tableName);
+            assertTrue(timestampList4.get(0) > timestampList3.get(0)
+                    && timestampList4.get(1) > timestampList3.get(1)
+                    && timestampList4.get(2) > timestampList3.get(2));
+        }
+    }
+
+    /** Runs the batched ON DUPLICATE KEY scenario with auto-commit enabled. */
+    @Test
+    public void testBatchedUpsertOnDupKeyAutoCommit() throws Exception {
+        testBatchedUpsertOnDupKey(true);
+    }
+
+    /** Runs the batched ON DUPLICATE KEY scenario with auto-commit disabled. */
+    @Test
+    public void testBatchedUpsertOnDupKeyNoAutoCommit() throws Exception {
+        testBatchedUpsertOnDupKey(false);
+    }
+
+    /**
+     * Batches six ON DUPLICATE KEY upserts, two against the pre-existing row 'a' and four
+     * against row 'b', then checks that executeBatch() reports 1 for every statement and
+     * that both rows end up with the expected column values.
+     *
+     * @param autocommit auto-commit mode for the connection; when false the batch is
+     *                   committed explicitly after executeBatch()
+     */
+    private void testBatchedUpsertOnDupKey(boolean autocommit) throws 
Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(autocommit);
+
+            Statement stmt = conn.createStatement();
+
+            stmt.execute("create table " + tableName + "(pk varchar primary 
key, " +
+                    "counter1 integer, counter2 integer, approval varchar)");
+            createIndex(conn, tableName);
+
+            stmt.execute("UPSERT INTO " + tableName + " VALUES('a', 0, 10, 
'NONE')");
+            conn.commit();
+
+            stmt.addBatch("UPSERT INTO " + tableName +
+                    " (pk, counter1, counter2) VALUES ('a', 0, 10) ON 
DUPLICATE KEY IGNORE");
+            stmt.addBatch("UPSERT INTO " + tableName +
+                    " (pk, counter1, counter2) VALUES ('a', 0, 10) ON 
DUPLICATE KEY UPDATE" +
+                    " counter1 = CASE WHEN counter1 < 1 THEN 1 ELSE counter1 
END");
+
+            stmt.addBatch("UPSERT INTO " + tableName +
+                    " (pk, counter1, counter2) VALUES ('b', 0, 9) ON DUPLICATE 
KEY IGNORE");
+            String dml =  "UPSERT INTO " + tableName +
+                    " (pk, counter1, counter2) VALUES ('b', 0, 10) ON 
DUPLICATE KEY UPDATE" +
+                    " counter2 = CASE WHEN counter2 < 11 THEN counter2 + 1 
ELSE counter2 END," +
+                    " approval = CASE WHEN counter2 < 10 THEN 'NONE'" +
+                    " WHEN counter2 < 11 THEN 'MANAGER_APPROVAL'" +
+                    " ELSE approval END";
+            // the same conditional update on row 'b' is queued three times
+            stmt.addBatch(dml);
+            stmt.addBatch(dml);
+            stmt.addBatch(dml);
+
+            int[] actualReturnValues = stmt.executeBatch();
+            int[] expectedReturnValues = new int[]{1, 1, 1, 1, 1, 1};
+            if (!autocommit) {
+                conn.commit();
+            }
+            assertArrayEquals(expectedReturnValues, actualReturnValues);
+
+            ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString("pk"));
+            assertEquals(1, rs.getInt("counter1"));
+            assertEquals(10, rs.getInt("counter2"));
+            assertEquals("NONE", rs.getString("approval"));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString("pk"));
+            assertEquals(0, rs.getInt("counter1"));
+            assertEquals(11, rs.getInt("counter2"));
+            assertEquals("MANAGER_APPROVAL", rs.getString("approval"));
+            assertFalse(rs.next());
+        }
+    }
+
+    /**
+     * Returns the timestamp of the latest cell of the table's empty column, as read by
+     * getColumnLatestCellTimestamp(). The empty column qualifier is resolved from the
+     * table's PTable metadata.
+     *
+     * @param tableName name of the table to inspect
+     * @return timestamp of the latest empty-column cell
+     * @throws Exception if the table metadata or the cell cannot be read
+     */
+    private long getEmptyKVLatestCellTimestamp(String tableName) throws Exception {
+        // The connection is only needed to resolve the PTable; close it when done instead
+        // of leaking one connection per call.
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PTable pTable = PhoenixRuntime.getTable(conn, tableName);
+            byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(pTable).getFirst();
+            return getColumnLatestCellTimestamp(tableName, emptyCQ);
+        }
+    }
+
+    /**
+     * Scans the HBase table directly and returns the timestamp of the latest cell of the
+     * given qualifier, in the default column family, from the first row the scan returns.
+     *
+     * @param tableName name of the HBase table to scan
+     * @param cq column qualifier bytes of the cell to look up
+     * @return timestamp of the latest cell for that qualifier
+     * @throws Exception if the HBase connection, table or scanner cannot be used
+     */
+    private long getColumnLatestCellTimestamp(String tableName, byte[] cq) throws Exception {
+        Scan scan = new Scan();
+        // Table and ResultScanner are closed together with the connection rather than
+        // being left open after the single row has been read.
+        try (org.apache.hadoop.hbase.client.Connection hconn =
+                     ConnectionFactory.createConnection(config);
+             Table table = hconn.getTable(TableName.valueOf(tableName));
+             ResultScanner resultScanner = table.getScanner(scan)) {
+            Result result = resultScanner.next();
+            return result.getColumnLatestCell(
+                    QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, cq).getTimestamp();
+        }
+    }
+
+    private List<Long> getAllColumnsLatestCellTimestamp(Connection conn, 
String tableName) throws Exception {
+        List<Long> timestampList = new ArrayList<>();
+        PTable pTable = conn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(null, tableName));
+        List<PColumn> columns = pTable.getColumns();
+
+        // timestamp of the empty cell
+        timestampList.add(getEmptyKVLatestCellTimestamp(tableName));
+        // timestamps of all other columns
+        for (int i = 1; i < columns.size(); i++) {
+            byte[] cq = columns.get(i).getColumnQualifierBytes();
+            timestampList.add(getColumnLatestCellTimestamp(tableName, cq));
+        }
+        return timestampList;
+    }
+
+    /**
+     * Reports whether the HBase version on the classpath is 2.4.18+, 2.5.9+ or 2.6.0+.
+     * Any major version above 2 qualifies; any below 2 does not.
+     */
+    private boolean isSetCorrectResultEnabledOnHBase() {
+        String[] parts = VersionInfo.getVersion().split("\\.");
+        int major = Integer.parseInt(parts[0]);
+        int minor = Integer.parseInt(parts[1]);
+        // drop any "-SUFFIX" qualifier from the patch component
+        int patch = Integer.parseInt(parts[2].split("-")[0]);
+
+        if (major != 2) {
+            return major > 2;
+        }
+        switch (minor) {
+            case 4:
+                return patch >= 18;
+            case 5:
+                return patch >= 9;
+            default:
+                // minors below 4 are too old; 6 and above always qualify
+                return minor >= 6;
+        }
+    }
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
index badc2c5e07..59c664edc6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
@@ -44,8 +44,10 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.junit.Test;
@@ -60,11 +62,11 @@ import 
org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 @RunWith(Parameterized.class)
 public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
     private final String indexDDL;
-    
+
     public OnDuplicateKeyIT(String indexDDL) {
         this.indexDDL = indexDDL;
     }
-    
+
     @Parameters
     public static synchronized Collection<Object> data() {
         List<Object> testCases = Lists.newArrayList();
@@ -861,17 +863,26 @@ public class OnDuplicateKeyIT extends 
ParallelStatsDisabledIT {
     }
 
     private void assertHBaseRowTimestamp(String tableName, long 
expectedTimestamp) throws Exception {
+        long actualTimestamp = getEmptyKVLatestCellTimestamp(tableName); // latest timestamp of the table's empty key-value cell
+        assertEquals(expectedTimestamp, actualTimestamp); // the check passes only when that cell timestamp equals the expected value
+    }
+
+    private long getEmptyKVLatestCellTimestamp(String tableName) throws 
Exception { // resolves the table's empty-cell qualifier, then returns that cell's latest timestamp
+        Connection conn = DriverManager.getConnection(getUrl()); // NOTE(review): conn is never closed in this method — consider try-with-resources
+        PTable pTable = PhoenixRuntime.getTable(conn, tableName);
+        byte[] emptyCQ = 
EncodedColumnsUtil.getEmptyKeyValueInfo(pTable).getFirst(); // first element of the info computed for this specific table is used as the qualifier
+        return getColumnLatestCellTimestamp(tableName, emptyCQ);
+    }
+
+    private long getColumnLatestCellTimestamp(String tableName, byte[] cq) 
throws Exception { // latest timestamp of qualifier cq in the default column family, read from the first row an unrestricted scan returns
         Scan scan = new Scan();
-        byte[] emptyKVQualifier = 
EncodedColumnsUtil.getEmptyKeyValueInfo(true).getFirst();
         try (org.apache.hadoop.hbase.client.Connection hconn =
-                 ConnectionFactory.createConnection(config)) {
+                     ConnectionFactory.createConnection(config)) {
             Table table = hconn.getTable(TableName.valueOf(tableName));
             ResultScanner resultScanner = table.getScanner(scan);
             Result result = resultScanner.next();
-            long actualTimestamp = result.getColumnLatestCell(
-                QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
emptyKVQualifier).getTimestamp();
-            assertEquals(expectedTimestamp, actualTimestamp);
+            return result.getColumnLatestCell(
+                    QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
cq).getTimestamp(); // NOTE(review): NPE if the scan yields no row or the row lacks this cell; table and resultScanner are not closed explicitly (only hconn is)
         }
     }
 }
-    
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
index ee4e387d4b..cd016c2303 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
@@ -176,7 +176,7 @@ public class ReplicationWithWALAnnotationIT extends 
BaseTest {
         String[] versionArr = hbaseVersion.split("\\.");
         int majorVersion = Integer.parseInt(versionArr[0]);
         int minorVersion = Integer.parseInt(versionArr[1]);
-        int patchVersion = Integer.parseInt(versionArr[2].split("-hadoop")[0]);
+        int patchVersion = Integer.parseInt(versionArr[2].split("-")[0]);
         if (majorVersion > 2) {
             return true;
         }

Reply via email to