IGNITE-7200: SQL: simplified DML module structure and restored encapsulation. This closes #3225.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/03bb5513 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/03bb5513 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/03bb5513 Branch: refs/heads/ignite-zk-ce Commit: 03bb551382be8303a4bcaf0afc3ffa0f9c2885dd Parents: 802a166 Author: devozerov <[email protected]> Authored: Thu Dec 14 15:59:37 2017 +0300 Committer: devozerov <[email protected]> Committed: Thu Dec 14 15:59:37 2017 +0300 ---------------------------------------------------------------------- .../query/h2/DmlStatementsProcessor.java | 637 ++---------------- .../processors/query/h2/UpdateResult.java | 4 +- .../processors/query/h2/dml/DmlAstUtils.java | 609 ++++++++++++++++++ .../processors/query/h2/dml/DmlBatchSender.java | 232 +++++++ .../query/h2/dml/DmlDistributedPlanInfo.java | 56 ++ .../h2/dml/DmlPageProcessingErrorResult.java | 76 +++ .../query/h2/dml/DmlPageProcessingResult.java | 68 ++ .../processors/query/h2/dml/DmlUtils.java | 118 ++++ .../processors/query/h2/dml/FastUpdate.java | 175 +++++ .../query/h2/dml/FastUpdateArguments.java | 53 -- .../processors/query/h2/dml/UpdatePlan.java | 389 ++++++++--- .../query/h2/dml/UpdatePlanBuilder.java | 82 ++- .../processors/query/h2/sql/DmlAstUtils.java | 644 ------------------- .../query/h2/sql/GridSqlQueryParser.java | 1 + 14 files changed, 1752 insertions(+), 1392 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 
c3d48dd..243d1dc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -17,42 +17,29 @@ package org.apache.ignite.internal.processors.query.h2; -import java.lang.reflect.Array; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.sql.Time; -import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; -import java.util.Date; -import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.MutableEntry; -import org.apache.ignite.IgniteCache; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.binary.BinaryObject; -import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.cache.query.SqlFieldsQuery; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheOperationContext; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import 
org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; @@ -61,21 +48,19 @@ import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter; -import org.apache.ignite.internal.processors.query.GridQueryProperty; -import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.IgniteSQLException; -import org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments; +import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender; +import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo; +import org.apache.ignite.internal.processors.query.h2.dml.FastUpdate; import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; -import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.lang.IgniteSingletonIterator; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInClosure; @@ -85,19 +70,10 @@ import org.h2.command.dml.Delete; import org.h2.command.dml.Insert; import org.h2.command.dml.Merge; import 
org.h2.command.dml.Update; -import org.h2.table.Column; -import org.h2.util.DateTimeUtils; -import org.h2.util.LocalDateTimeUtils; -import org.h2.value.Value; -import org.h2.value.ValueDate; -import org.h2.value.ValueTime; -import org.h2.value.ValueTimestamp; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException; import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META; -import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT; /** * @@ -142,7 +118,7 @@ public class DmlStatementsProcessor { while (iter.hasNext()) { UpdatePlan plan = iter.next().getValue(); - if (F.eq(cacheName, plan.tbl.cacheName())) + if (F.eq(cacheName, plan.cacheContext().name())) iter.remove(); } } @@ -169,7 +145,7 @@ public class DmlStatementsProcessor { UpdatePlan plan = getPlanForStatement(schemaName, conn, prepared, fieldsQry, loc, null); - GridCacheContext<?, ?> cctx = plan.tbl.rowDescriptor().context(); + GridCacheContext<?, ?> cctx = plan.cacheContext(); for (int i = 0; i < DFLT_DML_RERUN_ATTEMPTS; i++) { CacheOperationContext opCtx = cctx.operationContextPerCall(); @@ -281,20 +257,20 @@ public class DmlStatementsProcessor { UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null); - if (!F.eq(streamer.cacheName(), plan.tbl.rowDescriptor().context().name())) + if (!F.eq(streamer.cacheName(), plan.cacheContext().name())) throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" + " in connection options", IgniteQueryErrorCode.UNSUPPORTED_OPERATION); - if (plan.mode == UpdateMode.INSERT && plan.rowsNum > 0) { - assert plan.isLocSubqry; + if (plan.mode() == UpdateMode.INSERT && plan.rowCount() > 0) { + assert plan.isLocalSubquery(); - final GridCacheContext cctx = 
plan.tbl.rowDescriptor().context(); + final GridCacheContext cctx = plan.cacheContext(); QueryCursorImpl<List<?>> cur; - final ArrayList<List<?>> data = new ArrayList<>(plan.rowsNum); + final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount()); - final GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), plan.selectQry, + final GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), plan.selectQuery(), F.asList(args), null, false, 0, null); QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() { @@ -317,18 +293,18 @@ public class DmlStatementsProcessor { } }, null); - if (plan.rowsNum == 1) { - IgniteBiTuple t = rowToKeyValue(cctx, cur.iterator().next(), plan); + if (plan.rowCount() == 1) { + IgniteBiTuple t = plan.processRow(cur.iterator().next()); streamer.addData(t.getKey(), t.getValue()); return 1; } - Map<Object, Object> rows = new LinkedHashMap<>(plan.rowsNum); + Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount()); for (List<?> row : cur) { - final IgniteBiTuple t = rowToKeyValue(cctx, row, plan); + final IgniteBiTuple t = plan.processRow(row); rows.put(t.getKey(), t.getValue()); } @@ -367,13 +343,15 @@ public class DmlStatementsProcessor { UpdatePlan plan = getPlanForStatement(schemaName, c, prepared, fieldsQry, loc, errKeysPos); - if (plan.fastUpdateArgs != null) { + FastUpdate fastUpdate = plan.fastUpdate(); + + if (fastUpdate != null) { assert F.isEmpty(failedKeys) && errKeysPos == null; - return doFastUpdate(plan, fieldsQry.getArgs()); + return fastUpdate.execute(plan.cacheContext().cache(), fieldsQry.getArgs()); } - if (plan.distributed != null) { + if (plan.distributedPlan() != null) { UpdateResult result = doDistributedUpdate(schemaName, fieldsQry, plan, cancel); // null is returned in case not all nodes support distributed DML. 
@@ -381,14 +359,14 @@ public class DmlStatementsProcessor { return result; } - assert !F.isEmpty(plan.selectQry); + assert !F.isEmpty(plan.selectQuery()); QueryCursorImpl<List<?>> cur; // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual // sub-query and not some dummy stuff like "select 1, 2, 3;" - if (!loc && !plan.isLocSubqry) { - SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated()) + if (!loc && !plan.isLocalSubquery()) { + SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated()) .setArgs(fieldsQry.getArgs()) .setDistributedJoins(fieldsQry.isDistributedJoins()) .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder()) @@ -400,7 +378,7 @@ public class DmlStatementsProcessor { cancel, mainCacheId, true).get(0); } else { - final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQry, + final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQuery(), F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel); cur = new QueryCursorImpl<>(new Iterable<List<?>>() { @@ -430,7 +408,7 @@ public class DmlStatementsProcessor { */ private UpdateResult processDmlSelectResult(GridCacheContext cctx, UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException { - switch (plan.mode) { + switch (plan.mode()) { case MERGE: return new UpdateResult(doMerge(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY); @@ -444,7 +422,7 @@ public class DmlStatementsProcessor { return doDelete(cctx, cursor, pageSize); default: - throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode + ']', + throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode() + ']', IgniteQueryErrorCode.UNEXPECTED_OPERATION); } } @@ -480,46 +458,6 @@ public class DmlStatementsProcessor { } /** - * Perform single cache operation based on given args. 
- * @param plan Update plan. - * @param args Query parameters. - * @return 1 if an item was affected, 0 otherwise. - * @throws IgniteCheckedException if failed. - */ - @SuppressWarnings({"unchecked", "ConstantConditions"}) - private static UpdateResult doFastUpdate(UpdatePlan plan, Object[] args) throws IgniteCheckedException { - GridCacheContext cctx = plan.tbl.rowDescriptor().context(); - - FastUpdateArguments singleUpdate = plan.fastUpdateArgs; - - assert singleUpdate != null; - - boolean valBounded = (singleUpdate.val != FastUpdateArguments.NULL_ARGUMENT); - - if (singleUpdate.newVal != FastUpdateArguments.NULL_ARGUMENT) { // Single item UPDATE - Object key = singleUpdate.key.apply(args); - Object newVal = singleUpdate.newVal.apply(args); - - if (valBounded) { - Object val = singleUpdate.val.apply(args); - - return (cctx.cache().replace(key, val, newVal) ? UpdateResult.ONE : UpdateResult.ZERO); - } - else - return (cctx.cache().replace(key, newVal) ? UpdateResult.ONE : UpdateResult.ZERO); - } - else { // Single item DELETE - Object key = singleUpdate.key.apply(args); - Object val = singleUpdate.val.apply(args); - - if (singleUpdate.val == FastUpdateArguments.NULL_ARGUMENT) // No _val bound in source query - return cctx.cache().remove(key) ? UpdateResult.ONE : UpdateResult.ZERO; - else - return cctx.cache().remove(key, val) ? UpdateResult.ONE : UpdateResult.ZERO; - } - } - - /** * @param schemaName Schema name. * @param fieldsQry Initial query. * @param plan Update plan. 
@@ -529,13 +467,15 @@ public class DmlStatementsProcessor { */ private UpdateResult doDistributedUpdate(String schemaName, SqlFieldsQuery fieldsQry, UpdatePlan plan, GridQueryCancel cancel) throws IgniteCheckedException { - assert plan.distributed != null; + DmlDistributedPlanInfo distributedPlan = plan.distributedPlan(); + + assert distributedPlan != null; if (cancel == null) cancel = new GridQueryCancel(); - return idx.runDistributedUpdate(schemaName, fieldsQry, plan.distributed.getCacheIds(), - plan.distributed.isReplicatedOnly(), cancel); + return idx.runDistributedUpdate(schemaName, fieldsQry, distributedPlan.getCacheIds(), + distributedPlan.isReplicatedOnly(), cancel); } /** @@ -548,7 +488,7 @@ public class DmlStatementsProcessor { @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) private UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException { - BatchSender sender = new BatchSender(cctx, pageSize); + DmlBatchSender sender = new DmlBatchSender(cctx, pageSize); for (List<?> row : cursor) { if (row.size() != 2) { @@ -594,84 +534,18 @@ public class DmlStatementsProcessor { @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) private UpdateResult doUpdate(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException { - GridH2RowDescriptor desc = plan.tbl.rowDescriptor(); - - GridCacheContext cctx = desc.context(); - - boolean bin = cctx.binaryMarshaller(); - - String[] updatedColNames = plan.colNames; - - int valColIdx = plan.valColIdx; + GridCacheContext cctx = plan.cacheContext(); - boolean hasNewVal = (valColIdx != -1); - - // Statement updates distinct properties if it does not have _val in updated columns list - // or if its list of updated columns includes only _val, i.e. is single element. 
- boolean hasProps = !hasNewVal || updatedColNames.length > 1; - - BatchSender sender = new BatchSender(cctx, pageSize); + DmlBatchSender sender = new DmlBatchSender(cctx, pageSize); for (List<?> row : cursor) { - Object key = row.get(0); - - Object newVal; - - Map<String, Object> newColVals = new HashMap<>(); - - for (int i = 0; i < plan.colNames.length; i++) { - if (hasNewVal && i == valColIdx - 2) - continue; - - GridQueryProperty prop = plan.tbl.rowDescriptor().type().property(plan.colNames[i]); - - assert prop != null : "Unknown property: " + plan.colNames[i]; - - newColVals.put(plan.colNames[i], convert(row.get(i + 2), desc, prop.type(), plan.colTypes[i])); - } - - newVal = plan.valSupplier.apply(row); - - if (newVal == null) - throw new IgniteSQLException("New value for UPDATE must not be null", IgniteQueryErrorCode.NULL_VALUE); - - // Skip key and value - that's why we start off with 3rd column - for (int i = 0; i < plan.tbl.getColumns().length - DEFAULT_COLUMNS_COUNT; i++) { - Column c = plan.tbl.getColumn(i + DEFAULT_COLUMNS_COUNT); - - if (desc.isKeyValueOrVersionColumn(c.getColumnId())) - continue; - - GridQueryProperty prop = desc.type().property(c.getName()); - - if (prop.key()) - continue; // Don't get values of key's columns - we won't use them anyway - - boolean hasNewColVal = newColVals.containsKey(c.getName()); - - if (!hasNewColVal) - continue; - - Object colVal = newColVals.get(c.getName()); - - // UPDATE currently does not allow to modify key or its fields, so we must be safe to pass null as key. 
- desc.setColumnValue(null, newVal, colVal, i); - } - - if (bin && hasProps) { - assert newVal instanceof BinaryObjectBuilder; - - newVal = ((BinaryObjectBuilder) newVal).build(); - } - - desc.type().validateKeyAndValue(key, newVal); - - Object srcVal = row.get(1); + T3<Object, Object, Object> row0 = plan.processRowForUpdate(row); - if (bin && !(srcVal instanceof BinaryObject)) - srcVal = cctx.grid().binary().toBinary(srcVal); + Object key = row0.get1(); + Object oldVal = row0.get2(); + Object newVal = row0.get3(); - sender.add(key, new ModifyingEntryProcessor(srcVal, new EntryValueUpdater(newVal))); + sender.add(key, new ModifyingEntryProcessor(oldVal, new EntryValueUpdater(newVal))); } sender.flush(); @@ -699,120 +573,6 @@ public class DmlStatementsProcessor { } /** - * Convert value to column's expected type by means of H2. - * - * @param val Source value. - * @param desc Row descriptor. - * @param expCls Expected value class. - * @param type Expected column type to convert to. - * @return Converted object. - * @throws IgniteCheckedException if failed. - */ - @SuppressWarnings({"ConstantConditions", "SuspiciousSystemArraycopy"}) - private static Object convert(Object val, GridH2RowDescriptor desc, Class<?> expCls, int type) - throws IgniteCheckedException { - if (val == null) - return null; - - Class<?> currCls = val.getClass(); - - try { - if (val instanceof Date && currCls != Date.class && expCls == Date.class) { - // H2 thinks that java.util.Date is always a Timestamp, while binary marshaller expects - // precise Date instance. Let's satisfy it. 
- return new Date(((Date) val).getTime()); - } - - // User-given UUID is always serialized by H2 to byte array, so we have to deserialize manually - if (type == Value.UUID && currCls == byte[].class) - return U.unmarshal(desc.context().marshaller(), (byte[]) val, - U.resolveClassLoader(desc.context().gridConfig())); - - if (LocalDateTimeUtils.isJava8DateApiPresent()) { - if (val instanceof Timestamp && LocalDateTimeUtils.isLocalDateTime(expCls)) - return LocalDateTimeUtils.valueToLocalDateTime(ValueTimestamp.get((Timestamp) val)); - - if (val instanceof Date && LocalDateTimeUtils.isLocalDate(expCls)) - return LocalDateTimeUtils.valueToLocalDate(ValueDate.fromDateValue( - DateTimeUtils.dateValueFromDate(((Date) val).getTime()))); - - if (val instanceof Time && LocalDateTimeUtils.isLocalTime(expCls)) - return LocalDateTimeUtils.valueToLocalTime(ValueTime.get((Time) val)); - } - - // We have to convert arrays of reference types manually - - // see https://issues.apache.org/jira/browse/IGNITE-4327 - // Still, we only can convert from Object[] to something more precise. - if (type == Value.ARRAY && currCls != expCls) { - if (currCls != Object[].class) - throw new IgniteCheckedException("Unexpected array type - only conversion from Object[] " + - "is assumed"); - - // Why would otherwise type be Value.ARRAY? - assert expCls.isArray(); - - Object[] curr = (Object[]) val; - - Object newArr = Array.newInstance(expCls.getComponentType(), curr.length); - - System.arraycopy(curr, 0, newArr, 0, curr.length); - - return newArr; - } - - return H2Utils.convert(val, desc, type); - } - catch (Exception e) { - throw new IgniteSQLException("Value conversion failed [from=" + currCls.getName() + ", to=" + - expCls.getName() +']', IgniteQueryErrorCode.CONVERSION_FAILED, e); - } - } - - /** - * Process errors of entry processor - split the keys into duplicated/concurrently modified and those whose - * processing yielded an exception. 
- * - * @param res Result of {@link GridCacheAdapter#invokeAll)} - * @return pair [array of duplicated/concurrently modified keys, SQL exception for erroneous keys] (exception is - * null if all keys are duplicates/concurrently modified ones). - */ - private static PageProcessingErrorResult splitErrors(Map<Object, EntryProcessorResult<Boolean>> res) { - Set<Object> errKeys = new LinkedHashSet<>(res.keySet()); - - SQLException currSqlEx = null; - - SQLException firstSqlEx = null; - - int errors = 0; - - // Let's form a chain of SQL exceptions - for (Map.Entry<Object, EntryProcessorResult<Boolean>> e : res.entrySet()) { - try { - e.getValue().get(); - } - catch (EntryProcessorException ex) { - SQLException next = createJdbcSqlException("Failed to process key '" + e.getKey() + '\'', - IgniteQueryErrorCode.ENTRY_PROCESSING); - - next.initCause(ex); - - if (currSqlEx != null) - currSqlEx.setNextException(next); - else - firstSqlEx = next; - - currSqlEx = next; - - errKeys.remove(e.getKey()); - - errors++; - } - } - - return new PageProcessingErrorResult(errKeys.toArray(), firstSqlEx, errors); - } - - /** * Execute MERGE statement plan. * @param cursor Cursor to take inserted data from. * @param pageSize Batch size to stream data from {@code cursor}, anything <= 0 for single page operations. 
@@ -821,13 +581,11 @@ public class DmlStatementsProcessor { */ @SuppressWarnings("unchecked") private long doMerge(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException { - GridH2RowDescriptor desc = plan.tbl.rowDescriptor(); - - GridCacheContext cctx = desc.context(); + GridCacheContext cctx = plan.cacheContext(); // If we have just one item to put, just do so - if (plan.rowsNum == 1) { - IgniteBiTuple t = rowToKeyValue(cctx, cursor.iterator().next(), plan); + if (plan.rowCount() == 1) { + IgniteBiTuple t = plan.processRow(cursor.iterator().next()); cctx.cache().put(t.getKey(), t.getValue()); @@ -841,7 +599,7 @@ public class DmlStatementsProcessor { for (Iterator<List<?>> it = cursor.iterator(); it.hasNext();) { List<?> row = it.next(); - IgniteBiTuple t = rowToKeyValue(cctx, row, plan); + IgniteBiTuple t = plan.processRow(row); rows.put(t.getKey(), t.getValue()); @@ -868,13 +626,11 @@ public class DmlStatementsProcessor { */ @SuppressWarnings({"unchecked", "ConstantConditions"}) private long doInsert(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException { - GridH2RowDescriptor desc = plan.tbl.rowDescriptor(); - - GridCacheContext cctx = desc.context(); + GridCacheContext cctx = plan.cacheContext(); // If we have just one item to put, just do so - if (plan.rowsNum == 1) { - IgniteBiTuple t = rowToKeyValue(cctx, cursor.iterator().next(), plan); + if (plan.rowCount() == 1) { + IgniteBiTuple t = plan.processRow(cursor.iterator().next()); if (cctx.cache().putIfAbsent(t.getKey(), t.getValue())) return 1; @@ -884,10 +640,10 @@ public class DmlStatementsProcessor { } else { // Keys that failed to INSERT due to duplication. 
- BatchSender sender = new BatchSender(cctx, pageSize); + DmlBatchSender sender = new DmlBatchSender(cctx, pageSize); for (List<?> row : cursor) { - final IgniteBiTuple keyValPair = rowToKeyValue(cctx, row, plan); + final IgniteBiTuple keyValPair = plan.processRow(row); sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue())); } @@ -916,124 +672,6 @@ public class DmlStatementsProcessor { } /** - * Execute given entry processors and collect errors, if any. - * @param cctx Cache context. - * @param rows Rows to process. - * @return Triple [number of rows actually changed; keys that failed to update (duplicates or concurrently - * updated ones); chain of exceptions for all keys whose processing resulted in error, or null for no errors]. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings({"unchecked", "ConstantConditions"}) - private static PageProcessingResult processPage(GridCacheContext cctx, - Map<Object, EntryProcessor<Object, Object, Boolean>> rows) throws IgniteCheckedException { - Map<Object, EntryProcessorResult<Boolean>> res = cctx.cache().invokeAll(rows); - - if (F.isEmpty(res)) - return new PageProcessingResult(rows.size(), null, null); - - PageProcessingErrorResult splitRes = splitErrors(res); - - int keysCnt = splitRes.errKeys.length; - - return new PageProcessingResult(rows.size() - keysCnt - splitRes.cnt, splitRes.errKeys, splitRes.ex); - } - - /** - * Convert row presented as an array of Objects into key-value pair to be inserted to cache. - * @param cctx Cache context. - * @param row Row to process. - * @param plan Update plan. - * @throws IgniteCheckedException if failed. 
- */ - @SuppressWarnings({"unchecked", "ConstantConditions", "ResultOfMethodCallIgnored"}) - private IgniteBiTuple<?, ?> rowToKeyValue(GridCacheContext cctx, List<?> row, UpdatePlan plan) - throws IgniteCheckedException { - GridH2RowDescriptor rowDesc = plan.tbl.rowDescriptor(); - GridQueryTypeDescriptor desc = rowDesc.type(); - - Object key = plan.keySupplier.apply(row); - - if (QueryUtils.isSqlType(desc.keyClass())) { - assert plan.keyColIdx != -1; - - key = convert(key, rowDesc, desc.keyClass(), plan.colTypes[plan.keyColIdx]); - } - - Object val = plan.valSupplier.apply(row); - - if (QueryUtils.isSqlType(desc.valueClass())) { - assert plan.valColIdx != -1; - - val = convert(val, rowDesc, desc.valueClass(), plan.colTypes[plan.valColIdx]); - } - - if (key == null) { - if (F.isEmpty(desc.keyFieldName())) - throw new IgniteSQLException("Key for INSERT or MERGE must not be null", IgniteQueryErrorCode.NULL_KEY); - else - throw new IgniteSQLException("Null value is not allowed for column '" + desc.keyFieldName() + "'", - IgniteQueryErrorCode.NULL_KEY); - } - - if (val == null) { - if (F.isEmpty(desc.valueFieldName())) - throw new IgniteSQLException("Value for INSERT, MERGE, or UPDATE must not be null", - IgniteQueryErrorCode.NULL_VALUE); - else - throw new IgniteSQLException("Null value is not allowed for column '" + desc.valueFieldName() + "'", - IgniteQueryErrorCode.NULL_VALUE); - } - - Map<String, Object> newColVals = new HashMap<>(); - - for (int i = 0; i < plan.colNames.length; i++) { - if (i == plan.keyColIdx || i == plan.valColIdx) - continue; - - String colName = plan.colNames[i]; - - GridQueryProperty prop = desc.property(colName); - - assert prop != null; - - Class<?> expCls = prop.type(); - - newColVals.put(colName, convert(row.get(i), rowDesc, expCls, plan.colTypes[i])); - } - - // We update columns in the order specified by the table for a reason - table's - // column order preserves their precedence for correct update of nested properties. 
- Column[] cols = plan.tbl.getColumns(); - - // First 3 columns are _key, _val and _ver. Skip 'em. - for (int i = DEFAULT_COLUMNS_COUNT; i < cols.length; i++) { - if (plan.tbl.rowDescriptor().isKeyValueOrVersionColumn(i)) - continue; - - String colName = cols[i].getName(); - - if (!newColVals.containsKey(colName)) - continue; - - Object colVal = newColVals.get(colName); - - desc.setValue(colName, key, val, colVal); - } - - if (cctx.binaryMarshaller()) { - if (key instanceof BinaryObjectBuilder) - key = ((BinaryObjectBuilder) key).build(); - - if (val instanceof BinaryObjectBuilder) - val = ((BinaryObjectBuilder) val).build(); - } - - desc.validateKeyAndValue(key, val); - - return new IgniteBiTuple<>(key, val); - } - - /** * * @param schemaName Schema name. * @param stmt Prepared statement. @@ -1164,7 +802,7 @@ public class DmlStatementsProcessor { static void checkUpdateResult(UpdateResult r) { if (!F.isEmpty(r.errorKeys())) { String msg = "Failed to update some keys because they had been modified concurrently " + - "[keys=" + r.errorKeys() + ']'; + "[keys=" + Arrays.toString(r.errorKeys()) + ']'; SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE); @@ -1172,175 +810,4 @@ public class DmlStatementsProcessor { } } - /** Result of processing an individual page with {@link IgniteCache#invokeAll} including error details, if any. */ - private final static class PageProcessingResult { - /** Number of successfully processed items. */ - final long cnt; - - /** Keys that failed to be updated or deleted due to concurrent modification of values. */ - @NotNull - final Object[] errKeys; - - /** Chain of exceptions corresponding to failed keys. Null if no keys yielded an exception. 
*/ - final SQLException ex; - - /** */ - @SuppressWarnings("ConstantConditions") - private PageProcessingResult(long cnt, Object[] errKeys, SQLException ex) { - this.cnt = cnt; - this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY); - this.ex = ex; - } - } - - /** Result of splitting keys whose processing resulted into an exception from those skipped by - * logic of {@link EntryProcessor}s (most likely INSERT duplicates, or UPDATE/DELETE keys whose values - * had been modified concurrently), counting and collecting entry processor exceptions. - */ - private final static class PageProcessingErrorResult { - /** Keys that failed to be processed by {@link EntryProcessor} (not due to an exception). */ - @NotNull - final Object[] errKeys; - - /** Number of entries whose processing resulted into an exception. */ - final int cnt; - - /** Chain of exceptions corresponding to failed keys. Null if no keys yielded an exception. */ - final SQLException ex; - - /** */ - @SuppressWarnings("ConstantConditions") - private PageProcessingErrorResult(@NotNull Object[] errKeys, SQLException ex, int exCnt) { - errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY); - // When exceptions count must be zero, exceptions chain must be not null, and vice versa. - assert exCnt == 0 ^ ex != null; - - this.errKeys = errKeys; - this.cnt = exCnt; - this.ex = ex; - } - } - - /** - * Batch sender class. - */ - private static class BatchSender { - /** Cache context. */ - private final GridCacheContext cctx; - - /** Batch size. */ - private final int size; - - /** Batches. */ - private final Map<UUID, Map<Object, EntryProcessor<Object, Object, Boolean>>> batches = new HashMap<>(); - - /** Result count. */ - private long updateCnt; - - /** Failed keys. */ - private List<Object> failedKeys; - - /** Exception. */ - private SQLException err; - - /** - * Constructor. - * - * @param cctx Cache context. - * @param size Batch. 
- */ - public BatchSender(GridCacheContext cctx, int size) { - this.cctx = cctx; - this.size = size; - } - - /** - * Add entry to batch. - * - * @param key Key. - * @param proc Processor. - */ - public void add(Object key, EntryProcessor<Object, Object, Boolean> proc) throws IgniteCheckedException { - ClusterNode node = cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE); - - if (node == null) - throw new IgniteCheckedException("Failed to map key to node."); - - UUID nodeId = node.id(); - - Map<Object, EntryProcessor<Object, Object, Boolean>> batch = batches.get(nodeId); - - if (batch == null) { - batch = new HashMap<>(); - - batches.put(nodeId, batch); - } - - batch.put(key, proc); - - if (batch.size() >= size) { - sendBatch(batch); - - batch.clear(); - } - } - - /** - * Flush any remaining entries. - * - * @throws IgniteCheckedException If failed. - */ - public void flush() throws IgniteCheckedException { - for (Map<Object, EntryProcessor<Object, Object, Boolean>> batch : batches.values()) { - if (!batch.isEmpty()) - sendBatch(batch); - } - } - - /** - * @return Update count. - */ - public long updateCount() { - return updateCnt; - } - - /** - * @return Failed keys. - */ - public List<Object> failedKeys() { - return failedKeys != null ? failedKeys : Collections.emptyList(); - } - - /** - * @return Error. - */ - public SQLException error() { - return err; - } - - /** - * Send the batch. - * - * @param batch Batch. - * @throws IgniteCheckedException If failed. 
- */ - private void sendBatch(Map<Object, EntryProcessor<Object, Object, Boolean>> batch) - throws IgniteCheckedException { - PageProcessingResult pageRes = processPage(cctx, batch); - - updateCnt += pageRes.cnt; - - if (failedKeys == null) - failedKeys = new ArrayList<>(); - - failedKeys.addAll(F.asList(pageRes.errKeys)); - - if (pageRes.ex != null) { - if (err == null) - err = pageRes.ex; - else - err.setNextException(pageRes.ex); - } - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java index de0e63f..32381ba 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java @@ -25,10 +25,10 @@ import org.apache.ignite.internal.util.typedef.internal.U; */ public final class UpdateResult { /** Result to return for operations that affected 1 item - mostly to be used for fast updates and deletes. */ - final static UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY); + public static final UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY); /** Result to return for operations that affected 0 items - mostly to be used for fast updates and deletes. */ - final static UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY); + public static final UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY); /** Number of processed items. 
*/ private final long cnt; http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java new file mode 100644 index 0000000..054e708 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java @@ -0,0 +1,609 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.dml; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlArray; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlColumn; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDelete; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlElement; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunction; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlJoin; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlKeyword; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperation; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlParameter; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuery; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSubquery; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable; +import 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlType; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUpdate; +import org.apache.ignite.internal.util.lang.IgnitePair; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.h2.command.Parser; +import org.h2.expression.Expression; +import org.h2.table.Column; +import org.h2.table.Table; +import org.h2.util.IntArray; +import org.h2.value.DataType; +import org.h2.value.Value; +import org.h2.value.ValueDate; +import org.h2.value.ValueInt; +import org.h2.value.ValueString; +import org.h2.value.ValueTime; +import org.h2.value.ValueTimestamp; +import org.jetbrains.annotations.Nullable; + +/** + * AST utils for DML + */ +public final class DmlAstUtils { + /** + * Empty ctor to prevent initialization. + */ + private DmlAstUtils() { + // No-op. + } + + /** + * Create SELECT on which subsequent INSERT or MERGE will be based. + * + * @param cols Columns to insert values into. + * @param rows Rows to create pseudo-SELECT upon. + * @param subQry Subquery to use rather than rows. + * @return Subquery or pseudo-SELECT to evaluate inserted expressions. 
+ */ + public static GridSqlQuery selectForInsertOrMerge(GridSqlColumn[] cols, List<GridSqlElement[]> rows, + GridSqlQuery subQry) { + if (!F.isEmpty(rows)) { + assert !F.isEmpty(cols); + + GridSqlSelect sel = new GridSqlSelect(); + + GridSqlFunction from = new GridSqlFunction(GridSqlFunctionType.TABLE); + + sel.from(from); + + GridSqlArray[] args = new GridSqlArray[cols.length]; + + for (int i = 0; i < cols.length; i++) { + GridSqlArray arr = new GridSqlArray(rows.size()); + + String colName = cols[i].columnName(); + + GridSqlAlias alias = new GridSqlAlias(colName, arr); + + alias.resultType(cols[i].resultType()); + + from.addChild(alias); + + args[i] = arr; + + GridSqlColumn newCol = new GridSqlColumn(null, from, null,"TABLE", colName); + + newCol.resultType(cols[i].resultType()); + + sel.addColumn(newCol, true); + } + + for (GridSqlElement[] row : rows) { + assert cols.length == row.length; + + for (int i = 0; i < row.length; i++) + args[i].addChild(row[i]); + } + + return sel; + } + else { + assert subQry != null; + + return subQry; + } + } + + /** + * Generate SQL SELECT based on DELETE's WHERE, LIMIT, etc. + * + * @param del Delete statement. + * @param keysParamIdx Index of new param for the array of keys, or {@code null} to add no keys filter. + * @return SELECT statement.
+ */ + public static GridSqlSelect selectForDelete(GridSqlDelete del, @Nullable Integer keysParamIdx) { + GridSqlSelect mapQry = new GridSqlSelect(); + + mapQry.from(del.from()); + + Set<GridSqlTable> tbls = new HashSet<>(); + + collectAllGridTablesInTarget(del.from(), tbls); + + assert tbls.size() == 1 : "Failed to determine target table for DELETE"; + + GridSqlTable tbl = tbls.iterator().next(); + + GridH2Table gridTbl = tbl.dataTable(); + + assert gridTbl != null : "Failed to determine target grid table for DELETE"; + + Column h2KeyCol = gridTbl.getColumn(GridH2KeyValueRowOnheap.KEY_COL); + + Column h2ValCol = gridTbl.getColumn(GridH2KeyValueRowOnheap.VAL_COL); + + GridSqlColumn keyCol = new GridSqlColumn(h2KeyCol, tbl, h2KeyCol.getName()); + keyCol.resultType(GridSqlType.fromColumn(h2KeyCol)); + + GridSqlColumn valCol = new GridSqlColumn(h2ValCol, tbl, h2ValCol.getName()); + valCol.resultType(GridSqlType.fromColumn(h2ValCol)); + + mapQry.addColumn(keyCol, true); + mapQry.addColumn(valCol, true); + + GridSqlElement where = del.where(); + if (keysParamIdx != null) + where = injectKeysFilterParam(where, keyCol, keysParamIdx); + + mapQry.where(where); + mapQry.limit(del.limit()); + + return mapQry; + } + + /** + * @param update UPDATE statement. + * @return Fast update arguments if given statement directly updates {@code _val} column with a literal or param value + * and filters by single non expression key (and, optionally, by single non expression value), {@code null} otherwise.
+ */ + public static FastUpdate getFastUpdateArgs(GridSqlUpdate update) { + IgnitePair<GridSqlElement> filter = findKeyValueEqualityCondition(update.where()); + + if (filter == null) + return null; + + if (update.cols().size() != 1) + return null; + + Table tbl = update.cols().get(0).column().getTable(); + if (!(tbl instanceof GridH2Table)) + return null; + + GridH2RowDescriptor desc = ((GridH2Table)tbl).rowDescriptor(); + if (!desc.isValueColumn(update.cols().get(0).column().getColumnId())) + return null; + + GridSqlElement set = update.set().get(update.cols().get(0).columnName()); + + if (!(set instanceof GridSqlConst || set instanceof GridSqlParameter)) + return null; + + return FastUpdate.create(filter.getKey(), filter.getValue(), set); + } + + /** + * @param del DELETE statement. + * @return Fast delete arguments if given statement filters by single non expression key, {@code null} otherwise. + */ + public static FastUpdate getFastDeleteArgs(GridSqlDelete del) { + IgnitePair<GridSqlElement> filter = findKeyValueEqualityCondition(del.where()); + + if (filter == null) + return null; + + return FastUpdate.create(filter.getKey(), filter.getValue(), null); + } + + /** + * @param where Element to test. + * @return Pair [key operand; value operand or {@code null}] if given element corresponds to {@code WHERE _key = ?} + * (optionally combined by AND with {@code _val = ?}) with literal or query param operands, {@code null} otherwise. + */ + @SuppressWarnings("RedundantCast") + private static IgnitePair<GridSqlElement> findKeyValueEqualityCondition(GridSqlElement where) { + if (where == null || !(where instanceof GridSqlOperation)) + return null; + + GridSqlOperation whereOp = (GridSqlOperation) where; + + // Does this WHERE limit only by _key? + if (isKeyEqualityCondition(whereOp)) + return new IgnitePair<>((GridSqlElement)whereOp.child(1), null); + + // Or maybe it limits both by _key and _val?
+ if (whereOp.operationType() != GridSqlOperationType.AND) + return null; + + GridSqlElement left = whereOp.child(0); + + GridSqlElement right = whereOp.child(1); + + if (!(left instanceof GridSqlOperation && right instanceof GridSqlOperation)) + return null; + + GridSqlOperation leftOp = (GridSqlOperation) left; + + GridSqlOperation rightOp = (GridSqlOperation) right; + + if (isKeyEqualityCondition(leftOp)) { // _key = ? and _val = ? + if (!isValueEqualityCondition(rightOp)) + return null; + + return new IgnitePair<>((GridSqlElement)leftOp.child(1), (GridSqlElement)rightOp.child(1)); + } + else if (isKeyEqualityCondition(rightOp)) { // _val = ? and _key = ? + if (!isValueEqualityCondition(leftOp)) + return null; + + return new IgnitePair<>((GridSqlElement)rightOp.child(1), (GridSqlElement)leftOp.child(1)); + } + else // Neither + return null; + } + + /** + * @param op Operation. + * @param key true - check for key equality condition, + * otherwise check for value equality condition + * @return Whether this condition is of form {@code colName} = ? + */ + private static boolean isEqualityCondition(GridSqlOperation op, boolean key) { + if (op.operationType() != GridSqlOperationType.EQUAL) + return false; + + GridSqlElement left = op.child(0); + GridSqlElement right = op.child(1); + + if (!(left instanceof GridSqlColumn)) + return false; + + GridSqlColumn column = (GridSqlColumn)left; + if (!(column.column().getTable() instanceof GridH2Table)) + return false; + + GridH2RowDescriptor desc =((GridH2Table) column.column().getTable()).rowDescriptor(); + + return (key ? desc.isKeyColumn(column.column().getColumnId()) : + desc.isValueColumn(column.column().getColumnId())) && + (right instanceof GridSqlConst || right instanceof GridSqlParameter); + } + + /** + * @param op Operation. + * @return Whether this condition is of form _key = ? 
+ */ + private static boolean isKeyEqualityCondition(GridSqlOperation op) { + return isEqualityCondition(op, true); + } + + /** + * @param op Operation. + * @return Whether this condition is of form _val = ? + */ + private static boolean isValueEqualityCondition(GridSqlOperation op) { + return isEqualityCondition(op, false); + } + + + /** + * Generate SQL SELECT based on UPDATE's WHERE, LIMIT, etc. + * + * @param update Update statement. + * @param keysParamIdx Index of new param for the array of keys. + * @return SELECT statement. + */ + public static GridSqlSelect selectForUpdate(GridSqlUpdate update, @Nullable Integer keysParamIdx) { + GridSqlSelect mapQry = new GridSqlSelect(); + + mapQry.from(update.target()); + + Set<GridSqlTable> tbls = new HashSet<>(); + + collectAllGridTablesInTarget(update.target(), tbls); + + assert tbls.size() == 1 : "Failed to determine target table for UPDATE"; + + GridSqlTable tbl = tbls.iterator().next(); + + GridH2Table gridTbl = tbl.dataTable(); + + assert gridTbl != null : "Failed to determine target grid table for UPDATE"; + + Column h2KeyCol = gridTbl.getColumn(GridH2KeyValueRowOnheap.KEY_COL); + + Column h2ValCol = gridTbl.getColumn(GridH2KeyValueRowOnheap.VAL_COL); + + GridSqlColumn keyCol = new GridSqlColumn(h2KeyCol, tbl, h2KeyCol.getName()); + keyCol.resultType(GridSqlType.fromColumn(h2KeyCol)); + + GridSqlColumn valCol = new GridSqlColumn(h2ValCol, tbl, h2ValCol.getName()); + valCol.resultType(GridSqlType.fromColumn(h2ValCol)); + + mapQry.addColumn(keyCol, true); + mapQry.addColumn(valCol, true); + + for (GridSqlColumn c : update.cols()) { + String newColName = Parser.quoteIdentifier("_upd_" + c.columnName()); + // We have to use aliases to cover cases when the user + // wants to update _val field directly (if it's a literal) + GridSqlAlias alias = new GridSqlAlias(newColName, elementOrDefault(update.set().get(c.columnName()), c), true); + alias.resultType(c.resultType()); + mapQry.addColumn(alias, true); + } + + 
GridSqlElement where = update.where(); + if (keysParamIdx != null) + where = injectKeysFilterParam(where, keyCol, keysParamIdx); + + mapQry.where(where); + mapQry.limit(update.limit()); + + return mapQry; + } + + /** + * Do what we can to compute default value for this column (mimics H2 behavior). + * @see Table#getDefaultValue + * @see Column#validateConvertUpdateSequence + * @param el SQL element. + * @param col Column. + * @return {@link GridSqlConst#NULL}, if {@code el} is null, or {@code el} if + * it's not {@link GridSqlKeyword#DEFAULT}, or computed default value. + */ + private static GridSqlElement elementOrDefault(GridSqlElement el, GridSqlColumn col) { + if (el == null) + return GridSqlConst.NULL; + + if (el != GridSqlKeyword.DEFAULT) + return el; + + Column h2Col = col.column(); + + Expression dfltExpr = h2Col.getDefaultExpression(); + + Value dfltVal; + + try { + dfltVal = dfltExpr != null ? dfltExpr.getValue(null) : null; + } + catch (Exception e) { + throw new IgniteSQLException("Failed to evaluate default value for a column " + col.columnName(), e); + } + + if (dfltVal != null) + return new GridSqlConst(dfltVal); + + int type = h2Col.getType(); + + DataType dt = DataType.getDataType(type); + + if (dt.decimal) + dfltVal = ValueInt.get(0).convertTo(type); + else if (dt.type == Value.TIMESTAMP) + dfltVal = ValueTimestamp.fromMillis(U.currentTimeMillis()); + else if (dt.type == Value.TIME) + dfltVal = ValueTime.fromNanos(0); + else if (dt.type == Value.DATE) + dfltVal = ValueDate.fromMillis(U.currentTimeMillis()); + else + dfltVal = ValueString.get("").convertTo(type); + + return new GridSqlConst(dfltVal); + } + + /** + * Append additional condition to WHERE for it to select only specific keys. + * + * @param where Initial condition. + * @param keyCol Column to base the new condition on. + * @return New condition.
+ */ + private static GridSqlElement injectKeysFilterParam(GridSqlElement where, GridSqlColumn keyCol, int paramIdx) { + // Yes, we need a subquery for "WHERE _key IN ?" to work with param being an array without dirty query rewriting. + GridSqlSelect sel = new GridSqlSelect(); + + GridSqlFunction from = new GridSqlFunction(GridSqlFunctionType.TABLE); + + sel.from(from); + + GridSqlColumn col = new GridSqlColumn(null, from, null, "TABLE", "_IGNITE_ERR_KEYS"); + + sel.addColumn(col, true); + + GridSqlAlias alias = new GridSqlAlias("_IGNITE_ERR_KEYS", new GridSqlParameter(paramIdx)); + + alias.resultType(keyCol.resultType()); + + from.addChild(alias); + + GridSqlElement e = new GridSqlOperation(GridSqlOperationType.IN, keyCol, new GridSqlSubquery(sel)); + + if (where == null) + return e; + else + return new GridSqlOperation(GridSqlOperationType.AND, where, e); + } + + /** + * @param qry Select. + * @param params Parameters. + * @param target Extracted parameters. + * @param paramIdxs Parameter indexes. + * @return Extracted parameters list. + */ + @SuppressWarnings("unused") + private static List<Object> findParams(GridSqlQuery qry, Object[] params, ArrayList<Object> target, + IntArray paramIdxs) { + if (qry instanceof GridSqlSelect) + return findParams((GridSqlSelect)qry, params, target, paramIdxs); + + GridSqlUnion union = (GridSqlUnion)qry; + + findParams(union.left(), params, target, paramIdxs); + findParams(union.right(), params, target, paramIdxs); + + findParams((GridSqlElement)qry.limit(), params, target, paramIdxs); + findParams((GridSqlElement)qry.offset(), params, target, paramIdxs); + + return target; + } + + /** + * @param qry Select. + * @param params Parameters. + * @param target Extracted parameters. + * @param paramIdxs Parameter indexes. + * @return Extracted parameters list. 
+ */ + private static List<Object> findParams(GridSqlSelect qry, Object[] params, ArrayList<Object> target, + IntArray paramIdxs) { + if (params.length == 0) + return target; + + for (GridSqlAst el : qry.columns(false)) + findParams((GridSqlElement)el, params, target, paramIdxs); + + findParams((GridSqlElement)qry.from(), params, target, paramIdxs); + findParams((GridSqlElement)qry.where(), params, target, paramIdxs); + + // Don't search in GROUP BY and HAVING since they expected to be in select list. + + findParams((GridSqlElement)qry.limit(), params, target, paramIdxs); + findParams((GridSqlElement)qry.offset(), params, target, paramIdxs); + + return target; + } + + /** + * @param el Element. + * @param params Parameters. + * @param target Extracted parameters. + * @param paramIdxs Parameter indexes. + */ + private static void findParams(@Nullable GridSqlElement el, Object[] params, ArrayList<Object> target, + IntArray paramIdxs) { + if (el == null) + return; + + if (el instanceof GridSqlParameter) { + // H2 Supports queries like "select ?5" but first 4 non-existing parameters are need to be set to any value. + // Here we will set them to NULL. + final int idx = ((GridSqlParameter)el).index(); + + while (target.size() < idx) + target.add(null); + + if (params.length <= idx) + throw new IgniteException("Invalid number of query parameters. " + + "Cannot find " + idx + " parameter."); + + Object param = params[idx]; + + if (idx == target.size()) + target.add(param); + else + target.set(idx, param); + + paramIdxs.add(idx); + } + else if (el instanceof GridSqlSubquery) + findParams(((GridSqlSubquery)el).subquery(), params, target, paramIdxs); + else + for (int i = 0; i < el.size(); i++) + findParams((GridSqlElement)el.child(i), params, target, paramIdxs); + } + + /** + * Processes all the tables and subqueries using the given closure. + * + * @param from FROM element. + * @param c Closure each found table and subquery will be passed to. 
If it returns {@code true} then we need to stop. + * @return {@code true} if the closure returned {@code true} for some element and the search was stopped. + */ + @SuppressWarnings("RedundantCast") + private static boolean findTablesInFrom(GridSqlElement from, IgnitePredicate<GridSqlElement> c) { + if (from == null) + return false; + + if (from instanceof GridSqlTable || from instanceof GridSqlSubquery) + return c.apply(from); + + if (from instanceof GridSqlJoin) { + // Left and right. + if (findTablesInFrom((GridSqlElement)from.child(0), c)) + return true; + + if (findTablesInFrom((GridSqlElement)from.child(1), c)) + return true; + + // We don't process ON condition because it is not a joining part of from here. + return false; + } + else if (from instanceof GridSqlAlias) + return findTablesInFrom((GridSqlElement)from.child(), c); + else if (from instanceof GridSqlFunction) + return false; + + throw new IllegalStateException(from.getClass().getName() + " : " + from.getSQL()); + } + + /** + * @param from From element. + * @param tbls Set to collect found tables into. + */ + public static void collectAllGridTablesInTarget(GridSqlElement from, final Set<GridSqlTable> tbls) { + findTablesInFrom(from, new IgnitePredicate<GridSqlElement>() { + @Override public boolean apply(GridSqlElement el) { + if (el instanceof GridSqlTable) + tbls.add((GridSqlTable)el); + + return false; + } + }); + } + + /** + * @param target Expression to extract the table from. + * @return Back end table for this element.
+ */ + public static GridSqlTable gridTableForElement(GridSqlElement target) { + Set<GridSqlTable> tbls = new HashSet<>(); + + collectAllGridTablesInTarget(target, tbls); + + if (tbls.size() != 1) + throw new IgniteSQLException("Failed to determine target table", IgniteQueryErrorCode.TABLE_NOT_FOUND); + + return tbls.iterator().next(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java new file mode 100644 index 0000000..a4a60c3 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.dml; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.util.typedef.F; + +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException; + +/** + * Batch sender class. + */ +public class DmlBatchSender { + /** Cache context. */ + private final GridCacheContext cctx; + + /** Batch size. */ + private final int size; + + /** Batches. */ + private final Map<UUID, Map<Object, EntryProcessor<Object, Object, Boolean>>> batches = new HashMap<>(); + + /** Result count. */ + private long updateCnt; + + /** Failed keys. */ + private List<Object> failedKeys; + + /** Exception. */ + private SQLException err; + + /** + * Constructor. + * + * @param cctx Cache context. + * @param size Batch. + */ + public DmlBatchSender(GridCacheContext cctx, int size) { + this.cctx = cctx; + this.size = size; + } + + /** + * Add entry to batch. + * + * @param key Key. + * @param proc Processor. 
+ */ + public void add(Object key, EntryProcessor<Object, Object, Boolean> proc) throws IgniteCheckedException { + ClusterNode node = cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE); + + if (node == null) + throw new IgniteCheckedException("Failed to map key to node."); + + UUID nodeId = node.id(); + + Map<Object, EntryProcessor<Object, Object, Boolean>> batch = batches.get(nodeId); + + if (batch == null) { + batch = new HashMap<>(); + + batches.put(nodeId, batch); + } + + batch.put(key, proc); + + if (batch.size() >= size) { + sendBatch(batch); + + batch.clear(); + } + } + + /** + * Flush any remaining entries. + * + * @throws IgniteCheckedException If failed. + */ + public void flush() throws IgniteCheckedException { + for (Map<Object, EntryProcessor<Object, Object, Boolean>> batch : batches.values()) { + if (!batch.isEmpty()) + sendBatch(batch); + } + } + + /** + * @return Update count. + */ + public long updateCount() { + return updateCnt; + } + + /** + * @return Failed keys. + */ + public List<Object> failedKeys() { + return failedKeys != null ? failedKeys : Collections.emptyList(); + } + + /** + * @return Error. + */ + public SQLException error() { + return err; + } + + /** + * Send the batch. + * + * @param batch Batch. + * @throws IgniteCheckedException If failed. + */ + private void sendBatch(Map<Object, EntryProcessor<Object, Object, Boolean>> batch) + throws IgniteCheckedException { + DmlPageProcessingResult pageRes = processPage(cctx, batch); + + updateCnt += pageRes.count(); + + if (failedKeys == null) + failedKeys = new ArrayList<>(); + + failedKeys.addAll(F.asList(pageRes.errorKeys())); + + if (pageRes.error() != null) { + if (err == null) + err = error(); + else + err.setNextException(error()); + } + } + + /** + * Execute given entry processors and collect errors, if any. + * @param cctx Cache context. + * @param rows Rows to process. 
+ * @return Triple [number of rows actually changed; keys that failed to update (duplicates or concurrently + * updated ones); chain of exceptions for all keys whose processing resulted in error, or null for no errors]. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings({"unchecked", "ConstantConditions"}) + private static DmlPageProcessingResult processPage(GridCacheContext cctx, + Map<Object, EntryProcessor<Object, Object, Boolean>> rows) throws IgniteCheckedException { + Map<Object, EntryProcessorResult<Boolean>> res = cctx.cache().invokeAll(rows); + + if (F.isEmpty(res)) + return new DmlPageProcessingResult(rows.size(), null, null); + + DmlPageProcessingErrorResult splitRes = splitErrors(res); + + int keysCnt = splitRes.errorKeys().length; + + return new DmlPageProcessingResult(rows.size() - keysCnt - splitRes.errorCount(), splitRes.errorKeys(), + splitRes.error()); + } + + /** + * Process errors of entry processor - split the keys into duplicated/concurrently modified and those whose + * processing yielded an exception. + * + * @param res Result of {@link GridCacheAdapter#invokeAll} + * @return pair [array of duplicated/concurrently modified keys, SQL exception for erroneous keys] (exception is + * null if all keys are duplicates/concurrently modified ones).
+ */ + private static DmlPageProcessingErrorResult splitErrors(Map<Object, EntryProcessorResult<Boolean>> res) { + Set<Object> errKeys = new LinkedHashSet<>(res.keySet()); + + SQLException currSqlEx = null; + + SQLException firstSqlEx = null; + + int errors = 0; + + // Let's form a chain of SQL exceptions + for (Map.Entry<Object, EntryProcessorResult<Boolean>> e : res.entrySet()) { + try { + e.getValue().get(); + } + catch (EntryProcessorException ex) { + SQLException next = createJdbcSqlException("Failed to process key '" + e.getKey() + '\'', + IgniteQueryErrorCode.ENTRY_PROCESSING); + + next.initCause(ex); + + if (currSqlEx != null) + currSqlEx.setNextException(next); + else + firstSqlEx = next; + + currSqlEx = next; + + errKeys.remove(e.getKey()); + + errors++; + } + } + + return new DmlPageProcessingErrorResult(errKeys.toArray(), firstSqlEx, errors); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedPlanInfo.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedPlanInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedPlanInfo.java new file mode 100644 index 0000000..44c6e22 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedPlanInfo.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.dml; + +import java.util.List; + +/** + * Additional information about distributed update plan. + */ +public final class DmlDistributedPlanInfo { + /** Whether update involves only replicated caches. */ + private final boolean replicatedOnly; + + /** Identifiers of caches involved in update (used for cluster nodes mapping). */ + private final List<Integer> cacheIds; + + /** + * Constructor. + * + * @param replicatedOnly Whether all caches are replicated. + * @param cacheIds List of cache identifiers (stored by reference, not copied). + */ + public DmlDistributedPlanInfo(boolean replicatedOnly, List<Integer> cacheIds) { + this.replicatedOnly = replicatedOnly; + this.cacheIds = cacheIds; + } + + /** + * @return {@code true} in case all involved caches are replicated. + */ + public boolean isReplicatedOnly() { + return replicatedOnly; + } + + /** + * @return Identifiers of caches involved in update (the list given to the constructor, not a copy). 
+ */ + public List<Integer> getCacheIds() { + return cacheIds; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingErrorResult.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingErrorResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingErrorResult.java new file mode 100644 index 0000000..02e7359 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingErrorResult.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.dml; + +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import javax.cache.processor.EntryProcessor; +import java.sql.SQLException; + +/** + * Result of splitting keys whose processing resulted in an exception from those skipped by + * logic of {@link EntryProcessor}s (most likely INSERT duplicates, or UPDATE/DELETE keys whose values + * had been modified concurrently), counting and collecting entry processor exceptions. + */ +public final class DmlPageProcessingErrorResult { + /** Keys that failed to be processed by {@link EntryProcessor} (not due to an exception). */ + private final Object[] errKeys; + + /** Number of entries whose processing resulted in an exception. */ + private final int cnt; + + /** Chain of exceptions corresponding to failed keys. Null if no keys yielded an exception. */ + private final SQLException ex; + + /** Creates a result from the skipped keys, the head of the exception chain (may be null) and the number of exceptions. */ + @SuppressWarnings("ConstantConditions") + public DmlPageProcessingErrorResult(@NotNull Object[] errKeys, SQLException ex, int exCnt) { + errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY); + // Exceptions count is zero if and only if the exceptions chain is null (exactly one XOR operand may hold). + assert exCnt == 0 ^ ex != null; + + this.errKeys = errKeys; + this.cnt = exCnt; + this.ex = ex; + } + + /** + * @return Number of entries whose processing resulted in an exception. + */ + public int errorCount() { + return cnt; + } + + /** + * @return Keys that failed to be processed by {@link EntryProcessor} (not due to an exception). + */ + public Object[] errorKeys() { + return errKeys; + } + + /** + * @return Chain of exceptions corresponding to failed keys, or {@code null} if no keys yielded an exception. 
+ */ + @Nullable + public SQLException error() { + return ex; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingResult.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingResult.java new file mode 100644 index 0000000..f2db3a7 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingResult.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.dml; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import java.sql.SQLException; + +/** + * Result of processing an individual page with {@link IgniteCache#invokeAll} including error details, if any. 
+ */ +public final class DmlPageProcessingResult { + /** Number of successfully processed items. */ + private final long cnt; + + /** Keys that failed to be updated or deleted due to concurrent modification of values. */ + private final Object[] errKeys; + + /** Chain of exceptions corresponding to failed keys. Null if no keys yielded an exception. */ + private final SQLException ex; + + /** Creates a result from the number of processed items, the failed keys and the head of the exception chain (may be null). */ + @SuppressWarnings("ConstantConditions") + public DmlPageProcessingResult(long cnt, Object[] errKeys, @Nullable SQLException ex) { + this.cnt = cnt; + this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY); + this.ex = ex; + } + + /** + * @return Number of successfully processed items. + */ + public long count() { + return cnt; + } + + /** + * @return Keys that failed to be updated or deleted due to concurrent modification of values. + */ + public Object[] errorKeys() { + return errKeys; + } + + /** + * @return Chain of exceptions corresponding to failed keys, or {@code null} if no keys yielded an exception. + */ + @Nullable public SQLException error() { + return ex; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java new file mode 100644 index 0000000..6621fc2 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.dml; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.h2.H2Utils; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.h2.util.DateTimeUtils; +import org.h2.util.LocalDateTimeUtils; +import org.h2.value.Value; +import org.h2.value.ValueDate; +import org.h2.value.ValueTime; +import org.h2.value.ValueTimestamp; + +import java.lang.reflect.Array; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Date; + +/** + * DML utility methods. + */ +public class DmlUtils { + /** + * Convert value to column's expected type by means of H2; any failure is rethrown as {@link IgniteSQLException} (CONVERSION_FAILED). + * + * @param val Source value ({@code null} is returned as is). + * @param desc Row descriptor. + * @param expCls Expected value class. + * @param type Expected column type to convert to. + * @return Converted object, or {@code null} if {@code val} is {@code null}. + * @throws IgniteCheckedException if failed (NOTE(review): failures inside the conversion are wrapped into IgniteSQLException - confirm this can still surface). 
+ */ + @SuppressWarnings({"ConstantConditions", "SuspiciousSystemArraycopy"}) + public static Object convert(Object val, GridH2RowDescriptor desc, Class<?> expCls, int type) + throws IgniteCheckedException { + if (val == null) + return null; + + Class<?> currCls = val.getClass(); + + try { + if (val instanceof Date && currCls != Date.class && expCls == Date.class) { + // H2 thinks that java.util.Date is always a Timestamp, while binary marshaller expects + // precise Date instance. Let's satisfy it. + return new Date(((Date) val).getTime()); + } + + // User-given UUID is always serialized by H2 to byte array, so we have to deserialize manually + if (type == Value.UUID && currCls == byte[].class) + return U.unmarshal(desc.context().marshaller(), (byte[]) val, + U.resolveClassLoader(desc.context().gridConfig())); + + if (LocalDateTimeUtils.isJava8DateApiPresent()) { + if (val instanceof Timestamp && LocalDateTimeUtils.isLocalDateTime(expCls)) + return LocalDateTimeUtils.valueToLocalDateTime(ValueTimestamp.get((Timestamp) val)); + + if (val instanceof Date && LocalDateTimeUtils.isLocalDate(expCls)) + return LocalDateTimeUtils.valueToLocalDate(ValueDate.fromDateValue( + DateTimeUtils.dateValueFromDate(((Date) val).getTime()))); + + if (val instanceof Time && LocalDateTimeUtils.isLocalTime(expCls)) + return LocalDateTimeUtils.valueToLocalTime(ValueTime.get((Time) val)); + } + + // We have to convert arrays of reference types manually - + // see https://issues.apache.org/jira/browse/IGNITE-4327 + // Still, we only can convert from Object[] to something more precise. + if (type == Value.ARRAY && currCls != expCls) { + if (currCls != Object[].class) + throw new IgniteCheckedException("Unexpected array type - only conversion from Object[] " + + "is assumed"); + + // Why would otherwise type be Value.ARRAY? 
+ assert expCls.isArray(); + + Object[] curr = (Object[]) val; + + Object newArr = Array.newInstance(expCls.getComponentType(), curr.length); + + System.arraycopy(curr, 0, newArr, 0, curr.length); + + return newArr; + } + + return H2Utils.convert(val, desc, type); + } + catch (Exception e) { + throw new IgniteSQLException("Value conversion failed [from=" + currCls.getName() + ", to=" + + expCls.getName() +']', IgniteQueryErrorCode.CONVERSION_FAILED, e); + } + } + + /** + * Private constructor. + */ + private DmlUtils() { + // No-op. + } +}
