This is an automated email from the ASF dual-hosted git repository. tledkov pushed a commit to branch sql-calcite in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push: new 7a09246 IGNITE-13915 Calcite improvements. Fix fragment ID on mapping, extend tests coverage, use both client and server for starting queries (#8629) 7a09246 is described below commit 7a09246b4cdc80b568c4eec838dcd40492f39822 Author: zstan <stanilov...@gmail.com> AuthorDate: Fri Jan 15 12:40:09 2021 +0300 IGNITE-13915 Calcite improvements. Fix fragment ID on mapping, extend tests coverage, use both client and server for starting queries (#8629) --- .../processors/query/calcite/exec/rel/Inbox.java | 5 + .../processors/query/calcite/prepare/Fragment.java | 10 -- .../query/calcite/prepare/QueryTemplate.java | 2 +- .../query/calcite/CalciteQueryProcessorTest.java | 160 +++++++++++---------- 4 files changed, 94 insertions(+), 83 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java index c261b79..7bdc0ac 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java @@ -108,6 +108,9 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl * @param comp Optional comparator for merge exchange. */ public void init(ExecutionContext<Row> ctx, RelDataType rowType, Collection<UUID> srcNodeIds, @Nullable Comparator<Row> comp) { + assert context().fragmentId() == ctx.fragmentId() : "different fragments unsupported: previous=" + context().fragmentId() + + " current=" + ctx.fragmentId(); + // It's important to set proper context here because // the one, that is created on a first message // received doesn't have all context variables in place. @@ -187,6 +190,8 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl /** */ private void doPush() { try { + checkState(); + push(); } catch (Exception e) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java index 90a5a5f..9db3bf1 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java @@ -67,16 +67,6 @@ public class Fragment { this(id, root, remotes, null, null); } - /** - * @param id Fragment id. - * @param root Root node of the fragment. - * @param remotes Remote sources of the fragment. - * @param rootSer Root serialized representation. - */ - public Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes, @Nullable String rootSer) { - this(id, root, remotes, rootSer, null); - } - /** */ Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes, @Nullable String rootSer, @Nullable FragmentMapping mapping) { this.id = id; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java index 583385c..5689994 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java @@ -119,7 +119,7 @@ public class QueryTemplate { sender = new IgniteSender(sender.getCluster(), sender.getTraitSet(), sender.getInput(), sender.exchangeId(), newTargetId, sender.distribution()); - fragment0 = new Fragment(fragment0.fragmentId(), sender, fragment0.remotes(), fragment0.serialized()); + fragment0 = new Fragment(fragment0.fragmentId(), sender, fragment0.remotes()); } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java index 3f7e01a..4fc05bb 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -29,6 +30,7 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; @@ -50,17 +52,17 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_EXPERIMENTAL_SQL_E @WithSystemProperty(key = "calcite.debug", value = "false") public class CalciteQueryProcessorTest extends GridCommonAbstractTest { /** */ - private static IgniteEx ignite; + private static IgniteEx client; /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { startGrids(5); - ignite = startClientGrid(); + client = startClientGrid(); } /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { + @Override protected void afterTest() { for (Ignite ign : G.allGrids()) { for (String cacheName : ign.cacheNames()) ign.destroyCache(cacheName); @@ -75,21 +77,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { /** */ @Test public void testCountWithJoin() throws Exception { - IgniteCache<Integer, RISK> RISK = ignite.getOrCreateCache(new CacheConfiguration<Integer, RISK>() + IgniteCache<Integer, RISK> RISK = client.getOrCreateCache(new CacheConfiguration<Integer, RISK>() .setName("RISK") .setSqlSchema("PUBLIC") - .setQueryEntities(F.asList(new QueryEntity(Integer.class, RISK.class).setTableName("RISK"))) + .setQueryEntities(F.asList(new QueryEntity(Integer.class, RISK.class).setTableName("RISK") + .setKeyFields(new HashSet<>(Arrays.asList("TRADEID", "TRADEVER"))))) .setBackups(1) ); - IgniteCache<Integer, TRADE> TRADE = ignite.getOrCreateCache(new CacheConfiguration<Integer, TRADE>() + IgniteCache<Integer, TRADE> TRADE = client.getOrCreateCache(new CacheConfiguration<Integer, TRADE>() .setName("TRADE") .setSqlSchema("PUBLIC") - .setQueryEntities(F.asList(new QueryEntity(Integer.class, TRADE.class).setTableName("TRADE"))) + .setQueryEntities(F.asList(new QueryEntity(Integer.class, TRADE.class).setTableName("TRADE") + .setKeyFields(new HashSet<>(Arrays.asList("TRADEID", "TRADEVER"))))) .setBackups(1) ); - IgniteCache<Integer, BATCH> BATCH = ignite.getOrCreateCache(new CacheConfiguration<Integer, BATCH>() + IgniteCache<Integer, BATCH> BATCH = client.getOrCreateCache(new CacheConfiguration<Integer, BATCH>() .setName("BATCH") .setSqlSchema("PUBLIC") .setQueryEntities(F.asList(new QueryEntity(Integer.class, BATCH.class).setTableName("BATCH"))) @@ -115,47 +119,51 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { awaitPartitionMapExchange(true, true, null); - QueryEngine engine = Commons.lookupComponent(ignite.context(), QueryEngine.class); - // TODO: https://issues.apache.org/jira/browse/IGNITE-13849 // we have a problem with serialization/deserialization of MergeJoin - List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", - "SELECT /*+ DISABLE_RULE('MergeJoinConverter') */ count(*)" + - " FROM RISK R," + - " TRADE T," + - " BATCH B " + - "WHERE R.BATCHKEY = B.BATCHKEY " + - "AND R.TRADEID = T.TRADEID " + - "AND R.TRADEVER = T.TRADEVER " + - "AND T.BOOK = 'BOOK' " + - "AND B.IS = TRUE"); + String sqlCalc = "SELECT /*+ DISABLE_RULE('MergeJoinConverter') */ count(*)" + + " FROM RISK R," + + " TRADE T," + + " BATCH B " + + "WHERE R.BATCHKEY = B.BATCHKEY " + + "AND R.TRADEID = T.TRADEID " + + "AND R.TRADEVER = T.TRADEVER " + + "AND T.BOOK = 'BOOK' " + + "AND B.LS = TRUE"; + + // loop for test execution. + for (int i = 0; i < 10; i++) { + List<List<?>> resLoc = sql(sqlCalc); + assertEquals(40L, resLoc.get(0).get(0)); + } - List<List<?>> res = query.get(0).getAll(); + //calcite + List<List<?>> res1 = sql(sqlCalc); - assertEquals(1, res.size()); - assertEquals(1, res.get(0).size()); - assertEquals(40L, res.get(0).get(0)); + assertEquals(1, res1.size()); + assertEquals(1, res1.get(0).size()); + assertEquals(40L, res1.get(0).get(0)); } /** */ public static class RISK { /** */ @QuerySqlField - public Integer BATCHKEY; + public Integer batchKey; /** */ @QuerySqlField - public Integer TRADEID; + public Integer tradeId; /** */ @QuerySqlField - public Integer TRADEVER; + public Integer tradeVer; /** */ public RISK(Integer in) { - BATCHKEY = in; - TRADEID = in; - TRADEVER = in; + batchKey = in; + tradeId = in; + tradeVer = in; } } @@ -163,21 +171,21 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { public static class TRADE { /** */ @QuerySqlField - public Integer TRADEID; + public Integer tradeId; /** */ @QuerySqlField - public Integer TRADEVER; + public Integer tradeVer; /** */ @QuerySqlField - public String BOOK; + public String book; /** */ public TRADE(Integer in) { - TRADEID = in; - TRADEVER = in; - BOOK = (in & 1) != 0 ? "BOOK" : ""; + tradeId = in; + tradeVer = in; + book = (in & 1) != 0 ? "BOOK" : ""; } } @@ -185,37 +193,37 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { public static class BATCH { /** */ @QuerySqlField - public Integer BATCHKEY; + public Integer batchKey; /** */ @QuerySqlField - public Boolean IS; + public Boolean ls; /** */ public BATCH(Integer in) { - BATCHKEY = in; - IS = (in & 1) != 0; + batchKey = in; + ls = (in & 1) != 0; } } /** */ @Test public void unionAll() throws Exception { - IgniteCache<Integer, Employer> employer1 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>() + IgniteCache<Integer, Employer> employer1 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>() .setName("employer1") .setSqlSchema("PUBLIC") .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER1"))) .setBackups(1) ); - IgniteCache<Integer, Employer> employer2 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>() + IgniteCache<Integer, Employer> employer2 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>() .setName("employer2") .setSqlSchema("PUBLIC") .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER2"))) .setBackups(2) ); - IgniteCache<Integer, Employer> employer3 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>() + IgniteCache<Integer, Employer> employer3 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>() .setName("employer3") .setSqlSchema("PUBLIC") .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER3"))) @@ -231,39 +239,33 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { awaitPartitionMapExchange(true, true, null); - QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class); - - List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", - "SELECT * FROM employer1 " + - "UNION ALL " + - "SELECT * FROM employer2 " + - "UNION ALL " + - "SELECT * FROM employer3 "); - - assertEquals(1, query.size()); + List<List<?>> rows = sql("SELECT * FROM employer1 " + + "UNION ALL " + + "SELECT * FROM employer2 " + + "UNION ALL " + + "SELECT * FROM employer3 "); - List<List<?>> rows = query.get(0).getAll(); assertEquals(6, rows.size()); } /** */ @Test public void union() throws Exception { - IgniteCache<Integer, Employer> employer1 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>() + IgniteCache<Integer, Employer> employer1 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>() .setName("employer1") .setSqlSchema("PUBLIC") .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER1"))) .setBackups(1) ); - IgniteCache<Integer, Employer> employer2 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>() + IgniteCache<Integer, Employer> employer2 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>() .setName("employer2") .setSqlSchema("PUBLIC") .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER2"))) .setBackups(2) ); - IgniteCache<Integer, Employer> employer3 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>() + IgniteCache<Integer, Employer> employer3 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>() .setName("employer3") .setSqlSchema("PUBLIC") .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER3"))) @@ -296,14 +298,14 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { /** */ private void populateTables() throws InterruptedException { - IgniteCache<Integer, Employer> orders = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>() + IgniteCache<Integer, Employer> orders = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>() .setName("orders") .setSqlSchema("PUBLIC") .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("orders"))) .setBackups(2) ); - IgniteCache<Integer, Employer> account = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>() + IgniteCache<Integer, Employer> account = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>() .setName("account") .setSqlSchema("PUBLIC") .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("account"))) @@ -405,7 +407,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { populateTables(); List<List<?>> rows = sql( - "SELECT distinct(name) FROM Orders o WHERE name IN (" + + "SELECT distinct(name) FROM Orders o WHERE name IN (" + " SELECT name" + " FROM Account)"); @@ -453,7 +455,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { populateTables(); List<List<?>> rows = sql( - "SELECT name FROM Orders o WHERE EXISTS (" + + "SELECT name FROM Orders o WHERE EXISTS (" + " SELECT 1" + " FROM Account a" + " WHERE o.name = a.name)"); @@ -468,7 +470,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { populateTables(); List<List<?>> rows = sql( - "EXPLAIN PLAN FOR SELECT name FROM Orders o WHERE NOT EXISTS (" + + "EXPLAIN PLAN FOR SELECT name FROM Orders o WHERE NOT EXISTS (" + " SELECT 1" + " FROM Account a" + " WHERE o.name = a.name)"); @@ -484,7 +486,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { assertEquals(3, rows.size()); rows = sql( - "SELECT distinct(name) FROM Orders o WHERE NOT EXISTS (" + + "SELECT distinct(name) FROM Orders o WHERE NOT EXISTS (" + " SELECT name" + " FROM Account a" + " WHERE o.name = a.name)"); @@ -517,7 +519,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { /** */ @Test public void aggregate() throws Exception { - IgniteCache<Integer, Employer> employer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>() + IgniteCache<Integer, Employer> employer = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>() .setName("employer") .setSqlSchema("PUBLIC") .setIndexedTypes(Integer.class, Employer.class) @@ -694,7 +696,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { /** */ @Test public void testInsertUpdateDeleteNonPrimitiveKey() throws Exception { - IgniteCache<Key, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Key, Developer>() + IgniteCache<Key, Developer> developer = client.getOrCreateCache(new CacheConfiguration<Key, Developer>() .setName("developer") .setSqlSchema("PUBLIC") .setIndexedTypes(Key.class, Developer.class) @@ -766,7 +768,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { /** for test purpose only. */ public void testThroughput() { - IgniteCache<Integer, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Developer>() + IgniteCache<Integer, Developer> developer = client.getOrCreateCache(new CacheConfiguration<Integer, Developer>() .setCacheMode(CacheMode.REPLICATED) .setName("developer") .setSqlSchema("PUBLIC") @@ -785,7 +787,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { developer.put(i, new Developer("Name" + i, prId)); } - QueryEngine engine = Commons.lookupComponent(ignite.context(), QueryEngine.class); + QueryEngine engine = Commons.lookupComponent(client.context(), QueryEngine.class); // warmup for (int i = 0; i < numIterations; i++) { @@ -802,13 +804,13 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { // warmup for (int i = 0; i < numIterations; i++) { - List<FieldsQueryCursor<List<?>>> query = ignite.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false); + List<FieldsQueryCursor<List<?>>> query = client.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false); query.get(0).getAll(); } start = System.currentTimeMillis(); for (int i = 0; i < numIterations; i++) { - List<FieldsQueryCursor<List<?>>> query = ignite.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false); + List<FieldsQueryCursor<List<?>>> query = client.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false); query.get(0).getAll(); } System.out.println("H2 duration = " + (System.currentTimeMillis() - start)); @@ -816,11 +818,25 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { /** */ private List<List<?>> sql(String sql) { - QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class); + QueryEngine engineSrv = Commons.lookupComponent(grid(1).context(), QueryEngine.class); + + assertTrue(client.configuration().isClientMode()); + + QueryEngine engineCli = Commons.lookupComponent(client.context(), QueryEngine.class); - List<FieldsQueryCursor<List<?>>> cursors = engine.query(null, "PUBLIC", sql); + List<FieldsQueryCursor<List<?>>> cursorsSrv = engineSrv.query(null, "PUBLIC", sql); + + List<FieldsQueryCursor<List<?>>> cursorsCli = engineCli.query(null, "PUBLIC", sql); + + List<List<?>> allSrv; + + try (QueryCursor srvCursor = cursorsSrv.get(0); QueryCursor cliCursor = cursorsCli.get(0)) { + allSrv = srvCursor.getAll(); + + assertEquals(allSrv.size(), cliCursor.getAll().size()); + } - return cursors.get(0).getAll(); + return allSrv; } /** */