Repository: phoenix Updated Branches: refs/heads/txn acacaf340 -> d81e660e3
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java index f94ce39..8a4f376 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java @@ -41,33 +41,23 @@ public class TxPointInTimeQueryIT extends BaseClientManagedTimeIT { public void initTable() throws Exception { ts = nextTimestamp(); } - + @Test public void testQueryWithSCN() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); - Connection conn = DriverManager.getConnection(getUrl(), props); - try { - conn.createStatement() - .execute( - "CREATE TABLE t (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true"); - - props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10)); - conn = DriverManager.getConnection(getUrl(), props); - - String selectQuery = "SELECT k FROM t"; + try (Connection conn = DriverManager.getConnection(getUrl(), props);) { try { - conn.createStatement().executeQuery(selectQuery); + conn.createStatement() + .execute( + "CREATE TABLE t (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true"); fail(); } catch (SQLException e) { assertEquals("Unexpected Exception", - SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET.getErrorCode(), - e.getErrorCode()); + SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET + .getErrorCode(), e.getErrorCode()); } - - } finally { - conn.close(); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index db98e25..855915d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -347,7 +347,7 @@ public class FromCompiler { PTable theTable = null; if (updateCacheImmediately || connection.getAutoCommit()) { MetaDataMutationResult result = client.updateCache(schemaName, tableName); - timeStamp = TransactionUtil.getTableTimestamp(connection, result); + timeStamp = TransactionUtil.getResolvedTimestamp(connection, result); theTable = result.getTable(); if (theTable == null) { throw new TableNotFoundException(schemaName, tableName, timeStamp); @@ -367,7 +367,7 @@ public class FromCompiler { if (theTable == null) { MetaDataMutationResult result = client.updateCache(schemaName, tableName); if (result.wasUpdated()) { - timeStamp = TransactionUtil.getTableTimestamp(connection, result); + timeStamp = TransactionUtil.getResolvedTimestamp(connection, result); theTable = result.getTable(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 5d24f7e..ee724fc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -774,7 +774,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // without making an additional query PTable table = loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP); - if (table != null) { + if (table != null && !isTableDeleted(table)) { if (table.getTimeStamp() < clientTimeStamp) { // If the table is older than the client time stamp and it's deleted, // continue @@ -803,7 +803,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // on the system table. This is an issue because of the way we manage batch mutation // in the // Indexer. - region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); + region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); // Invalidate the cache - the next getTable call will add it // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index d89e19a..db50f83 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -250,7 +250,7 @@ public enum SQLExceptionCode { DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1056, "43A13", "Default column family not allowed on VIEW or shared INDEX"), ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL(1070, "44A01", "Only tables may be declared as transactional"), MAY_NOT_MAP_TO_EXISTING_TABLE_AS_TRANSACTIONAL(1071, "44A02", "An existing HBase table may not be mapped to as a transactional table"), - STORE_NULLS_MUST_BE_FALSE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls must be true when a table is transactional"), + STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls must be true when a table is transactional"), CANNOT_START_TRANSACTION_WITH_SCN_SET(1073, "44A04", "Cannot start a transaction on a connection with SCN set"), /** Sequence related */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 43fe28b..79da59f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import co.cask.tephra.Transaction; @@ -196,13 +197,18 @@ public class MutationState implements SQLCloseable { } if (hasUncommittedData) { try { - tx = currentTx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx); + if (txContext == null) { + tx = currentTx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx); + } else { + txContext.checkpoint(); + tx = currentTx = txContext.getCurrentTransaction(); + } // Since we've checkpointed, we can clear out uncommitted set, since a statement run afterwards // should see all this data. uncommittedPhysicalNames.clear(); - } catch (TransactionNotInProgressException e) { + } catch (TransactionFailureException | TransactionNotInProgressException e) { throw new SQLException(e); - } + } } // Since we're querying our own table while mutating it, we must exclude // see our current mutations, otherwise we can get erroneous results (for DELETE) @@ -367,37 +373,14 @@ public class MutationState implements SQLCloseable { throwIfTooBig(); } - private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, long timestamp, boolean includeMutableIndexes) { + private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) { final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism (tableRef.getTable().isImmutableRows() || includeMutableIndexes) ? IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) : Iterators.<PTable>emptyIterator(); - final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size()); + final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size()); final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null; - Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator(); - final ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - while (iterator.hasNext()) { - Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next(); - ImmutableBytesPtr key = rowEntry.getKey(); - PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key); - List<Mutation> rowMutations, rowMutationsPertainingToIndex; - if (rowEntry.getValue() == PRow.DELETE_MARKER) { // means delete - row.delete(); - rowMutations = row.toRowMutations(); - // Row deletes for index tables are processed by running a re-written query - // against the index table (as this allows for flexibility in being able to - // delete rows). - rowMutationsPertainingToIndex = Collections.emptyList(); - } else { - for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) { - row.setValue(valueEntry.getKey(), valueEntry.getValue()); - } - rowMutations = row.toRowMutations(); - rowMutationsPertainingToIndex = rowMutations; - } - mutations.addAll(rowMutations); - if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex); - } + generateMutations(tableRef, timestamp, values, mutationList, mutationsPertainingToIndex); return new Iterator<Pair<byte[],List<Mutation>>>() { boolean isFirst = true; @@ -410,14 +393,24 @@ public class MutationState implements SQLCloseable { public Pair<byte[], List<Mutation>> next() { if (isFirst) { isFirst = false; - return new Pair<byte[],List<Mutation>>(tableRef.getTable().getPhysicalName().getBytes(),mutations); + return new Pair<byte[],List<Mutation>>(tableRef.getTable().getPhysicalName().getBytes(),mutationList); } PTable index = indexes.next(); List<Mutation> indexMutations; try { indexMutations = IndexUtil.generateIndexData(tableRef.getTable(), index, mutationsPertainingToIndex, - ptr, connection.getKeyValueBuilder(), connection); + connection.getKeyValueBuilder(), connection); + // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map + if (!sendAll) { + TableRef key = new TableRef(index); + Map<ImmutableBytesPtr, Map<PColumn, byte[]>> rowToColumnMap = mutations.remove(key); + if (rowToColumnMap!=null) { + final List<Mutation> deleteMutations = Lists.newArrayList(); + generateMutations(tableRef, timestamp, rowToColumnMap, deleteMutations, null); + indexMutations.addAll(deleteMutations); + } + } } catch (SQLException e) { throw new IllegalDataException(e); } @@ -431,6 +424,35 @@ public class MutationState implements SQLCloseable { }; } + + private void generateMutations(final TableRef tableRef, long timestamp, + final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, + final List<Mutation> mutationList, + final List<Mutation> mutationsPertainingToIndex) { + Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next(); + ImmutableBytesPtr key = rowEntry.getKey(); + PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key); + List<Mutation> rowMutations, rowMutationsPertainingToIndex; + if (rowEntry.getValue() == PRow.DELETE_MARKER) { // means delete + row.delete(); + rowMutations = row.toRowMutations(); + // Row deletes for index tables are processed by running a re-written query + // against the index table (as this allows for flexibility in being able to + // delete rows). + rowMutationsPertainingToIndex = Collections.emptyList(); + } else { + for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) { + row.setValue(valueEntry.getKey(), valueEntry.getValue()); + } + rowMutations = row.toRowMutations(); + rowMutationsPertainingToIndex = rowMutations; + } + mutationList.addAll(rowMutations); + if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex); + } + } /** * Get the unsorted list of HBase mutations for the tables with uncommitted data. @@ -453,7 +475,7 @@ public class MutationState implements SQLCloseable { private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init(); private Iterator<Pair<byte[],List<Mutation>>> init() { - return addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes); + return addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes, true); } @Override @@ -651,7 +673,7 @@ public class MutationState implements SQLCloseable { boolean isDataTable = true; // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely) long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++]; - Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false); + Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll); while (mutationsIterator.hasNext()) { Pair<byte[],List<Mutation>> pair = mutationsIterator.next(); byte[] htableName = pair.getFirst(); @@ -855,13 +877,17 @@ public class MutationState implements SQLCloseable { } public void commit() throws SQLException { + boolean sendMutationsFailed=false; try { send(); + } catch (Throwable t) { + sendMutationsFailed=true; + throw t; } finally { txAwares.clear(); if (txContext != null) { try { - if (txStarted) { + if (txStarted && !sendMutationsFailed) { txContext.finish(); } } catch (TransactionFailureException e) { @@ -872,8 +898,10 @@ public class MutationState implements SQLCloseable { throw TransactionUtil.getSQLException(e); } } finally { - reset(); - } + if (!sendMutationsFailed) { + reset(); + } + } } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index b0f87cb..bd0461d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -34,6 +34,7 @@ import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.client.Delete; @@ -83,6 +84,8 @@ import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; +import co.cask.tephra.TxConstants; + import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -822,12 +825,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { int nDeleteCF = 0; int nDeleteVersionCF = 0; for (Cell kv : pendingUpdates) { - if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { - nDeleteCF++; - } - else if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) { + if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) { nDeleteVersionCF++; } + else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() + // Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor + // TODO see if implement PhoenixTransactionalIndexer.preDelete will work instead of the following check + || (CellUtil.matchingQualifier(kv, TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) { + nDeleteCF++; + } } // This is what a delete looks like on the server side for mutable indexing... // Should all be one or the other for DeleteFamily versus DeleteFamilyVersion, but just in case not @@ -850,7 +856,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { Cell newValue = newState.get(ref); if (newValue != null) { // Indexed column has potentially changed ImmutableBytesWritable oldValue = oldState.getLatestValue(ref); - boolean newValueSetAsNull = (newValue.getTypeByte() == Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode()); + boolean newValueSetAsNull = (newValue.getTypeByte() == Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode() || CellUtil.matchingValue(newValue, HConstants.EMPTY_BYTE_ARRAY)); //If the new column value has to be set as null and the older value is null too, //then just skip to the next indexed column. if (newValueSetAsNull && oldValue == null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 14f8a1f..2719119 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -86,13 +86,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec { ptr.set(state.getCurrentRowKey()); List<IndexUpdate> indexUpdates = Lists.newArrayList(); for (IndexMaintainer maintainer : indexMaintainers) { - // Check both immutable and local, as for transactional tables, we use an index maintainer + // For transactional tables, we use an index maintainer // to aid in rollback if there's a KeyValue column in the index. The alternative would be // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the // client side. - if (maintainer.isImmutableRows() && maintainer.isLocalIndex()) { - continue; - } Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns()); ValueGetter valueGetter = statePair.getFirst(); IndexUpdate indexUpdate = statePair.getSecond(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 3d5da43..9f03747 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -17,6 +17,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import co.cask.tephra.Transaction; @@ -28,11 +29,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -137,12 +141,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Map<String,byte[]> updateAttributes = m.getAttributesMap(); PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes); - if (m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) == null) { - // Unless we're aborting the transaction, we do not want to see our own transaction writes, - // since index maintenance requires seeing the previously committed data in order to function - // properly. - indexMetaData.getTransaction().setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); - } + byte[] txRollbackAttribute = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY); Collection<Pair<Mutation, byte[]>> indexUpdates = null; // get the current span, or just use a null-span to avoid a bunch of if statements try (TraceScope scope = Trace.startSpan("Starting to build index updates")) { @@ -152,7 +151,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } // get the index updates for all elements in this batch - indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp)); + indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute); current.addTimelineAnnotation("Built index updates, doing preStep"); TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); @@ -168,10 +167,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } } - private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator) throws IOException { - ResultScanner scanner = null; + private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException { + ResultScanner currentScanner = null; + ResultScanner previousScanner = null; TransactionAwareHTable txTable = null; - // Collect up all mutations in batch Map<ImmutableBytesPtr, MultiMutation> mutations = new HashMap<ImmutableBytesPtr, MultiMutation>(); @@ -196,13 +195,11 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers(); Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10); for (IndexMaintainer indexMaintainer : indexMaintainers) { - // Check both immutable and local, as for transactional tables, we use an index maintainer + // For transactional tables, we use an index maintainer // to aid in rollback if there's a KeyValue column in the index. The alternative would be // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the // client side. - if (!indexMaintainer.isImmutableRows() || !indexMaintainer.isLocalIndex()) { mutableColumns.addAll(indexMaintainer.getAllColumns()); - } } Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size()); @@ -226,39 +223,23 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { HTableInterface htable = env.getTable(tableName); txTable = new TransactionAwareHTable(htable); txTable.startTx(tx); - scanner = txTable.getScanner(scan); - } - ColumnReference emptyColRef = new ColumnReference(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); - if (scanner != null) { - Result result; - while ((result = scanner.next()) != null) { - Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); - byte[] attribValue = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY); - TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result); - Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData); - for (IndexUpdate delete : deletes) { - if (delete.isValid()) { - delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue); - indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); - } - } - state.applyMutation(); - Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData); - for (IndexUpdate put : puts) { - if (put.isValid()) { - indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName())); - } - } + currentScanner = txTable.getScanner(scan); + if (txRollbackAttribute!=null) { + tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); + previousScanner = txTable.getScanner(scan); } } + // In case of rollback we have to do two scans, one with VisibilityLevel.SNAPSHOT to see the current state of the row + // and another with VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT to see the previous state of the row + // so that we can rollback a previous delete + put + processScanner(env, indexMetaData, txRollbackAttribute, previousScanner, mutations, tx, mutableColumns, indexUpdates, false); + processScanner(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates, true); for (Mutation m : mutations.values()) { - TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m); - state.applyMutation(); - Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData); - for (IndexUpdate put : puts) { - if (put.isValid()) { - indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName())); - } + long timestamp = getTimestamp(txRollbackAttribute, tx.getWritePointer(), m); + TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), timestamp, m); + // if we did not generate valid put, we might have to generate a delete + if (!generatePuts(indexMetaData, indexUpdates, state)) { + generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state); } } } finally { @@ -268,6 +249,68 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { return indexUpdates; } + private void processScanner(RegionCoprocessorEnvironment env, + PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute, + ResultScanner scanner, + Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx, + Set<ColumnReference> mutableColumns, + Collection<Pair<Mutation, byte[]>> indexUpdates, boolean removeMutation) throws IOException { + if (scanner != null) { + Result result; + ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + while ((result = scanner.next()) != null) { + Mutation m = removeMutation ? mutations.remove(new ImmutableBytesPtr(result.getRow())) : mutations.get(new ImmutableBytesPtr(result.getRow())); + long timestamp = getTimestamp(txRollbackAttribute, tx.getWritePointer(), m); + TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), timestamp, m, emptyColRef, result); + generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state); + generatePuts(indexMetaData, indexUpdates, state); + } + } + } + + private long getTimestamp(byte[] txRollbackAttribute, long txnWritePointer, Mutation m) { + if (txRollbackAttribute==null) { + return txnWritePointer; + } + // if this is a rollback generate mutations with the same timestamp as the data row mutation as the timestamp might be + // different from the current txn write pointer because of check points + long mutationTimestamp = txnWritePointer; + for (Entry<byte[], List<Cell>> entry : m.getFamilyCellMap().entrySet()) { + mutationTimestamp = entry.getValue().get(0).getTimestamp(); + break; + } + return mutationTimestamp; + } + + private void generateDeletes(PhoenixIndexMetaData indexMetaData, + Collection<Pair<Mutation, byte[]>> indexUpdates, + byte[] attribValue, TxTableState state) throws IOException { + Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData); + for (IndexUpdate delete : deletes) { + if (delete.isValid()) { + delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue); + indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); + } + } + } + + boolean generatePuts( + PhoenixIndexMetaData indexMetaData, + Collection<Pair<Mutation, byte[]>> indexUpdates, + TxTableState state) + throws IOException { + state.applyMutation(); + Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData); + boolean validPut = false; + for (IndexUpdate put : puts) { + if (put.isValid()) { + indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName())); + validPut = true; + } + } + return validPut; + } + private static class TxTableState implements TableState { private final Mutation mutation; http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index a073b84..35a8663 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -215,7 +215,8 @@ public class QueryOptimizer { private static QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan, boolean isHinted) throws SQLException { int nColumns = dataPlan.getProjector().getColumnCount(); - String alias = '"' + dataPlan.getTableRef().getTableAlias() + '"'; // double quote in case it's case sensitive + String tableAlias = dataPlan.getTableRef().getTableAlias(); + String alias = tableAlias==null ? null : '"' + tableAlias + '"'; // double quote in case it's case sensitive String schemaName = index.getParentSchemaName().getString(); schemaName = schemaName.length() == 0 ? null : '"' + schemaName + '"'; http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 5667029..b2982e4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1228,7 +1228,7 @@ public class MetaDataClient { Long timestamp = null; if (parent != null) { transactional = parent.isTransactional(); - timestamp = TransactionUtil.getTableTimestamp(connection, transactional, null); + timestamp = TransactionUtil.getTableTimestamp(connection, transactional); storeNulls = parent.getStoreNulls(); if (tableType == PTableType.INDEX) { // Index on view @@ -1376,12 +1376,15 @@ public class MetaDataClient { if (transactional) { // FIXME: remove once Tephra handles storing multiple versions of a cell value, // and allows ignoring empty key values for an operation if (Boolean.FALSE.equals(storeNullsProp)) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.STORE_NULLS_MUST_BE_FALSE_FOR_TRANSACTIONAL) + throw new SQLExceptionInfo.Builder(SQLExceptionCode.STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL) .setSchemaName(schemaName).setTableName(tableName) .build().buildException(); } + // Force STORE_NULLS to true when transactional as Tephra cannot deal with column deletes + storeNulls = true; + tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.TRUE); } - timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactional, null) : timestamp; + timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactional) : timestamp; // Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views if (statement.getTableType() == PTableType.VIEW || indexId != null) { @@ -1891,8 +1894,7 @@ public class MetaDataClient { .setSchemaName(schemaName).setTableName(tableName).build().buildException(); default: - connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName, - TransactionUtil.getTableTimestamp(connection, transactional, result.getMutationTime())); + connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName, result.getMutationTime()); if (result.getTable() != null && tableType != PTableType.VIEW) { connection.setAutoCommit(true); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index a32e922..b78a904 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -831,7 +831,7 @@ public class PTableImpl implements PTable { @Override public synchronized boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) { - if (indexMaintainersPtr == null) { + if (indexMaintainersPtr == null || indexMaintainersPtr.getLength()==0) { indexMaintainersPtr = new ImmutableBytesWritable(); if (indexes.isEmpty()) { indexMaintainersPtr.set(ByteUtil.EMPTY_BYTE_ARRAY); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 51428ea..62ec1f0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -233,9 +233,10 @@ public class IndexUtil { } public static List<Mutation> generateIndexData(final PTable table, PTable index, - List<Mutation> dataMutations, ImmutableBytesWritable ptr, final KeyValueBuilder kvBuilder, PhoenixConnection connection) + List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection) throws SQLException { try { + final ImmutableBytesWritable ptr = new ImmutableBytesWritable(); IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size()); for (final Mutation dataMutation : dataMutations) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java index 0998e72..c8c468d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.exception.SQLExceptionCode; @@ -66,7 +67,9 @@ public class ServerUtil { } else if (t instanceof IOException) { // If the IOException does not wrap any exception, then bubble it up. Throwable cause = t.getCause(); - if (cause == null || cause instanceof IOException) { + if (cause instanceof RetriesExhaustedWithDetailsException) + return new DoNotRetryIOException(t.getMessage(), cause); + else if (cause == null || cause instanceof IOException) { return (IOException) t; } // Else assume it's been wrapped, so throw as DoNotRetryIOException to prevent client hanging while retrying http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java index 893a895..020ac3b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java @@ -17,17 +17,8 @@ */ package org.apache.phoenix.util; -import java.io.IOException; import java.sql.SQLException; -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionCodec; -import co.cask.tephra.TransactionConflictException; -import co.cask.tephra.TransactionFailureException; -import co.cask.tephra.TxConstants; -import co.cask.tephra.hbase98.TransactionAwareHTable; - -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; @@ -36,16 +27,21 @@ import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.PTable; +import co.cask.tephra.TransactionConflictException; +import co.cask.tephra.TransactionFailureException; +import co.cask.tephra.TxConstants; +import co.cask.tephra.hbase98.TransactionAwareHTable; + public class TransactionUtil { private TransactionUtil() { } public static long convertToNanoseconds(long serverTimeStamp) { - return serverTimeStamp * 1000000; + return serverTimeStamp * TxConstants.MAX_TX_PER_MS; } - public static long convertToMillisecods(Long serverTimeStamp) { - return serverTimeStamp / 1000000; + public static long convertToMilliseconds(long serverTimeStamp) { + return serverTimeStamp / TxConstants.MAX_TX_PER_MS; } public static SQLException getSQLException(TransactionFailureException e) { @@ -67,10 +63,11 @@ public class TransactionUtil { return new TransactionAwareHTable(htable, table.isImmutableRows() ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); } - public static long getResolvedTimestamp(PhoenixConnection connection, boolean isTransactional, Long defaultResolvedTimestamp) { + // we resolve transactional tables at the txn read pointer + public static long getResolvedTimestamp(PhoenixConnection connection, boolean isTransactional, long defaultResolvedTimestamp) { MutationState mutationState = connection.getMutationState(); Long scn = connection.getSCN(); - return scn != null ? scn : (isTransactional && mutationState.isTransactionStarted()) ? convertToMillisecods(mutationState.getReadPointer()) : defaultResolvedTimestamp; + return scn != null ? scn : (isTransactional && mutationState.isTransactionStarted()) ? convertToMilliseconds(mutationState.getReadPointer()) : defaultResolvedTimestamp; } public static long getResolvedTime(PhoenixConnection connection, MetaDataMutationResult result) { @@ -79,21 +76,30 @@ public class TransactionUtil { return getResolvedTimestamp(connection, isTransactional, result.getMutationTime()); } - public static long getTableTimestamp(PhoenixConnection connection, MetaDataMutationResult result) { + public static long getResolvedTimestamp(PhoenixConnection connection, MetaDataMutationResult result) { PTable table = result.getTable(); MutationState mutationState = connection.getMutationState(); boolean txInProgress = table != null && table.isTransactional() && mutationState.isTransactionStarted(); - return txInProgress ? convertToMillisecods(mutationState.getReadPointer()) : result.getMutationTime(); + return txInProgress ? convertToMilliseconds(mutationState.getReadPointer()) : result.getMutationTime(); } - public static Long getTableTimestamp(PhoenixConnection connection, boolean transactional, Long mutationTime) throws SQLException { - Long timestamp = mutationTime; + public static Long getTableTimestamp(PhoenixConnection connection, boolean transactional) throws SQLException { + Long timestamp = null; + if (!transactional) { + return timestamp; + } MutationState mutationState = connection.getMutationState(); - if (transactional && !mutationState.isTransactionStarted() && connection.getSCN()==null) { + // we need to burn a txn so that we are sure the txn read pointer is close to wall clock time + if (!mutationState.isTransactionStarted()) { mutationState.startTransaction(); - timestamp = convertToMillisecods(mutationState.getReadPointer()); connection.commit(); } + else { + connection.commit(); + } + mutationState.startTransaction(); + timestamp = convertToMilliseconds(mutationState.getReadPointer()); + connection.commit(); return timestamp; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index da965c8..9cbde9d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -134,6 +134,7 @@ import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.util.ConfigUtil; +import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; @@ -178,7 +179,7 @@ import com.google.inject.util.Providers; * */ public abstract class BaseTest { - private static final String TEST_TABLE_SCHEMA = "(" + + protected static final String TEST_TABLE_SCHEMA = "(" + " varchar_pk VARCHAR NOT NULL, " + " char_pk CHAR(6) NOT NULL, " + " int_pk INTEGER NOT NULL, "+ @@ -468,9 +469,9 @@ public abstract class BaseTest { return conf.get(QueryServices.ZOOKEEPER_PORT_ATTRIB); } - private static String url; + protected static String url; protected static PhoenixTestDriver driver; - private static boolean clusterInitialized = false; + protected static boolean clusterInitialized = false; private static HBaseTestingUtility utility; protected static final Configuration config = HBaseConfiguration.create(); @@ -495,7 +496,7 @@ public abstract class BaseTest { } - private static void setupTxManager() throws SQLException, IOException { + protected static void setupTxManager() throws SQLException, IOException { config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); @@ -1729,7 +1730,81 @@ public abstract class BaseTest { return utility; } - protected static void createMultiCFTestTable(String tableName) throws SQLException { + // Populate the test table with data. + public static void populateTestTable(String fullTableName) throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String upsert = "UPSERT INTO " + fullTableName + + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + stmt.setString(1, "varchar1"); + stmt.setString(2, "char1"); + stmt.setInt(3, 1); + stmt.setLong(4, 1L); + stmt.setBigDecimal(5, new BigDecimal(1.0)); + Date date = DateUtil.parseDate("2015-01-01 00:00:00"); + stmt.setDate(6, date); + stmt.setString(7, "varchar_a"); + stmt.setString(8, "chara"); + stmt.setInt(9, 2); + stmt.setLong(10, 2L); + stmt.setBigDecimal(11, new BigDecimal(2.0)); + stmt.setDate(12, date); + stmt.setString(13, "varchar_b"); + stmt.setString(14, "charb"); + stmt.setInt(15, 3); + stmt.setLong(16, 3L); + stmt.setBigDecimal(17, new BigDecimal(3.0)); + stmt.setDate(18, date); + stmt.executeUpdate(); + + stmt.setString(1, "varchar2"); + stmt.setString(2, "char2"); + stmt.setInt(3, 2); + stmt.setLong(4, 2L); + stmt.setBigDecimal(5, new BigDecimal(2.0)); + date = DateUtil.parseDate("2015-01-02 00:00:00"); + stmt.setDate(6, date); + stmt.setString(7, "varchar_a"); + stmt.setString(8, "chara"); + stmt.setInt(9, 3); + stmt.setLong(10, 3L); + stmt.setBigDecimal(11, new BigDecimal(3.0)); + stmt.setDate(12, date); + stmt.setString(13, "varchar_b"); + stmt.setString(14, "charb"); + stmt.setInt(15, 4); + stmt.setLong(16, 4L); + stmt.setBigDecimal(17, new BigDecimal(4.0)); + stmt.setDate(18, date); + stmt.executeUpdate(); + + stmt.setString(1, "varchar3"); + stmt.setString(2, "char3"); + stmt.setInt(3, 3); + stmt.setLong(4, 3L); + stmt.setBigDecimal(5, new BigDecimal(3.0)); + date = DateUtil.parseDate("2015-01-03 00:00:00"); + stmt.setDate(6, date); + stmt.setString(7, "varchar_a"); + stmt.setString(8, "chara"); + stmt.setInt(9, 4); + stmt.setLong(10, 4L); + stmt.setBigDecimal(11, new BigDecimal(4.0)); + stmt.setDate(12, date); + stmt.setString(13, "varchar_b"); + stmt.setString(14, "charb"); + stmt.setInt(15, 5); + stmt.setLong(16, 5L); + stmt.setBigDecimal(17, new BigDecimal(5.0)); + stmt.setDate(18, date); + stmt.executeUpdate(); + + conn.commit(); + } + } + + protected static void createMultiCFTestTable(String tableName, String options) throws SQLException { String ddl = "create table if not exists " + tableName + "(" + " varchar_pk VARCHAR NOT NULL, " + " char_pk CHAR(5) NOT NULL, " + @@ -1747,7 +1822,8 @@ public abstract class BaseTest { " b.long_col2 BIGINT, " + " b.decimal_col2 DECIMAL, " + " b.date_col DATE " + - " CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk))"; + " CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk)) " + + (options!=null? options : ""); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); conn.createStatement().execute(ddl); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index afb6df3..1d10116 100644 --- a/pom.xml +++ b/pom.xml @@ -103,7 +103,7 @@ <htrace.version>2.04</htrace.version> <collections.version>3.2.1</collections.version> <jodatime.version>2.3</jodatime.version> - <tephra.version>0.6.1</tephra.version> + <tephra.version>0.6.3-SNAPSHOT</tephra.version> <!-- Test Dependencies --> <mockito-all.version>1.8.5</mockito-all.version>
