http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index d121d2d..d6adc71 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -17,20 +17,77 @@
  */
 package org.apache.phoenix.index;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
+import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import com.google.common.collect.Lists;
 
 /**
  * Index builder for covered-columns index that ties into phoenix for faster use.
  */
 public class PhoenixIndexBuilder extends NonTxIndexBuilder {
+    public static final String ATOMIC_OP_ATTRIB = "_ATOMIC_OP_ATTRIB";
+    private static final byte[] ON_DUP_KEY_IGNORE_BYTES = new byte[] {1}; // boolean true
+    private static final int ON_DUP_KEY_HEADER_BYTE_SIZE = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BOOLEAN;
+    
 
+    private static List<Cell> flattenCells(Mutation m, int estimatedSize) throws IOException {
+        List<Cell> flattenedCells = Lists.newArrayListWithExpectedSize(estimatedSize);
+        flattenCells(m, flattenedCells);
+        return flattenedCells;
+    }
+    
+    private static void flattenCells(Mutation m, List<Cell> flattenedCells) throws IOException {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            flattenedCells.addAll(cells);
+        }
+    }
+    
     @Override
     public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
         return new PhoenixIndexMetaData(env, miniBatchOp.getOperation(0).getAttributesMap());
@@ -53,4 +110,266 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
     @Override
     public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException {
     }
+    
+    @Override
+    public boolean isAtomicOp(Mutation m) throws IOException {
+        return m.getAttribute(ATOMIC_OP_ATTRIB) != null;
+    }
+
+    private static void transferCells(Mutation source, Mutation target) {
+        target.getFamilyCellMap().putAll(source.getFamilyCellMap());
+    }
+    private static void transferAttributes(Mutation source, Mutation target) {
+        for (Map.Entry<String, byte[]> entry : source.getAttributesMap().entrySet()) {
+            target.setAttribute(entry.getKey(), entry.getValue());
+        }
+    }
+    private static List<Mutation> convertIncrementToPutInSingletonList(Increment inc) {
+        byte[] rowKey = inc.getRow();
+        Put put = new Put(rowKey);
+        transferCells(inc, put);
+        transferAttributes(inc, put);
+        return Collections.<Mutation>singletonList(put);
+    }
+    
+    @Override
+    public List<Mutation> executeAtomicOp(Increment inc) throws IOException {
+        byte[] opBytes = inc.getAttribute(ATOMIC_OP_ATTRIB);
+        if (opBytes == null) { // Unexpected
+            return null;
+        }
+        inc.setAttribute(ATOMIC_OP_ATTRIB, null);
+        Put put = null;
+        Delete delete = null;
+        // We can neither use the time stamp in the Increment to set the Get time range
+        // nor set the Put/Delete time stamp and have this be atomic, as HBase does not
+        // handle that. Though we disallow using the ON DUPLICATE KEY clause when the
+        // CURRENT_SCN is set, we still may have a time stamp set as of when the table
+        // was resolved on the client side. We need to ignore this as well due to limitations
+        // in HBase, but this isn't too bad as the time will be very close to the current
+        // time anyway.
+        long ts = HConstants.LATEST_TIMESTAMP;
+        byte[] rowKey = inc.getRow();
+        final Get get = new Get(rowKey);
+        if (isDupKeyIgnore(opBytes)) {
+            get.setFilter(new FirstKeyOnlyFilter());
+            Result result = this.env.getRegion().get(get);
+            return result.isEmpty() ? convertIncrementToPutInSingletonList(inc) : Collections.<Mutation>emptyList();
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(opBytes);
+        DataInputStream input = new DataInputStream(stream);
+        boolean skipFirstOp = input.readBoolean();
+        short repeat = input.readShort();
+        final int[] estimatedSizeHolder = {0};
+        List<Pair<PTable, List<Expression>>> operations = Lists.newArrayListWithExpectedSize(3);
+        while (true) {
+            ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
+                @Override
+                public Void visit(KeyValueColumnExpression expression) {
+                    get.addColumn(expression.getColumnFamily(), expression.getColumnName());
+                    estimatedSizeHolder[0]++;
+                    return null;
+                }
+            };
+            try {
+                int nExpressions = WritableUtils.readVInt(input);
+                List<Expression> expressions = Lists.newArrayListWithExpectedSize(nExpressions);
+                for (int i = 0; i < nExpressions; i++) {
+                    Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+                    expression.readFields(input);
+                    expressions.add(expression);
+                    expression.accept(visitor);                    
+                }
+                PTableProtos.PTable tableProto = PTableProtos.PTable.parseDelimitedFrom(input);
+                PTable table = PTableImpl.createFromProto(tableProto);
+                operations.add(new Pair<>(table, expressions));
+            } catch (EOFException e) {
+                break;
+            }
+        }
+        int estimatedSize = estimatedSizeHolder[0];
+        if (get.getFamilyMap().isEmpty()) {
+            get.setFilter(new FirstKeyOnlyFilter());
+        }
+        MultiKeyValueTuple tuple;
+        List<Cell> cells = ((HRegion)this.env.getRegion()).get(get, false);
+        if (cells.isEmpty()) {
+            if (skipFirstOp) {
+                if (operations.size() <= 1 && repeat <= 1) {
+                    return convertIncrementToPutInSingletonList(inc);
+                }
+                repeat--; // Skip first operation (if first wasn't ON DUPLICATE KEY IGNORE)
+            }
+            // Base current state off of new row
+            tuple = new MultiKeyValueTuple(flattenCells(inc, estimatedSize));
+        } else {
+            // Base current state off of existing row
+            tuple = new MultiKeyValueTuple(cells);
+        }
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        for (int opIndex = 0; opIndex < operations.size(); opIndex++) {
+            Pair<PTable, List<Expression>> operation = operations.get(opIndex);
+            PTable table = operation.getFirst();
+            List<Expression> expressions = operation.getSecond();
+            for (int j = 0; j < repeat; j++) { // repeater loop
+                ptr.set(rowKey);
+                PRow row = table.newRow(GenericKeyValueBuilder.INSTANCE, ts, ptr, false);
+                for (int i = 0; i < expressions.size(); i++) {
+                    Expression expression = expressions.get(i);
+                    ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                    expression.evaluate(tuple, ptr);
+                    PColumn column = table.getColumns().get(i + 1);
+                    Object value = expression.getDataType().toObject(ptr, column.getSortOrder());
+                    // We are guaranteed that the two columns will have the same type.
+                    if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
+                            expression.getMaxLength(), expression.getScale(), column.getMaxLength(),
+                            column.getScale())) {
+                        throw new DataExceedsCapacityException(column.getDataType(), column.getMaxLength(),
+                            column.getScale());
+                    }
+                    column.getDataType().coerceBytes(ptr, value, expression.getDataType(), expression.getMaxLength(),
+                        expression.getScale(), expression.getSortOrder(), column.getMaxLength(), column.getScale(),
+                        column.getSortOrder(), table.rowKeyOrderOptimizable());
+                    byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                    row.setValue(column, bytes);
+                }
+                List<Cell> flattenedCells = Lists.newArrayListWithExpectedSize(estimatedSize);
+                List<Mutation> mutations = row.toRowMutations();
+                for (Mutation source : mutations) {
+                    flattenCells(source, flattenedCells);
+                }
+                tuple.setKeyValues(flattenedCells);
+            }
+            // Repeat only applies to first statement
+            repeat = 1;
+        }
+        
+        List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+        for (int i = 0; i < tuple.size(); i++) {
+            Cell cell = tuple.getValue(i);
+            if (Type.codeToType(cell.getTypeByte()) == Type.Put) {
+                if (put == null) {
+                    put = new Put(rowKey);
+                    transferAttributes(inc, put);
+                    mutations.add(put);
+                }
+                put.add(cell);
+            } else {
+                if (delete == null) {
+                    delete = new Delete(rowKey);
+                    transferAttributes(inc, delete);
+                    mutations.add(delete);
+                }
+                delete.addDeleteMarker(cell);
+            }
+        }
+        return mutations;
+    }
+
+    public static byte[] serializeOnDupKeyIgnore() {
+        return ON_DUP_KEY_IGNORE_BYTES;
+    }
+    
+    /**
+     * Serialize ON DUPLICATE KEY UPDATE info with the following format:
+     * 1) Boolean value tracking whether or not to execute the first ON DUPLICATE KEY clause.
+     *    We know the clause should be executed when there are other UPSERT VALUES clauses earlier in
+     *    the same batch for this row key. We need this for two main cases:
+     *       UPSERT VALUES followed by UPSERT VALUES ON DUPLICATE KEY UPDATE
+     *       UPSERT VALUES ON DUPLICATE KEY IGNORE followed by UPSERT VALUES ON DUPLICATE KEY UPDATE
+     * 2) Short value tracking how many times the first clause should be executed. This
+     *    optimizes the case where the same clause is executed many times by serializing it only once.
+     * 3) Repeating {List<Expression>, PTable} pairs that encapsulate the ON DUPLICATE KEY clause.
+     * @param table table representing columns being updated
+     * @param expressions list of expressions to evaluate for updating columns
+     * @return serialized byte array representation of ON DUPLICATE KEY UPDATE info
+     */
+    public static byte[] serializeOnDupKeyUpdate(PTable table, List<Expression> expressions) {
+        PTableProtos.PTable ptableProto = PTableImpl.toProto(table);
+        int size = ptableProto.getSerializedSize();
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream(size * 2)) {
+            DataOutputStream output = new DataOutputStream(stream);
+            output.writeBoolean(true); // Skip this ON DUPLICATE KEY clause if the row doesn't already exist
+            output.writeShort(1); // Execute this ON DUPLICATE KEY once
+            WritableUtils.writeVInt(output, expressions.size());
+            for (int i = 0; i < expressions.size(); i++) {
+                Expression expression = expressions.get(i);
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+                expression.write(output);
+            }
+            ptableProto.writeDelimitedTo(output);
+            return stream.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    private static byte[] doNotSkipFirstOnDupKey(byte[] oldOnDupKeyBytes) {
+        byte[] newOnDupKeyBytes = Arrays.copyOf(oldOnDupKeyBytes, oldOnDupKeyBytes.length);
+        newOnDupKeyBytes[0] = 0; // false means do not skip first ON DUPLICATE KEY
+        return newOnDupKeyBytes;
+    }
+
+    public static byte[] combineOnDupKey(byte[] oldOnDupKeyBytes, byte[] newOnDupKeyBytes) {
+        // If old ON DUPLICATE KEY is null, then the new value always takes effect
+        // If new ON DUPLICATE KEY is null, then reset back to null
+        if (oldOnDupKeyBytes == null || newOnDupKeyBytes == null) {
+            if (newOnDupKeyBytes == null) {
+                return newOnDupKeyBytes;
+            }
+            return doNotSkipFirstOnDupKey(newOnDupKeyBytes);
+        }
+        // If the new UPSERT VALUES statement has an ON DUPLICATE KEY IGNORE, and there
+        // is an already existing UPSERT VALUES statement with an ON DUPLICATE KEY clause,
+        // then we can just keep that one as the new one has no impact.
+        if (isDupKeyIgnore(newOnDupKeyBytes)) {
+            return oldOnDupKeyBytes;
+        }
+        boolean isOldDupKeyIgnore = isDupKeyIgnore(oldOnDupKeyBytes);
+        try (TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(Math.max(0, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE) + newOnDupKeyBytes.length);
+                ByteArrayInputStream oldStream = new ByteArrayInputStream(oldOnDupKeyBytes);
+                ByteArrayInputStream newStream = new ByteArrayInputStream(newOnDupKeyBytes);
+                DataOutputStream output = new DataOutputStream(stream);
+                DataInputStream oldInput = new DataInputStream(oldStream);
+                DataInputStream newInput = new DataInputStream(newStream)) {
+            
+            boolean execute1 = oldInput.readBoolean();
+            newInput.readBoolean(); // ignore
+            int repeating2 = newInput.readShort();
+            if (isOldDupKeyIgnore) {
+                output.writeBoolean(false); // Will force subsequent ON DUPLICATE KEY UPDATE statement to execute
+                output.writeShort(repeating2);
+                output.write(newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE);
+            } else {
+                int repeating1 = oldInput.readShort();
+                if (Bytes.compareTo(
+                        oldOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE,
+                        newOnDupKeyBytes, Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BOOLEAN, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE) == 0) {
+                    // If both old and new ON DUPLICATE KEY UPDATE clauses match,
+                    // reduce the size of data we're sending over the wire.
+                    // TODO: optimize the size of the RPC further.
+                    output.writeBoolean(execute1);
+                    output.writeShort(repeating1 + repeating2);
+                    output.write(newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE);
+                } else {
+                    output.writeBoolean(execute1);
+                    output.writeShort(repeating1); // retain first ON DUPLICATE KEY UPDATE having repeated
+                    output.write(oldOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE);
+                    // If the new ON DUPLICATE KEY UPDATE was repeating, we need to write it multiple times as only the first
+                    // statement is affected by the repeating amount
+                    for (int i = 0; i < repeating2; i++) {
+                        output.write(newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE);
+                    }
+                }
+            }
+            return stream.toByteArray();
+        } catch (IOException e) { // Shouldn't be possible with ByteInput/Output streams
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static boolean isDupKeyIgnore(byte[] onDupKeyBytes) {
+        return onDupKeyBytes != null && Bytes.compareTo(ON_DUP_KEY_IGNORE_BYTES, onDupKeyBytes) == 0;
+    }
 }
\ No newline at end of file
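The pieces above compose into a single client-to-server flow: the ON DUPLICATE KEY metadata rides to the region server as an attribute on an HBase Increment, where isAtomicOp() detects it and executeAtomicOp() substitutes ordinary Put/Delete mutations for the increment. Below is a minimal illustrative sketch of the client-side attachment, assuming an already-resolved PTable and compiled ON DUPLICATE KEY UPDATE expressions; the patch's real wiring lives in the compile/mutate path, which is not part of this file:

    import java.util.List;

    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.phoenix.expression.Expression;
    import org.apache.phoenix.index.PhoenixIndexBuilder;
    import org.apache.phoenix.schema.PTable;

    public class OnDupKeyAttributeSketch {
        // ON DUPLICATE KEY IGNORE: the attribute is a single boolean-true byte.
        public static Increment markIgnore(byte[] rowKey) {
            Increment inc = new Increment(rowKey);
            inc.setAttribute(PhoenixIndexBuilder.ATOMIC_OP_ATTRIB,
                    PhoenixIndexBuilder.serializeOnDupKeyIgnore());
            return inc;
        }

        // ON DUPLICATE KEY UPDATE: a {skip-first boolean, repeat short} header
        // followed by the expressions and the delimited PTable protobuf.
        public static Increment markUpdate(byte[] rowKey, PTable table,
                List<Expression> expressions) {
            Increment inc = new Increment(rowKey);
            inc.setAttribute(PhoenixIndexBuilder.ATOMIC_OP_ATTRIB,
                    PhoenixIndexBuilder.serializeOnDupKeyUpdate(table, expressions));
            return inc;
        }
    }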

http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 60e32e5..d562f44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -595,8 +595,10 @@ public class PhoenixStatement implements Statement, SQLCloseable {
     }
 
     private static class ExecutableUpsertStatement extends UpsertStatement implements CompilableStatement {
-        private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
-            super(table, hintNode, columns, values, select, bindCount, udfParseNodes);
+        private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns,
+                List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes,
+                List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
+            super(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
         }
 
         @SuppressWarnings("unchecked")
@@ -1203,8 +1205,9 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         }
 
         @Override
-        public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
-            return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, udfParseNodes);
+        public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount,
+                Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
+            return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
         }
         
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 6b58bed..977ca4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -706,8 +706,11 @@ public class ParseNodeFactory {
                 orderBy == null ? Collections.<OrderByNode>emptyList() : orderBy, limit, offset, bindCount, isAggregate, hasSequence, selects == null ? Collections.<SelectStatement>emptyList() : selects, udfParseNodes);
     } 
     
-    public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
-        return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes);
+    public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values,
+            SelectStatement select, int bindCount,
+            Map<String, UDFParseNode> udfParseNodes,
+            List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
+        return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
     }
 
     public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
index 48698bd..fca7463 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
@@ -21,20 +21,24 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.util.Pair;
+
 public class UpsertStatement extends DMLStatement {
     private final List<ColumnName> columns;
     private final List<ParseNode> values;
     private final SelectStatement select;
     private final HintNode hint;
+    private final List<Pair<ColumnName,ParseNode>> onDupKeyPairs;
 
     public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns,
             List<ParseNode> values, SelectStatement select, int bindCount,
-            Map<String, UDFParseNode> udfParseNodes) {
+            Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
         super(table, bindCount, udfParseNodes);
         this.columns = columns == null ? Collections.<ColumnName>emptyList() : columns;
         this.values = values;
         this.select = select;
         this.hint = hint == null ? HintNode.EMPTY_HINT_NODE : hint;
+        this.onDupKeyPairs = onDupKeyPairs;
     }
 
     public List<ColumnName> getColumns() {
@@ -52,4 +56,8 @@ public class UpsertStatement extends DMLStatement {
     public HintNode getHint() {
         return hint;
     }
+
+    public List<Pair<ColumnName,ParseNode>> getOnDupKeyPairs() {
+        return onDupKeyPairs;
+    }
 }
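For orientation, the statements that populate the new onDupKeyPairs field look like the following JDBC sketch; how the IGNORE form is encoded into the pair list is decided in the parser, which is not shown in this email, so the comments are assumptions:

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class OnDupKeySyntaxSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.createStatement().execute(
                    "CREATE TABLE IF NOT EXISTS counters (k INTEGER PRIMARY KEY, v BIGINT)");
                // Parsed into an UpsertStatement whose onDupKeyPairs carries (v, v + 1)
                conn.createStatement().execute(
                    "UPSERT INTO counters VALUES (1, 0) ON DUPLICATE KEY UPDATE v = v + 1");
                // IGNORE form: leaves an existing row untouched
                conn.createStatement().execute(
                    "UPSERT INTO counters VALUES (1, 0) ON DUPLICATE KEY IGNORE");
                conn.commit();
            }
        }
    }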

http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index 798706e..aca8219 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -89,4 +89,14 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
        public boolean isDynamic() {
                return getDelegate().isDynamic();
        }
+
+       @Override
+       public int hashCode() {
+           return getDelegate().hashCode();
+       }
+       
+       @Override
+    public boolean equals(Object o) {
+           return getDelegate().equals(o);
+       }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 3ee012f..7d39dfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -106,13 +106,13 @@ public class DelegateTable implements PTable {
     }
 
     @Override
-    public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, byte[]... values) {
-        return delegate.newRow(builder, ts, key, values);
+    public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values) {
+        return delegate.newRow(builder, ts, key, hasOnDupKey, values);
     }
 
     @Override
-    public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values) {
-        return delegate.newRow(builder, key, values);
+    public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values) {
+        return delegate.newRow(builder, key, hasOnDupKey, values);
     }
 
     @Override
@@ -280,4 +280,14 @@ public class DelegateTable implements PTable {
     public boolean isAppendOnlySchema() {
         return delegate.isAppendOnlySchema();
     }
+    
+    @Override
+    public int hashCode() {
+        return delegate.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return delegate.equals(obj);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
index a556f76..ca827d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
@@ -170,14 +170,14 @@ public class PColumnImpl implements PColumn {
     public boolean equals(Object obj) {
         if (this == obj) return true;
         if (obj == null) return false;
-        if (getClass() != obj.getClass()) return false;
-        PColumnImpl other = (PColumnImpl)obj;
+        if (!(obj instanceof PColumn)) return false;
+        PColumn other = (PColumn)obj;
         if (familyName == null) {
-            if (other.familyName != null) return false;
-        } else if (!familyName.equals(other.familyName)) return false;
+            if (other.getFamilyName() != null) return false;
+        } else if (!familyName.equals(other.getFamilyName())) return false;
         if (name == null) {
-            if (other.name != null) return false;
-        } else if (!name.equals(other.name)) return false;
+            if (other.getName() != null) return false;
+        } else if (!name.equals(other.getName())) return false;
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
index 30deee6..fde83ba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
@@ -40,7 +40,7 @@ public interface PRow {
     /**
      * Get the list of {@link org.apache.hadoop.hbase.client.Mutation} used to
      * update an HTable after all mutations through calls to
-     * {@link #setValue(PColumn, Object)} or {@link #delete()}.
+     * {@link #setValue(PColumn, byte[])} or {@link #delete()}.
      * @return the list of mutations representing all changes made to a row
      * @throws ConstraintViolationException if row data violates schema
      * constraint
@@ -54,15 +54,6 @@ public interface PRow {
      * @throws ConstraintViolationException if row data violates schema
      * constraint
      */
-    public void setValue(PColumn col, Object value);
-    
-    /**
-     * Set a column value in the row
-     * @param col the column for which the value is being set
-     * @param value the value
-     * @throws ConstraintViolationException if row data violates schema
-     * constraint
-     */
     public void setValue(PColumn col, byte[] value);
     
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index b585323..01e8afe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -226,26 +226,28 @@ public interface PTable extends PMetaDataEntity {
      * and the optional key values specified using values.
      * @param ts the timestamp that the key value will have when committed
      * @param key the row key of the key value
+     * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise.
      * @param values the optional key values
      * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
      * generate the Row to send to the HBase server.
      * @throws ConstraintViolationException if row data violates schema
      * constraint
      */
-    PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, byte[]... values);
+    PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values);
 
     /**
      * Creates a new row for the PK values (from {@link #newKey(ImmutableBytesWritable, byte[][])}
      * and the optional key values specified using values. The timestamp of the key value
      * will be set by the HBase server.
      * @param key the row key of the key value
+     * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise.
      * @param values the optional key values
      * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
      * generate the row to send to the HBase server.
      * @throws ConstraintViolationException if row data violates schema
      * constraint
      */
-    PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values);
+    PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values);
 
     /**
      * Formulates a row key using the values provided. The values must be in
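The widened newRow() signature is consumed by the server-side code earlier in this commit; below is a condensed sketch mirroring the call sequence in PhoenixIndexBuilder.executeAtomicOp, where table, rowKey, column, and valueBytes are assumed inputs:

    import java.util.List;

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
    import org.apache.phoenix.schema.PColumn;
    import org.apache.phoenix.schema.PRow;
    import org.apache.phoenix.schema.PTable;

    public class NewRowSketch {
        // hasOnDupKey=false keeps the familiar Put-based path; true makes the
        // PRowImpl in PTableImpl (next diff) emit an Increment carrier instead.
        static List<Mutation> buildRow(PTable table, byte[] rowKey, PColumn column,
                byte[] valueBytes, boolean hasOnDupKey) {
            ImmutableBytesWritable ptr = new ImmutableBytesWritable(rowKey);
            PRow row = table.newRow(GenericKeyValueBuilder.INSTANCE,
                    HConstants.LATEST_TIMESTAMP, ptr, hasOnDupKey);
            row.setValue(column, valueBytes);
            return row.toRowMutations();
        }
    }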

http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 773ce76..627740b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -241,7 +242,7 @@ public class PTableImpl implements PTable {
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
 
-    public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
+    public static PTableImpl makePTable(PTable table, Collection<PColumn> columns) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(),
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
@@ -251,7 +252,7 @@ public class PTableImpl implements PTable {
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
 
-    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
+    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
@@ -261,7 +262,7 @@ public class PTableImpl implements PTable {
                 table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
 
-    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
+    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
@@ -271,7 +272,7 @@ public class PTableImpl implements PTable {
                 table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
     
-    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
+    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
             boolean isMultitenant, boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
@@ -715,8 +716,8 @@ public class PTableImpl implements PTable {
         }
     }
 
-    private PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, int i, byte[]... values) {
-        PRow row = new PRowImpl(builder, key, ts, getBucketNum());
+    private PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, int i, boolean hasOnDupKey, byte[]... values) {
+        PRow row = new PRowImpl(builder, key, ts, getBucketNum(), hasOnDupKey);
         if (i < values.length) {
             for (PColumnFamily family : getColumnFamilies()) {
                 for (PColumn column : family.getColumns()) {
@@ -731,13 +732,13 @@ public class PTableImpl implements PTable {
 
     @Override
     public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key,
-            byte[]... values) {
-        return newRow(builder, ts, key, 0, values);
+            boolean hasOnDupKey, byte[]... values) {
+        return newRow(builder, ts, key, 0, hasOnDupKey, values);
     }
 
     @Override
-    public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values) {
-        return newRow(builder, HConstants.LATEST_TIMESTAMP, key, values);
+    public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values) {
+        return newRow(builder, HConstants.LATEST_TIMESTAMP, key, hasOnDupKey, values);
     }
 
     @Override
@@ -775,14 +776,16 @@ public class PTableImpl implements PTable {
         // default to the generic builder, and only override when we know on the client
         private final KeyValueBuilder kvBuilder;
 
-        private Put setValues;
+        private Mutation setValues;
         private Delete unsetValues;
         private Mutation deleteRow;
         private final long ts;
+        private final boolean hasOnDupKey;
 
-        public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum) {
+        public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum, boolean hasOnDupKey) {
             this.kvBuilder = kvBuilder;
             this.ts = ts;
+            this.hasOnDupKey = hasOnDupKey;
             if (bucketNum != null) {
                 this.key = SaltingUtil.getSaltedKey(key, bucketNum);
                 this.keyPtr = new ImmutableBytesPtr(this.key);
@@ -795,7 +798,7 @@ public class PTableImpl implements PTable {
         }
 
         private void newMutations() {
-            Put put = new Put(this.key);
+            Mutation put = this.hasOnDupKey ? new Increment(this.key) : new Put(this.key);
             Delete delete = new Delete(this.key);
             if (isWALDisabled()) {
                 put.setDurability(Durability.SKIP_WAL);
@@ -844,12 +847,6 @@ public class PTableImpl implements PTable {
         }
 
         @Override
-        public void setValue(PColumn column, Object value) {
-            byte[] byteValue = value == null ? ByteUtil.EMPTY_BYTE_ARRAY : column.getDataType().toBytes(value);
-            setValue(column, byteValue);
-        }
-
-        @Override
         public void setValue(PColumn column, byte[] byteValue) {
             deleteRow = null;
             byte[] family = column.getFamilyName().getBytes();
@@ -864,7 +861,10 @@ public class PTableImpl implements PTable {
                 // Store nulls for immutable tables otherwise default value would be used
                 removeIfPresent(setValues, family, qualifier);
                 removeIfPresent(unsetValues, family, qualifier);
-            } else if (isNull && !getStoreNulls() && column.getExpressionStr() == null) {
+            } else if (isNull && !getStoreNulls() && !this.hasOnDupKey && column.getExpressionStr() == null) {
+                // Cannot use column delete marker when row has ON DUPLICATE KEY clause
+                // because we cannot change a Delete mutation to a Put mutation in the
+                // case of updates occurring due to the execution of the clause.
+                removeIfPresent(setValues, family, qualifier);
+                deleteQuietly(unsetValues, kvBuilder, kvBuilder.buildDeleteColumns(keyPtr, column
                             .getFamilyName().getBytesPtr(), column.getName().getBytesPtr(), ts));
@@ -1328,11 +1328,11 @@ public class PTableImpl implements PTable {
     public boolean equals(Object obj) {
         if (this == obj) return true;
         if (obj == null) return false;
-        if (getClass() != obj.getClass()) return false;
-        PTableImpl other = (PTableImpl) obj;
+        if (!(obj instanceof PTable)) return false;
+        PTable other = (PTable) obj;
         if (key == null) {
-            if (other.key != null) return false;
-        } else if (!key.equals(other.key)) return false;
+            if (other.getKey() != null) return false;
+        } else if (!key.equals(other.getKey())) return false;
         return true;
     }
 }
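The pivotal change above is in newMutations(): with an ON DUPLICATE KEY clause the row's carrier mutation becomes an Increment, because HBase runs its increment coprocessor hooks under the row lock, which is what makes the server-side read-evaluate-write atomic. A hypothetical sketch of that interception follows; the actual hook lives in Phoenix's indexing coprocessor, which is not part of this excerpt, so treat the class and its wiring as assumptions:

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.phoenix.index.PhoenixIndexBuilder;

    public class AtomicOpHookSketch extends BaseRegionObserver {
        private PhoenixIndexBuilder builder; // assumed to be initialized at coprocessor setup

        @Override
        public Result preIncrementAfterRowLock(
                ObserverContext<RegionCoprocessorEnvironment> ctx, Increment inc)
                throws IOException {
            List<Mutation> mutations = builder.executeAtomicOp(inc);
            if (mutations == null) {
                return null; // no ATOMIC_OP_ATTRIB: fall through to a plain HBase increment
            }
            // Apply the substituted Put/Delete mutations while the row lock is held;
            // returning a non-null Result short-circuits the default increment.
            if (!mutations.isEmpty()) {
                ctx.getEnvironment().getRegion().batchMutate(
                        mutations.toArray(new Mutation[mutations.size()]));
            }
            return Result.create(Collections.<Cell> emptyList());
        }
    }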

http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
index 6f8b19f..65cf075 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
@@ -18,7 +18,6 @@ import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.schema.types.PDataType;
 
 public class ExpressionUtil {
-
        private ExpressionUtil() {
        }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 393da4c..7488c72 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -458,10 +458,6 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
         return plan.getContext().getScan();
     }
     
-    private QueryPlan getQueryPlan(String query) throws SQLException {
-        return getQueryPlan(query, Collections.emptyList());
-    }
-
     private QueryPlan getOptimizedQueryPlan(String query) throws SQLException {
         return getOptimizedQueryPlan(query, Collections.emptyList());
     }
@@ -2683,4 +2679,104 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
             assertEquals("PLATFORM_ENTITY.GLOBAL_INDEX", 
plan.getContext().getCurrentTable().getTable().getName().getString());
         }
     }
+
+    @Test
+    public void testOnDupKeyForImmutableTable() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            conn.createStatement().execute("CREATE TABLE t1 (k integer not 
null primary key, v bigint) IMMUTABLE_ROWS=true");
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON 
DUPLICATE KEY UPDATE v = v + 1");
+            fail();
+        } catch (SQLException e) {
+            
assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE.getErrorCode(),
 e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUpdatePKOnDupKey() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            conn.createStatement().execute("CREATE TABLE t1 (k1 integer not 
null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))");
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON 
DUPLICATE KEY UPDATE k2 = v + 1");
+            fail();
+        } catch (SQLException e) {
+            
assertEquals(SQLExceptionCode.CANNOT_UPDATE_PK_ON_DUP_KEY.getErrorCode(), 
e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testOnDupKeyTypeMismatch() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            conn.createStatement().execute("CREATE TABLE t1 (k1 integer not 
null, k2 integer not null, v1 bigint, v2 varchar, constraint pk primary key 
(k1,k2))");
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON 
DUPLICATE KEY UPDATE v1 = v2 || 'a'");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TYPE_MISMATCH.getErrorCode(), 
e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testDuplicateColumnOnDupKeyUpdate() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            conn.createStatement().execute("CREATE TABLE t1 (k1 integer not 
null, k2 integer not null, v1 bigint, v2 bigint, constraint pk primary key 
(k1,k2))");
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON 
DUPLICATE KEY UPDATE v1 = v1 + 1, v1 = v2 + 2");
+            fail();
+        } catch (SQLException e) {
+            
assertEquals(SQLExceptionCode.DUPLICATE_COLUMN_IN_ON_DUP_KEY.getErrorCode(), 
e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testAggregationInOnDupKey() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, 
k2 integer not null, v bigint, constraint pk primary key (k1,k2))");
+        try {
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON 
DUPLICATE KEY UPDATE v = sum(v)");
+            fail();
+        } catch (SQLException e) {
+            
assertEquals(SQLExceptionCode.AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY.getErrorCode(),
 e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSequenceInOnDupKey() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, 
k2 integer not null, v bigint, constraint pk primary key (k1,k2))");
+        conn.createStatement().execute("CREATE SEQUENCE s1");
+        try {
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON 
DUPLICATE KEY UPDATE v = next value for s1");
+            fail();
+        } catch (SQLException e) {
+            
assertEquals(SQLExceptionCode.INVALID_USE_OF_NEXT_VALUE_FOR.getErrorCode(), 
e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSCNInOnDupKey() throws Exception {
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=100";
+        Connection conn = DriverManager.getConnection(url);
+        conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))");
+        try {
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY.getErrorCode(), e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
 }
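The new tests above only exercise the rejection paths; the happy path needs a live cluster, so it cannot run in this connectionless harness. A sketch of what an integration test of the semantics might look like (ResultSet and assertTrue imports assumed):

    @Test
    public void testAtomicUpsertSketch() throws Exception {
        Connection conn = DriverManager.getConnection(getUrl());
        try {
            conn.createStatement().execute("CREATE TABLE t2 (k integer not null primary key, counter bigint)");
            // Row absent: the VALUES are inserted and the ON DUPLICATE KEY clause is skipped
            conn.createStatement().execute("UPSERT INTO t2 VALUES(0, 0) ON DUPLICATE KEY UPDATE counter = counter + 1");
            conn.commit();
            // Row present: the clause executes atomically on the region server
            conn.createStatement().execute("UPSERT INTO t2 VALUES(0, 0) ON DUPLICATE KEY UPDATE counter = counter + 1");
            conn.commit();
            ResultSet rs = conn.createStatement().executeQuery("SELECT counter FROM t2 WHERE k = 0");
            assertTrue(rs.next());
            assertEquals(1L, rs.getLong(1));
        } finally {
            conn.close();
        }
    }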
