WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2460b953 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2460b953 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2460b953 Branch: refs/heads/ignite-10759-1 Commit: 2460b95336e37dc54e3675ef0b2a78eb0c5e442e Parents: 7358f5a Author: devozerov <voze...@gridgain.com> Authored: Fri Dec 21 16:28:24 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Fri Dec 21 16:28:24 2018 +0300 ---------------------------------------------------------------------- .../internal/processors/query/h2/H2Utils.java | 36 +++++ .../query/h2/database/H2TreeIndex.java | 19 --- .../query/h2/opt/GridH2IndexBase.java | 157 +++---------------- .../query/h2/opt/join/RangeSource.java | 137 ++++++++++++++++ 4 files changed, 196 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2460b953/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java index f4c3ba8..d279660 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java @@ -49,6 +49,9 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory; import org.apache.ignite.internal.util.GridStringBuilder; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.F; @@ -56,6 +59,7 @@ import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.h2.engine.Session; import org.h2.jdbc.JdbcConnection; +import org.h2.result.Row; import org.h2.result.SortOrder; import org.h2.table.IndexColumn; import org.h2.util.LocalDateTimeUtils; @@ -80,6 +84,8 @@ import org.h2.value.ValueTime; import org.h2.value.ValueTimestamp; import org.h2.value.ValueUuid; +import javax.cache.CacheException; + import static org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME; import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME; import static org.apache.ignite.internal.processors.query.QueryUtils.VER_FIELD_NAME; @@ -613,4 +619,34 @@ public class H2Utils { return qry; } + + /** + * @param row Row. + * @return Row message. + */ + public static GridH2RowMessage toRowMessage(Row row) { + if (row == null) + return null; + + int cols = row.getColumnCount(); + + assert cols > 0 : cols; + + List<GridH2ValueMessage> vals = new ArrayList<>(cols); + + for (int i = 0; i < cols; i++) { + try { + vals.add(GridH2ValueMessageFactory.toMessage(row.getValue(i))); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + } + + GridH2RowMessage res = new GridH2RowMessage(); + + res.values(vals); + + return res; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2460b953/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java index b7a0ead..9cdda80 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java @@ -450,25 +450,6 @@ public class H2TreeIndex extends H2TreeIndexBase { } /** {@inheritDoc} */ - @Override protected H2Cursor doFind0( - IgniteTree t, - @Nullable SearchRow first, - @Nullable SearchRow last, - BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) { - try { - GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, filter, null); - - if (range == null) - range = H2Utils.EMPTY_CURSOR; - - return new H2Cursor(range); - } - catch (IgniteCheckedException e) { - throw DbException.convert(e); - } - } - - /** {@inheritDoc} */ @Override protected BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter(GridH2QueryContext qctx) { if (qctx == null) { assert !cctx.mvccEnabled(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2460b953/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index ea9b881..481dc76 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -30,6 +30,8 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; import org.apache.ignite.internal.processors.query.h2.H2Cursor; +import org.apache.ignite.internal.processors.query.h2.H2Utils; +import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource; import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse; @@ -40,6 +42,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMes import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.IgniteTree; +import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.CIX2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -61,7 +64,6 @@ import org.h2.table.TableFilter; import org.h2.util.DoneFuture; import org.h2.value.Value; import org.h2.value.ValueNull; -import org.jetbrains.annotations.Nullable; import javax.cache.CacheException; import java.util.ArrayList; @@ -413,7 +415,7 @@ public abstract class GridH2IndexBase extends BaseIndex { // This is the first request containing all the search rows. assert !msg.bounds().isEmpty() : "empty bounds"; - src = new RangeSource(msg.bounds(), msg.segment(), filter(qctx)); + src = new RangeSource(this, msg.bounds(), msg.segment(), filter(qctx)); } else { // This is request to fetch next portion of data. @@ -623,36 +625,6 @@ public abstract class GridH2IndexBase extends BaseIndex { } /** - * @param row Row. - * @return Row message. - */ - private GridH2RowMessage toRowMessage(Row row) { - if (row == null) - return null; - - int cols = row.getColumnCount(); - - assert cols > 0 : cols; - - List<GridH2ValueMessage> vals = new ArrayList<>(cols); - - for (int i = 0; i < cols; i++) { - try { - vals.add(GridH2ValueMessageFactory.toMessage(row.getValue(i))); - } - catch (IgniteCheckedException e) { - throw new CacheException(e); - } - } - - GridH2RowMessage res = new GridH2RowMessage(); - - res.values(vals); - - return res; - } - - /** * @param msg Row message. * @return Search row. */ @@ -1417,100 +1389,32 @@ public abstract class GridH2IndexBase extends BaseIndex { } /** - * Bounds iterator. + * Find rows for the segments (distributed joins). + * + * @param bounds Bounds. + * @param segment Segment. + * @param filter Filter. + * @return Iterator. */ - private class RangeSource { - /** */ - Iterator<GridH2RowRangeBounds> boundsIter; - - /** */ - int curRangeId = -1; + public Iterator<GridH2Row> findForSegment(GridH2RowRangeBounds bounds, int segment, + BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) { + SearchRow first = toSearchRow(bounds.first()); + SearchRow last = toSearchRow(bounds.last()); - /** */ - private final int segment; + IgniteTree t = treeForRead(segment); - /** */ - private final BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter; + try { + GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, filter, null); - /** Iterator. */ - Iterator<GridH2Row> iter = emptyIterator(); + if (range == null) + range = H2Utils.EMPTY_CURSOR; - /** - * @param bounds Bounds. - * @param segment Segment. - * @param filter Filter. - */ - RangeSource(Iterable<GridH2RowRangeBounds> bounds, int segment, BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) { - this.segment = segment; - this.filter = filter; - boundsIter = bounds.iterator(); - } + H2Cursor cur = new H2Cursor(range); - /** - * @return {@code true} If there are more rows in this source. - */ - public boolean hasMoreRows() throws IgniteCheckedException { - return boundsIter.hasNext() || iter.hasNext(); + return new CursorIteratorWrapper(cur); } - - /** - * @param maxRows Max allowed rows. - * @return Range. - */ - public GridH2RowRange next(int maxRows) { - assert maxRows > 0 : maxRows; - - for (; ; ) { - if (iter.hasNext()) { - // Here we are getting last rows from previously partially fetched range. - List<GridH2RowMessage> rows = new ArrayList<>(); - - GridH2RowRange nextRange = new GridH2RowRange(); - - nextRange.rangeId(curRangeId); - nextRange.rows(rows); - - do { - rows.add(toRowMessage(iter.next())); - } - while (rows.size() < maxRows && iter.hasNext()); - - if (iter.hasNext()) - nextRange.setPartial(); - else - iter = emptyIterator(); - - return nextRange; - } - - iter = emptyIterator(); - - if (!boundsIter.hasNext()) { - boundsIter = emptyIterator(); - - return null; - } - - GridH2RowRangeBounds bounds = boundsIter.next(); - - curRangeId = bounds.rangeId(); - - SearchRow first = toSearchRow(bounds.first()); - SearchRow last = toSearchRow(bounds.last()); - - IgniteTree t = treeForRead(segment); - - iter = new CursorIteratorWrapper(doFind0(t, first, last, filter)); - - if (!iter.hasNext()) { - // We have to return empty range here. - GridH2RowRange emptyRange = new GridH2RowRange(); - - emptyRange.rangeId(curRangeId); - - return emptyRange; - } - } + catch (IgniteCheckedException e) { + throw DbException.convert(e); } } @@ -1523,21 +1427,6 @@ public abstract class GridH2IndexBase extends BaseIndex { } /** - * @param t Tree. - * @param first Lower bound. - * @param last Upper bound always inclusive. - * @param filter Filter. - * @return Iterator over rows in given range. - */ - protected H2Cursor doFind0( - IgniteTree t, - @Nullable SearchRow first, - @Nullable SearchRow last, - BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) { - throw new UnsupportedOperationException(); - } - - /** * Re-assign column ids after removal of column(s). */ public void refreshColumnIds() { http://git-wip-us.apache.org/repos/asf/ignite/blob/2460b953/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java new file mode 100644 index 0000000..8cce83d --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt.join; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.query.h2.H2Utils; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static java.util.Collections.emptyIterator; + +/** + * Bounds iterator. + */ +public class RangeSource { + /** Index. */ + private final GridH2IndexBase idx; + + /** */ + private Iterator<GridH2RowRangeBounds> boundsIter; + + /** */ + private int curRangeId = -1; + + /** */ + private final int segment; + + /** */ + private final BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter; + + /** Iterator. */ + private Iterator<GridH2Row> iter = emptyIterator(); + + /** + * @param bounds Bounds. + * @param segment Segment. + * @param filter Filter. + */ + public RangeSource( + GridH2IndexBase idx, + Iterable<GridH2RowRangeBounds> bounds, + int segment, + BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter + ) { + this.idx = idx; + this.segment = segment; + this.filter = filter; + + boundsIter = bounds.iterator(); + } + + /** + * @return {@code true} If there are more rows in this source. + */ + public boolean hasMoreRows() throws IgniteCheckedException { + return boundsIter.hasNext() || iter.hasNext(); + } + + /** + * @param maxRows Max allowed rows. + * @return Range. + */ + public GridH2RowRange next(int maxRows) { + assert maxRows > 0 : maxRows; + + for (; ; ) { + if (iter.hasNext()) { + // Here we are getting last rows from previously partially fetched range. + List<GridH2RowMessage> rows = new ArrayList<>(); + + GridH2RowRange nextRange = new GridH2RowRange(); + + nextRange.rangeId(curRangeId); + nextRange.rows(rows); + + do { + rows.add(H2Utils.toRowMessage(iter.next())); + } + while (rows.size() < maxRows && iter.hasNext()); + + if (iter.hasNext()) + nextRange.setPartial(); + else + iter = emptyIterator(); + + return nextRange; + } + + iter = emptyIterator(); + + if (!boundsIter.hasNext()) { + boundsIter = emptyIterator(); + + return null; + } + + GridH2RowRangeBounds bounds = boundsIter.next(); + + curRangeId = bounds.rangeId(); + + iter = idx.findForSegment(bounds, segment, filter); + + if (!iter.hasNext()) { + // We have to return empty range here. + GridH2RowRange emptyRange = new GridH2RowRange(); + + emptyRange.rangeId(curRangeId); + + return emptyRange; + } + } + } +}