http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index d121d2d..d6adc71 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -17,20 +17,77 @@ */ package org.apache.phoenix.index; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.coprocessor.generated.PTableProtos; +import org.apache.phoenix.exception.DataExceedsCapacityException; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.ExpressionType; +import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.expression.visitor.ExpressionVisitor; +import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor; import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; +import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.write.IndexWriter; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PRow; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.TrustedByteArrayOutputStream; + +import com.google.common.collect.Lists; /** * Index builder for covered-columns index that ties into phoenix for faster use. 
*/ public class PhoenixIndexBuilder extends NonTxIndexBuilder { + public static final String ATOMIC_OP_ATTRIB = "_ATOMIC_OP_ATTRIB"; + private static final byte[] ON_DUP_KEY_IGNORE_BYTES = new byte[] {1}; // boolean true + private static final int ON_DUP_KEY_HEADER_BYTE_SIZE = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BOOLEAN; + + private static List<Cell> flattenCells(Mutation m, int estimatedSize) throws IOException { + List<Cell> flattenedCells = Lists.newArrayListWithExpectedSize(estimatedSize); + flattenCells(m, flattenedCells); + return flattenedCells; + } + + private static void flattenCells(Mutation m, List<Cell> flattenedCells) throws IOException { + for (List<Cell> cells : m.getFamilyCellMap().values()) { + flattenedCells.addAll(cells); + } + } + @Override public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { return new PhoenixIndexMetaData(env, miniBatchOp.getOperation(0).getAttributesMap()); @@ -53,4 +110,266 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder { @Override public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException { } + + @Override + public boolean isAtomicOp(Mutation m) throws IOException { + return m.getAttribute(ATOMIC_OP_ATTRIB) != null; + } + + private static void transferCells(Mutation source, Mutation target) { + target.getFamilyCellMap().putAll(source.getFamilyCellMap()); + } + private static void transferAttributes(Mutation source, Mutation target) { + for (Map.Entry<String, byte[]> entry : source.getAttributesMap().entrySet()) { + target.setAttribute(entry.getKey(), entry.getValue()); + } + } + private static List<Mutation> convertIncrementToPutInSingletonList(Increment inc) { + byte[] rowKey = inc.getRow(); + Put put = new Put(rowKey); + transferCells(inc, put); + transferAttributes(inc, put); + return Collections.<Mutation>singletonList(put); + } + + @Override + public List<Mutation> executeAtomicOp(Increment inc) throws IOException { + byte[] opBytes = inc.getAttribute(ATOMIC_OP_ATTRIB); + if (opBytes == null) { // Unexpected + return null; + } + inc.setAttribute(ATOMIC_OP_ATTRIB, null); + Put put = null; + Delete delete = null; + // We can neither use the time stamp in the Increment to set the Get time range + // nor set the Put/Delete time stamp and have this be atomic as HBase does not + // handle that. Though we disallow using the ON DUPLICATE KEY clause when the + // CURRENT_SCN is set, we still may have a time stamp set as of when the table + // was resolved on the client side. We need to ignore this as well due to limitations + // in HBase, but this isn't too bad as the time will be very close to the current + // time anyway. + long ts = HConstants.LATEST_TIMESTAMP; + byte[] rowKey = inc.getRow(); + final Get get = new Get(rowKey); + if (isDupKeyIgnore(opBytes)) { + get.setFilter(new FirstKeyOnlyFilter()); + Result result = this.env.getRegion().get(get); + return result.isEmpty() ?
convertIncrementToPutInSingletonList(inc) : Collections.<Mutation>emptyList(); + } + ByteArrayInputStream stream = new ByteArrayInputStream(opBytes); + DataInputStream input = new DataInputStream(stream); + boolean skipFirstOp = input.readBoolean(); + short repeat = input.readShort(); + final int[] estimatedSizeHolder = {0}; + List<Pair<PTable, List<Expression>>> operations = Lists.newArrayListWithExpectedSize(3); + while (true) { + ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() { + @Override + public Void visit(KeyValueColumnExpression expression) { + get.addColumn(expression.getColumnFamily(), expression.getColumnName()); + estimatedSizeHolder[0]++; + return null; + } + }; + try { + int nExpressions = WritableUtils.readVInt(input); + List<Expression> expressions = Lists.newArrayListWithExpectedSize(nExpressions); + for (int i = 0; i < nExpressions; i++) { + Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); + expression.readFields(input); + expressions.add(expression); + expression.accept(visitor); + } + PTableProtos.PTable tableProto = PTableProtos.PTable.parseDelimitedFrom(input); + PTable table = PTableImpl.createFromProto(tableProto); + operations.add(new Pair<>(table, expressions)); + } catch (EOFException e) { + break; + } + } + int estimatedSize = estimatedSizeHolder[0]; + if (get.getFamilyMap().isEmpty()) { + get.setFilter(new FirstKeyOnlyFilter()); + } + MultiKeyValueTuple tuple; + List<Cell> cells = ((HRegion)this.env.getRegion()).get(get, false); + if (cells.isEmpty()) { + if (skipFirstOp) { + if (operations.size() <= 1 && repeat <= 1) { + return convertIncrementToPutInSingletonList(inc); + } + repeat--; // Skip first operation (if first wasn't ON DUPLICATE KEY IGNORE) + } + // Base current state off of new row + tuple = new MultiKeyValueTuple(flattenCells(inc, estimatedSize)); + } else { + // Base current state off of existing row + tuple = new MultiKeyValueTuple(cells); + } + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + for (int opIndex = 0; opIndex < operations.size(); opIndex++) { + Pair<PTable, List<Expression>> operation = operations.get(opIndex); + PTable table = operation.getFirst(); + List<Expression> expressions = operation.getSecond(); + for (int j = 0; j < repeat; j++) { // repeater loop + ptr.set(rowKey); + PRow row = table.newRow(GenericKeyValueBuilder.INSTANCE, ts, ptr, false); + for (int i = 0; i < expressions.size(); i++) { + Expression expression = expressions.get(i); + ptr.set(ByteUtil.EMPTY_BYTE_ARRAY); + expression.evaluate(tuple, ptr); + PColumn column = table.getColumns().get(i + 1); + Object value = expression.getDataType().toObject(ptr, column.getSortOrder()); + // We are guaranteed that the two columns will have the + // same type.
+ if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), + expression.getMaxLength(), expression.getScale(), column.getMaxLength(), + column.getScale())) { + throw new DataExceedsCapacityException(column.getDataType(), column.getMaxLength(), + column.getScale()); + } + column.getDataType().coerceBytes(ptr, value, expression.getDataType(), expression.getMaxLength(), + expression.getScale(), expression.getSortOrder(), column.getMaxLength(), column.getScale(), + column.getSortOrder(), table.rowKeyOrderOptimizable()); + byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr); + row.setValue(column, bytes); + } + List<Cell> flattenedCells = Lists.newArrayListWithExpectedSize(estimatedSize); + List<Mutation> mutations = row.toRowMutations(); + for (Mutation source : mutations) { + flattenCells(source, flattenedCells); + } + tuple.setKeyValues(flattenedCells); + } + // Repeat only applies to first statement + repeat = 1; + } + + List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2); + for (int i = 0; i < tuple.size(); i++) { + Cell cell = tuple.getValue(i); + if (Type.codeToType(cell.getTypeByte()) == Type.Put) { + if (put == null) { + put = new Put(rowKey); + transferAttributes(inc, put); + mutations.add(put); + } + put.add(cell); + } else { + if (delete == null) { + delete = new Delete(rowKey); + transferAttributes(inc, delete); + mutations.add(delete); + } + delete.addDeleteMarker(cell); + } + } + return mutations; + } + + public static byte[] serializeOnDupKeyIgnore() { + return ON_DUP_KEY_IGNORE_BYTES; + } + + /** + * Serialize ON DUPLICATE KEY UPDATE info with the following format: + * 1) Boolean value tracking whether or not to execute the first ON DUPLICATE KEY clause. + * We know the clause should be executed when there are other UPSERT VALUES clauses earlier in + * the same batch for this row key. We need this for two main cases: + * UPSERT VALUES followed by UPSERT VALUES ON DUPLICATE KEY UPDATE + * UPSERT VALUES ON DUPLICATE KEY IGNORE followed by UPSERT VALUES ON DUPLICATE KEY UPDATE + * 2) Short value tracking how many times the first clause should be executed. This + * optimizes the case of the same clause being executed many times by serializing it only once. + * 3) Repeating {List<Expression>, PTable} pairs that encapsulate the ON DUPLICATE KEY clause.
+ * @param table table representing columns being updated + * @param expressions list of expressions to evaluate for updating columns + * @return serialized byte array representation of ON DUPLICATE KEY UPDATE info + */ + public static byte[] serializeOnDupKeyUpdate(PTable table, List<Expression> expressions) { + PTableProtos.PTable ptableProto = PTableImpl.toProto(table); + int size = ptableProto.getSerializedSize(); + try (ByteArrayOutputStream stream = new ByteArrayOutputStream(size * 2)) { + DataOutputStream output = new DataOutputStream(stream); + output.writeBoolean(true); // Skip this ON DUPLICATE KEY clause if row already exists + output.writeShort(1); // Execute this ON DUPLICATE KEY once + WritableUtils.writeVInt(output, expressions.size()); + for (int i = 0; i < expressions.size(); i++) { + Expression expression = expressions.get(i); + WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); + expression.write(output); + } + ptableProto.writeDelimitedTo(output); + return stream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static byte[] doNotSkipFirstOnDupKey(byte[] oldOnDupKeyBytes) { + byte[] newOnDupKeyBytes = Arrays.copyOf(oldOnDupKeyBytes, oldOnDupKeyBytes.length); + newOnDupKeyBytes[0] = 0; // false means do not skip first ON DUPLICATE KEY + return newOnDupKeyBytes; + } + + public static byte[] combineOnDupKey(byte[] oldOnDupKeyBytes, byte[] newOnDupKeyBytes) { + // If old ON DUPLICATE KEY is null, then the new value always takes effect + // If new ON DUPLICATE KEY is null, then reset back to null + if (oldOnDupKeyBytes == null || newOnDupKeyBytes == null) { + if (newOnDupKeyBytes == null) { + return newOnDupKeyBytes; + } + return doNotSkipFirstOnDupKey(newOnDupKeyBytes); + } + // If the new UPSERT VALUES statement has an ON DUPLICATE KEY IGNORE, and there + // is an already existing UPSERT VALUES statement with an ON DUPLICATE KEY clause, + // then we can just keep that one as the new one has no impact. + if (isDupKeyIgnore(newOnDupKeyBytes)) { + return oldOnDupKeyBytes; + } + boolean isOldDupKeyIgnore = isDupKeyIgnore(oldOnDupKeyBytes); + try (TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(Math.max(0, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE) + newOnDupKeyBytes.length); + ByteArrayInputStream oldStream = new ByteArrayInputStream(oldOnDupKeyBytes); + ByteArrayInputStream newStream = new ByteArrayInputStream(newOnDupKeyBytes); + DataOutputStream output = new DataOutputStream(stream); + DataInputStream oldInput = new DataInputStream(oldStream); + DataInputStream newInput = new DataInputStream(newStream)) { + + boolean execute1 = oldInput.readBoolean(); + newInput.readBoolean(); // ignore + int repeating2 = newInput.readShort(); + if (isOldDupKeyIgnore) { + output.writeBoolean(false); // Will force subsequent ON DUPLICATE KEY UPDATE statement to execute + output.writeShort(repeating2); + output.write(newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE); + } else { + int repeating1 = oldInput.readShort(); + if (Bytes.compareTo( + oldOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE, + newOnDupKeyBytes, Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BOOLEAN, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE) == 0) { + // If both old and new ON DUPLICATE KEY UPDATE clauses match, + // reduce the size of data we're sending over the wire. + // TODO: optimize the size of the RPC further.
+ output.writeBoolean(execute1); + output.writeShort(repeating1 + repeating2); + output.write(newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE); + } else { + output.writeBoolean(execute1); + output.writeShort(repeating1); // retain the repeat count of the first ON DUPLICATE KEY UPDATE + output.write(oldOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE); + // If the new ON DUPLICATE KEY UPDATE was repeating, we need to write it multiple times as only the first + // statement is affected by the repeating amount + for (int i = 0; i < repeating2; i++) { + output.write(newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE); + } + } + } + return stream.toByteArray(); + } catch (IOException e) { // Shouldn't be possible with ByteInput/Output streams + throw new RuntimeException(e); + } + } + + public static boolean isDupKeyIgnore(byte[] onDupKeyBytes) { + return onDupKeyBytes != null && Bytes.compareTo(ON_DUP_KEY_IGNORE_BYTES, onDupKeyBytes) == 0; + } } \ No newline at end of file
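For orientation: the payload written by serializeOnDupKeyUpdate() above is a 1-byte boolean header (whether to skip the first clause when the row already exists), a 2-byte short repeat count, and then one or more {List<Expression>, PTable} sections; ON DUPLICATE KEY IGNORE is the single byte {1}. The standalone sketch below (not part of the patch; the class name is hypothetical) decodes only the fixed header with plain java.io, mirroring the first two reads in executeAtomicOp(). Decoding the repeated sections requires Phoenix's ExpressionType and PTableProtos machinery shown in the diff.

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    public class OnDupKeyHeaderSketch {
        public static void main(String[] args) throws IOException {
            // Header bytes as serializeOnDupKeyUpdate() would emit them:
            // skipFirstOp = true, repeat = 1 (DataOutputStream is big-endian).
            byte[] onDupKeyBytes = new byte[] {1, 0, 1};
            try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(onDupKeyBytes))) {
                boolean skipFirstOp = input.readBoolean(); // mirrors executeAtomicOp()'s first read
                short repeat = input.readShort();          // how many times the first clause runs
                System.out.println("skipFirstOp=" + skipFirstOp + ", repeat=" + repeat);
                // The {List<Expression>, PTable} sections that follow are not decoded here.
            }
        }
    }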
http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 60e32e5..d562f44 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -595,8 +595,10 @@ public class PhoenixStatement implements Statement, SQLCloseable { } private static class ExecutableUpsertStatement extends UpsertStatement implements CompilableStatement { - private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) { - super(table, hintNode, columns, values, select, bindCount, udfParseNodes); + private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, + List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes, + List<Pair<ColumnName,ParseNode>> onDupKeyPairs) { + super(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs); } @SuppressWarnings("unchecked") @@ -1203,8 +1205,9 @@ public class PhoenixStatement implements Statement, SQLCloseable { } @Override - public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) { - return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, udfParseNodes); + public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, + Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,ParseNode>> onDupKeyPairs) { + return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index 6b58bed..977ca4c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -706,8 +706,11 @@ public class ParseNodeFactory { orderBy == null ? Collections.<OrderByNode>emptyList() : orderBy, limit, offset, bindCount, isAggregate, hasSequence, selects == null ? 
Collections.<SelectStatement>emptyList() : selects, udfParseNodes); } - public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) { - return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes); + public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, + SelectStatement select, int bindCount, + Map<String, UDFParseNode> udfParseNodes, + List<Pair<ColumnName,ParseNode>> onDupKeyPairs) { + return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs); } public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java index 48698bd..fca7463 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java @@ -21,20 +21,24 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.hadoop.hbase.util.Pair; + public class UpsertStatement extends DMLStatement { private final List<ColumnName> columns; private final List<ParseNode> values; private final SelectStatement select; private final HintNode hint; + private final List<Pair<ColumnName,ParseNode>> onDupKeyPairs; public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, - Map<String, UDFParseNode> udfParseNodes) { + Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,ParseNode>> onDupKeyPairs) { super(table, bindCount, udfParseNodes); this.columns = columns == null ? Collections.<ColumnName>emptyList() : columns; this.values = values; this.select = select; this.hint = hint == null ? 
HintNode.EMPTY_HINT_NODE : hint; + this.onDupKeyPairs = onDupKeyPairs; } public List<ColumnName> getColumns() { @@ -52,4 +56,8 @@ public class UpsertStatement extends DMLStatement { public HintNode getHint() { return hint; } + + public List<Pair<ColumnName,ParseNode>> getOnDupKeyPairs() { + return onDupKeyPairs; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java index 798706e..aca8219 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java @@ -89,4 +89,14 @@ public class DelegateColumn extends DelegateDatum implements PColumn { public boolean isDynamic() { return getDelegate().isDynamic(); } + + @Override + public int hashCode() { + return getDelegate().hashCode(); + } + + @Override + public boolean equals(Object o) { + return getDelegate().equals(o); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java index 3ee012f..7d39dfe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java @@ -106,13 +106,13 @@ public class DelegateTable implements PTable { } @Override - public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, byte[]... values) { - return delegate.newRow(builder, ts, key, values); + public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values) { + return delegate.newRow(builder, ts, key, hasOnDupKey, values); } @Override - public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values) { - return delegate.newRow(builder, key, values); + public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... 
values) { + return delegate.newRow(builder, key, hasOnDupKey, values); } @Override @@ -280,4 +280,14 @@ public class DelegateTable implements PTable { public boolean isAppendOnlySchema() { return delegate.isAppendOnlySchema(); } + + @Override + public int hashCode() { + return delegate.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return delegate.equals(obj); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java index a556f76..ca827d8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java @@ -170,14 +170,14 @@ public class PColumnImpl implements PColumn { public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; - if (getClass() != obj.getClass()) return false; - PColumnImpl other = (PColumnImpl)obj; + if (! (obj instanceof PColumn) ) return false; + PColumn other = (PColumn)obj; if (familyName == null) { - if (other.familyName != null) return false; - } else if (!familyName.equals(other.familyName)) return false; + if (other.getFamilyName() != null) return false; + } else if (!familyName.equals(other.getFamilyName())) return false; if (name == null) { - if (other.name != null) return false; - } else if (!name.equals(other.name)) return false; + if (other.getName() != null) return false; + } else if (!name.equals(other.getName())) return false; return true; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java index 30deee6..fde83ba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java @@ -40,7 +40,7 @@ public interface PRow { /** * Get the list of {@link org.apache.hadoop.hbase.client.Mutation} used to * update an HTable after all mutations through calls to - * {@link #setValue(PColumn, Object)} or {@link #delete()}. + * {@link #setValue(PColumn, byte[])} or {@link #delete()}. 
* @return the list of mutations representing all changes made to a row * @throws ConstraintViolationException if row data violates schema * constraint @@ -54,15 +54,6 @@ public interface PRow { * @throws ConstraintViolationException if row data violates schema * constraint */ - public void setValue(PColumn col, Object value); - - /** - * Set a column value in the row - * @param col the column for which the value is being set - * @param value the value - * @throws ConstraintViolationException if row data violates schema - * constraint - */ public void setValue(PColumn col, byte[] value); /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java index b585323..01e8afe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java @@ -226,26 +226,28 @@ public interface PTable extends PMetaDataEntity { * and the optional key values specified using values. * @param ts the timestamp that the key value will have when committed * @param key the row key of the key value + * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise. * @param values the optional key values * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to * generate the Row to send to the HBase server. * @throws ConstraintViolationException if row data violates schema * constraint */ - PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, byte[]... values); + PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values); /** * Creates a new row for the PK values (from {@link #newKey(ImmutableBytesWritable, byte[][])} * and the optional key values specified using values. The timestamp of the key value * will be set by the HBase server. * @param key the row key of the key value + * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise. * @param values the optional key values * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to * generate the row to send to the HBase server. * @throws ConstraintViolationException if row data violates schema * constraint */ - PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values); + PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values); /** * Formulates a row key using the values provided. 
The values must be in http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index 773ce76..627740b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -241,7 +242,7 @@ public class PTableImpl implements PTable { table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); } - public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException { + public static PTableImpl makePTable(PTable table, Collection<PColumn> columns) throws SQLException { return new PTableImpl( table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), @@ -251,7 +252,7 @@ public class PTableImpl implements PTable { table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); } - public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException { + public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns) throws SQLException { return new PTableImpl( table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), @@ -261,7 +262,7 @@ public class PTableImpl implements PTable { table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); } - public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException { + public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows) throws SQLException { return new PTableImpl( table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), @@ -271,7 +272,7 @@ public class PTableImpl implements PTable { table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); } - public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, + public static PTableImpl makePTable(PTable table, long 
timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped) throws SQLException { return new PTableImpl( table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, @@ -715,8 +716,8 @@ public class PTableImpl implements PTable { } } - private PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, int i, byte[]... values) { - PRow row = new PRowImpl(builder, key, ts, getBucketNum()); + private PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, int i, boolean hasOnDupKey, byte[]... values) { + PRow row = new PRowImpl(builder, key, ts, getBucketNum(), hasOnDupKey); if (i < values.length) { for (PColumnFamily family : getColumnFamilies()) { for (PColumn column : family.getColumns()) { @@ -731,13 +732,13 @@ public class PTableImpl implements PTable { @Override public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, - byte[]... values) { - return newRow(builder, ts, key, 0, values); + boolean hasOnDupKey, byte[]... values) { + return newRow(builder, ts, key, 0, hasOnDupKey, values); } @Override - public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values) { - return newRow(builder, HConstants.LATEST_TIMESTAMP, key, values); + public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values) { + return newRow(builder, HConstants.LATEST_TIMESTAMP, key, hasOnDupKey, values); } @Override @@ -775,14 +776,16 @@ public class PTableImpl implements PTable { // default to the generic builder, and only override when we know on the client private final KeyValueBuilder kvBuilder; - private Put setValues; + private Mutation setValues; private Delete unsetValues; private Mutation deleteRow; private final long ts; + private final boolean hasOnDupKey; - public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum) { + public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum, boolean hasOnDupKey) { this.kvBuilder = kvBuilder; this.ts = ts; + this.hasOnDupKey = hasOnDupKey; if (bucketNum != null) { this.key = SaltingUtil.getSaltedKey(key, bucketNum); this.keyPtr = new ImmutableBytesPtr(this.key); @@ -795,7 +798,7 @@ public class PTableImpl implements PTable { } private void newMutations() { - Put put = new Put(this.key); + Mutation put = this.hasOnDupKey ? new Increment(this.key) : new Put(this.key); Delete delete = new Delete(this.key); if (isWALDisabled()) { put.setDurability(Durability.SKIP_WAL); @@ -844,12 +847,6 @@ public class PTableImpl implements PTable { } @Override - public void setValue(PColumn column, Object value) { - byte[] byteValue = value == null ? 
ByteUtil.EMPTY_BYTE_ARRAY : column.getDataType().toBytes(value); - setValue(column, byteValue); - } - - @Override public void setValue(PColumn column, byte[] byteValue) { deleteRow = null; byte[] family = column.getFamilyName().getBytes(); @@ -864,7 +861,10 @@ public class PTableImpl implements PTable { // Store nulls for immutable tables otherwise default value would be used removeIfPresent(setValues, family, qualifier); removeIfPresent(unsetValues, family, qualifier); - } else if (isNull && !getStoreNulls() && column.getExpressionStr() == null) { + } else if (isNull && !getStoreNulls() && !this.hasOnDupKey && column.getExpressionStr() == null) { + // Cannot use column delete marker when row has ON DUPLICATE KEY clause + // because we cannot change a Delete mutation to a Put mutation in the + // case of updates occurring due to the execution of the clause. removeIfPresent(setValues, family, qualifier); deleteQuietly(unsetValues, kvBuilder, kvBuilder.buildDeleteColumns(keyPtr, column .getFamilyName().getBytesPtr(), column.getName().getBytesPtr(), ts)); @@ -1328,11 +1328,11 @@ public class PTableImpl implements PTable { public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; - if (getClass() != obj.getClass()) return false; - PTableImpl other = (PTableImpl) obj; + if (! (obj instanceof PTable)) return false; + PTable other = (PTable) obj; if (key == null) { - if (other.key != null) return false; - } else if (!key.equals(other.key)) return false; + if (other.getKey() != null) return false; + } else if (!key.equals(other.getKey())) return false; return true; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java index 6f8b19f..65cf075 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java @@ -18,7 +18,6 @@ import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.schema.types.PDataType; public class ExpressionUtil { - private ExpressionUtil() { } http://git-wip-us.apache.org/repos/asf/phoenix/blob/927c6120/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index 393da4c..7488c72 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java @@ -458,10 +458,6 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { return plan.getContext().getScan(); } - private QueryPlan getQueryPlan(String query) throws SQLException { - return getQueryPlan(query, Collections.emptyList()); - } - private QueryPlan getOptimizedQueryPlan(String query) throws SQLException { return getOptimizedQueryPlan(query, Collections.emptyList()); } @@ -2683,4 +2679,104 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { assertEquals("PLATFORM_ENTITY.GLOBAL_INDEX", plan.getContext().getCurrentTable().getTable().getName().getString()); } } + + @Test + 
public void testOnDupKeyForImmutableTable() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + try { + conn.createStatement().execute("CREATE TABLE t1 (k integer not null primary key, v bigint) IMMUTABLE_ROWS=true"); + conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1"); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE.getErrorCode(), e.getErrorCode()); + } finally { + conn.close(); + } + } + + @Test + public void testUpdatePKOnDupKey() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + try { + conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))"); + conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE k2 = v + 1"); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_UPDATE_PK_ON_DUP_KEY.getErrorCode(), e.getErrorCode()); + } finally { + conn.close(); + } + } + + @Test + public void testOnDupKeyTypeMismatch() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + try { + conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v1 bigint, v2 varchar, constraint pk primary key (k1,k2))"); + conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v1 = v2 || 'a'"); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.TYPE_MISMATCH.getErrorCode(), e.getErrorCode()); + } finally { + conn.close(); + } + } + + @Test + public void testDuplicateColumnOnDupKeyUpdate() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + try { + conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v1 bigint, v2 bigint, constraint pk primary key (k1,k2))"); + conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v1 = v1 + 1, v1 = v2 + 2"); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.DUPLICATE_COLUMN_IN_ON_DUP_KEY.getErrorCode(), e.getErrorCode()); + } finally { + conn.close(); + } + } + + @Test + public void testAggregationInOnDupKey() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))"); + try { + conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = sum(v)"); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY.getErrorCode(), e.getErrorCode()); + } finally { + conn.close(); + } + } + + @Test + public void testSequenceInOnDupKey() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))"); + conn.createStatement().execute("CREATE SEQUENCE s1"); + try { + conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = next value for s1"); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.INVALID_USE_OF_NEXT_VALUE_FOR.getErrorCode(), e.getErrorCode()); + } finally { + conn.close(); + } + } + + @Test + public void testSCNInOnDupKey() throws Exception { + String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=100"; + Connection conn = 
DriverManager.getConnection(url); + conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))"); + try { + conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1"); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY.getErrorCode(), e.getErrorCode()); + } finally { + conn.close(); + } + } }
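Taken together, these changes add ON DUPLICATE KEY UPDATE/IGNORE support to UPSERT VALUES, executed atomically on the region server via an Increment carrying the serialized clause. A minimal usage sketch (not from the patch; the connection URL, table, and column names are illustrative), using the syntax the tests above exercise:

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class OnDupKeyUsageSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.createStatement().execute(
                    "CREATE TABLE IF NOT EXISTS counters (k VARCHAR PRIMARY KEY, v BIGINT)");
                // Creates the row on first execution; on later executions the
                // increment runs atomically on the server against the current value.
                conn.createStatement().execute(
                    "UPSERT INTO counters VALUES('clicks', 1) ON DUPLICATE KEY UPDATE v = v + 1");
                // Leaves an existing row untouched.
                conn.createStatement().execute(
                    "UPSERT INTO counters VALUES('clicks', 1) ON DUPLICATE KEY IGNORE");
                conn.commit();
            }
        }
    }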