This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new c2ac585097 IGNITE-20662 Sql. Added a microbenchmark to measure script execution performance (#2955) c2ac585097 is described below commit c2ac5850973ae3bfd44b06fc6e3b5880f9f292f1 Author: Pavel Pereslegin <xxt...@gmail.com> AuthorDate: Wed Dec 20 20:10:28 2023 +0300 IGNITE-20662 Sql. Added a microbenchmark to measure script execution performance (#2955) --- .../benchmark/AbstractMultiNodeBenchmark.java | 43 +- .../ignite/internal/benchmark/InsertBenchmark.java | 29 +- .../ignite/internal/benchmark/SelectBenchmark.java | 110 ++++- .../benchmark/SqlMultiStatementBenchmark.java | 498 +++++++++++++++++++++ 4 files changed, 645 insertions(+), 35 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java index f13101bfdc..72132d9967 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java @@ -87,25 +87,7 @@ public class AbstractMultiNodeBenchmark { )) ); - var createTableStatement = "CREATE TABLE " + TABLE_NAME + "(\n" - + " ycsb_key int PRIMARY KEY,\n" - + " field1 varchar(100),\n" - + " field2 varchar(100),\n" - + " field3 varchar(100),\n" - + " field4 varchar(100),\n" - + " field5 varchar(100),\n" - + " field6 varchar(100),\n" - + " field7 varchar(100),\n" - + " field8 varchar(100),\n" - + " field9 varchar(100),\n" - + " field10 varchar(100)\n" - + ") WITH primary_zone='" + ZONE_NAME + "'"; - - getAllFromCursor( - await(queryEngine.querySingleAsync( - SqlPropertiesHelper.emptyProperties(), clusterNode.transactions(), null, createTableStatement - )) - ); + createTable(TABLE_NAME); } catch (Throwable th) { nodeTearDown(); @@ -113,6 +95,29 @@ public class AbstractMultiNodeBenchmark { } } + protected void createTable(String tableName) { + var createTableStatement = "CREATE TABLE " + tableName + "(\n" + + " ycsb_key int PRIMARY KEY,\n" + + " field1 varchar(100),\n" + + " field2 varchar(100),\n" + + " field3 varchar(100),\n" + + " field4 varchar(100),\n" + + " field5 varchar(100),\n" + + " field6 varchar(100),\n" + + " field7 varchar(100),\n" + + " field8 varchar(100),\n" + + " field9 varchar(100),\n" + + " field10 varchar(100)\n" + + ") WITH primary_zone='" + ZONE_NAME + "'"; + + getAllFromCursor( + await(clusterNode.queryEngine().querySingleAsync( + SqlPropertiesHelper.emptyProperties(), clusterNode.transactions(), null, createTableStatement + )) + ); + } + + /** * Stops the cluster. * diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java index b77f4931cf..e2a8eeb796 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java @@ -77,6 +77,14 @@ public class InsertBenchmark extends AbstractMultiNodeBenchmark { state.executeQuery(); } + /** + * Benchmark for SQL script insert via embedded client. + */ + @Benchmark + public void sqlInsertScript(SqlState state) { + state.executeScript(); + } + /** * Benchmark for KV insert via embedded client. */ @@ -93,6 +101,14 @@ public class InsertBenchmark extends AbstractMultiNodeBenchmark { state.executeQuery(); } + /** + * Benchmark for JDBC script insert. + */ + @Benchmark + public void jdbcInsertScript(JdbcState state) throws SQLException { + state.executeScript(); + } + /** * Benchmark for SQL insert via thin client. */ @@ -121,7 +137,7 @@ public class InsertBenchmark extends AbstractMultiNodeBenchmark { } /** - * Benchmark state for {@link #sqlInsert(SqlState)}. + * Benchmark state for {@link #sqlInsert(SqlState)} and {@link #sqlInsertScript(SqlState)}. * * <p>Holds {@link Session} and {@link Statement}. */ @@ -159,6 +175,10 @@ public class InsertBenchmark extends AbstractMultiNodeBenchmark { // NO-OP } } + + void executeScript() { + session.executeScript(statement.query(), id++); + } } /** @@ -204,7 +224,7 @@ public class InsertBenchmark extends AbstractMultiNodeBenchmark { } /** - * Benchmark state for {@link #jdbcInsert(JdbcState)}. + * Benchmark state for {@link #jdbcInsert(JdbcState)} and {@link #jdbcInsertScript(JdbcState)}. * * <p>Holds {@link Connection} and {@link PreparedStatement}. */ @@ -241,6 +261,11 @@ public class InsertBenchmark extends AbstractMultiNodeBenchmark { stmt.setInt(1, id++); stmt.executeUpdate(); } + + void executeScript() throws SQLException { + stmt.setInt(1, id++); + stmt.execute(); + } } /** diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java index f502821205..39f30f17df 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java @@ -23,9 +23,17 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Iterator; import java.util.Random; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.internal.sql.engine.AsyncSqlCursor; +import org.apache.ignite.internal.sql.engine.InternalSqlRow; +import org.apache.ignite.internal.sql.engine.QueryProcessor; +import org.apache.ignite.internal.sql.engine.property.SqlProperties; +import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper; +import org.apache.ignite.internal.util.AsyncCursor.BatchedResult; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.Session; @@ -45,6 +53,7 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; import org.openjdk.jmh.runner.options.Options; @@ -95,19 +104,39 @@ public class SelectBenchmark extends AbstractMultiNodeBenchmark { * Benchmark for SQL select via embedded client. */ @Benchmark - public void sqlGet(SqlState sqlState) { + public void sqlGet(SqlState sqlState, Blackhole bh) { try (var rs = sqlState.sql(SELECT_ALL_FROM_USERTABLE, random.nextInt(TABLE_SIZE))) { - rs.next(); + bh.consume(rs.next()); } } + /** + * Benchmark for SQL select via embedded client using internal API. + */ + @Benchmark + public void sqlGetInternal(SqlInternalApiState sqlInternalApiState, Blackhole bh) { + Iterator<InternalSqlRow> res = sqlInternalApiState.query(SELECT_ALL_FROM_USERTABLE, random.nextInt(TABLE_SIZE)); + + bh.consume(res.next()); + } + + /** + * Benchmark for SQL script select via embedded client using internal API. + */ + @Benchmark + public void sqlGetInternalScript(SqlInternalApiState sqlInternalApiState, Blackhole bh) { + Iterator<InternalSqlRow> res = sqlInternalApiState.script(SELECT_ALL_FROM_USERTABLE, random.nextInt(TABLE_SIZE)); + + bh.consume(res.next()); + } + /** * Benchmark for SQL select via thin client. */ @Benchmark - public void sqlThinGet(SqlThinState sqlState) { + public void sqlThinGet(SqlThinState sqlState, Blackhole bh) { try (var rs = sqlState.sql(SELECT_ALL_FROM_USERTABLE, random.nextInt(TABLE_SIZE))) { - rs.next(); + bh.consume(rs.next()); } } @@ -115,10 +144,23 @@ public class SelectBenchmark extends AbstractMultiNodeBenchmark { * Benchmark for JDBC get. */ @Benchmark - public void jdbcGet(JdbcState state) throws SQLException { + public void jdbcGet(JdbcState state, Blackhole bh) throws SQLException { state.stmt.setInt(1, random.nextInt(TABLE_SIZE)); try (ResultSet r = state.stmt.executeQuery()) { - r.next(); + bh.consume(r.next()); + } + } + + /** + * Benchmark for JDBC script get. + */ + @Benchmark + public void jdbcGetScript(JdbcState state, Blackhole bh) throws SQLException { + state.stmt.setInt(1, random.nextInt(TABLE_SIZE)); + state.stmt.execute(); + + try (ResultSet r = state.stmt.getResultSet()) { + bh.consume(r.next()); } } @@ -126,16 +168,18 @@ public class SelectBenchmark extends AbstractMultiNodeBenchmark { * Benchmark for KV get via embedded client. */ @Benchmark - public void kvGet() { - keyValueView.get(null, Tuple.create().set("ycsb_key", random.nextInt(TABLE_SIZE))); + public void kvGet(Blackhole bh) { + Tuple val = keyValueView.get(null, Tuple.create().set("ycsb_key", random.nextInt(TABLE_SIZE))); + bh.consume(val); } /** * Benchmark for KV get via thin client. */ @Benchmark - public void kvThinGet(KvThinState kvState) { - kvState.kvView().get(null, Tuple.create().set("ycsb_key", random.nextInt(TABLE_SIZE))); + public void kvThinGet(KvThinState kvState, Blackhole bh) { + Tuple val = kvState.kvView().get(null, Tuple.create().set("ycsb_key", random.nextInt(TABLE_SIZE))); + bh.consume(val); } /** @@ -150,7 +194,7 @@ public class SelectBenchmark extends AbstractMultiNodeBenchmark { } /** - * Benchmark state for {@link #sqlGet(SqlState)}. + * Benchmark state for {@link #sqlGet(SqlState, Blackhole)}. * * <p>Holds {@link Session}. */ @@ -172,7 +216,45 @@ public class SelectBenchmark extends AbstractMultiNodeBenchmark { } /** - * Benchmark state for {@link #sqlThinGet(SqlThinState)}. + * Benchmark state for {@link #sqlGetInternalScript(SqlInternalApiState, Blackhole)} and + * {@link #sqlGetInternal(SqlInternalApiState, Blackhole)}. + */ + @State(Scope.Benchmark) + public static class SqlInternalApiState { + private final SqlProperties properties = SqlPropertiesHelper.emptyProperties(); + private final QueryProcessor queryProc = clusterNode.queryEngine(); + private int pageSize; + + /** Initializes session. */ + @Setup + public void setUp() throws Exception { + Session session = clusterNode.sql().createSession(); + + pageSize = session.defaultPageSize(); + + IgniteUtils.closeAll(session); + } + + private Iterator<InternalSqlRow> query(String sql, Object... args) { + return handleFirstBatch(queryProc.querySingleAsync(properties, clusterNode.transactions(), null, sql, args)); + } + + private Iterator<InternalSqlRow> script(String sql, Object... args) { + return handleFirstBatch(queryProc.queryScriptAsync(properties, clusterNode.transactions(), null, sql, args)); + } + + private Iterator<InternalSqlRow> handleFirstBatch(CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFut) { + AsyncSqlCursor<InternalSqlRow> cursor = cursorFut.join(); + BatchedResult<InternalSqlRow> res = cursor.requestNextAsync(pageSize).join(); + + cursor.closeAsync(); + + return res.items().iterator(); + } + } + + /** + * Benchmark state for {@link #sqlThinGet(SqlThinState, Blackhole)}. * * <p>Holds {@link IgniteClient} and {@link Session}. */ @@ -207,7 +289,7 @@ public class SelectBenchmark extends AbstractMultiNodeBenchmark { } /** - * Benchmark state for {@link #jdbcGet(JdbcState)}. + * Benchmark state for {@link #jdbcGet(JdbcState, Blackhole)}. * * <p>Holds {@link Connection} and {@link PreparedStatement}. */ @@ -239,7 +321,7 @@ public class SelectBenchmark extends AbstractMultiNodeBenchmark { } /** - * Benchmark state for {@link #kvThinGet(KvThinState)}. + * Benchmark state for {@link #kvThinGet(KvThinState, Blackhole)}. * * <p>Holds {@link IgniteClient} and {@link KeyValueView} for the table. */ diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java new file mode 100644 index 0000000000..2e27a73199 --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmark; + +import static java.util.stream.Collectors.joining; +import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.IntStream; +import org.apache.ignite.internal.lang.IgniteStringBuilder; +import org.apache.ignite.internal.sql.engine.AsyncSqlCursor; +import org.apache.ignite.internal.sql.engine.InternalSqlRow; +import org.apache.ignite.internal.sql.engine.QueryProcessor; +import org.apache.ignite.internal.sql.engine.property.SqlProperties; +import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper; +import org.apache.ignite.internal.util.AsyncCursor.BatchedResult; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.sql.Session; +import org.apache.ignite.table.Tuple; +import org.apache.ignite.tx.IgniteTransactions; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +/** + * Benchmark measures the time for sequential execution of different sets of statements and the time + * for executing the same set of statements, but running them as a script. + * + * <p>Include the following cases: + * <ol> + * <li>{@code INSERT} the same key into different tables.</li> + * <li>{@code SELECT COUNT(*)} from different tables.</li> + * <li>{@code SELECT} using single key from multiple tables.</li> + * <li>Multiple {@code SELECT} by key from one table.</li> + * </ol> + */ +@State(Scope.Benchmark) +@Fork(1) +@Threads(1) +@Warmup(iterations = 10, time = 2) +@Measurement(iterations = 20, time = 2) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class SqlMultiStatementBenchmark extends AbstractMultiNodeBenchmark { + private static final int TABLE_SIZE = 10_000; + + @Param({"1", "2"}) + private int clusterSize; + + @Param({"1", "2", "4"}) + private static int statementsCount; + + /** Creates tables. */ + @Setup + public void createTables() { + for (int i = 0; i < statementsCount; i++) { + createTable("T" + i); + } + } + + /** Benchmark for sequential {@code INSERT}. */ + @Benchmark + public void insert(InsertState state, Blackhole bh) { + state.executeQuery(bh); + } + + /** Benchmark for script {@code INSERT}. */ + @Benchmark + public void insertScript(InsertState state, Blackhole bh) { + state.executeScript(bh); + } + + /** Benchmark for sequential {@code COUNT}. */ + @Benchmark + public void count(CountState state, Blackhole bh) { + state.executeQuery(bh); + } + + /** Benchmark for script {@code COUNT}. */ + @Benchmark + public void countScript(CountState state, Blackhole bh) { + state.executeScript(bh); + } + + /** Benchmark for sequential single key {@code SELECT}. */ + @Benchmark + public void selectMultipleTables(SelectSingleKeyMultipleTablesState state, Blackhole bh) { + state.executeQuery(bh); + } + + /** Benchmark for script single key {@code SELECT}. */ + @Benchmark + public void selectMultipleTablesScript(SelectSingleKeyMultipleTablesState state, Blackhole bh) { + state.executeScript(bh); + } + + /** Benchmark for sequential single table {@code SELECT}. */ + @Benchmark + public void selectMultipleKeys(SelectMultipleKeysSingleTableState state, Blackhole bh) { + state.executeQuery(bh); + } + + /** Benchmark for script single table {@code SELECT}. */ + @Benchmark + public void selectMultipleKeysScript(SelectMultipleKeysSingleTableState state, Blackhole bh) { + state.executeScript(bh); + } + + /** + * Benchmark state for {@link #insert(InsertState, Blackhole)} and + * {@link #insertScript(InsertState, Blackhole)} benchmarks. + */ + @State(Scope.Benchmark) + public static class InsertState { + private QueryRunner queryRunner; + private Parameters parameters; + private Session session; + + /** Generates required statements.*/ + @Setup + public void setUp() { + session = clusterNode.sql().createSession(); + parameters = new Parameters(statementsCount, n -> createInsertStatement("T" + n)); + queryRunner = new QueryRunner( + clusterNode.queryEngine(), + clusterNode.transactions(), + session.defaultPageSize() + ); + } + + @TearDown + public void closeSession() throws Exception { + IgniteUtils.closeAll(session); + } + + private int id = 0; + + void executeQuery(Blackhole bh) { + int id0 = id++; + + for (int i = 0; i < statementsCount; i++) { + Iterator<?> res = queryRunner.execQuery(parameters.statements.get(i), id0); + + while (res.hasNext()) { + bh.consume(res.next()); + } + } + } + + void executeScript(Blackhole bh) { + int id0 = id++; + + for (int i = 0; i < statementsCount; i++) { + parameters.scriptArgs[i] = id0; + } + + Iterator<InternalSqlRow> res = queryRunner.execScript(parameters.script, parameters.scriptArgs); + + while (res.hasNext()) { + bh.consume(res.next()); + } + } + + private static String createInsertStatement(String tableName) { + String insertQueryTemplate = "insert into {}({}, {}) values(?, {})"; + + String fieldsQ = IntStream.range(1, 11).mapToObj(i -> "field" + i).collect(joining(",")); + String valQ = IntStream.range(1, 11).mapToObj(i -> "'" + FIELD_VAL + "'").collect(joining(",")); + + return format(insertQueryTemplate, tableName, "ycsb_key", fieldsQ, valQ); + } + } + + /** + * Benchmark state for {@link #count(CountState, Blackhole)} and + * {@link #countScript(CountState, Blackhole)} benchmarks. + */ + @State(Scope.Benchmark) + public static class CountState { + private QueryRunner queryRunner; + private Parameters parameters; + private Session session; + + /** Generates required statements.*/ + @Setup + public void setUp() { + fillTables(statementsCount); + + session = clusterNode.sql().createSession(); + parameters = new Parameters(statementsCount, n -> format("select count(*) from T{}", n)); + queryRunner = new QueryRunner( + clusterNode.queryEngine(), + clusterNode.transactions(), + session.defaultPageSize() + ); + } + + @TearDown + public void closeSession() throws Exception { + IgniteUtils.closeAll(session); + } + + void executeQuery(Blackhole bh) { + for (int i = 0; i < statementsCount; i++) { + Iterator<?> res = queryRunner.execQuery(parameters.statements.get(i)); + + while (res.hasNext()) { + bh.consume(res.next()); + } + } + } + + void executeScript(Blackhole bh) { + Iterator<?> res = queryRunner.execScript(parameters.script); + + while (res.hasNext()) { + bh.consume(res.next()); + } + } + } + + /** + * Benchmark state for {@link #selectMultipleTables(SelectSingleKeyMultipleTablesState, Blackhole)} and + * {@link #selectMultipleTablesScript(SelectSingleKeyMultipleTablesState, Blackhole)} benchmarks. + */ + @State(Scope.Benchmark) + public static class SelectSingleKeyMultipleTablesState { + private final Random random = new Random(); + private QueryRunner queryRunner; + private Parameters parameters; + private Session session; + + /** Generates required statements.*/ + @Setup + public void setUp() { + fillTables(statementsCount); + + session = clusterNode.sql().createSession(); + parameters = new Parameters(statementsCount, n -> format("select * from T{} where ycsb_key=?", n)); + queryRunner = new QueryRunner( + clusterNode.queryEngine(), + clusterNode.transactions(), + session.defaultPageSize() + ); + } + + @TearDown + public void closeSession() throws Exception { + IgniteUtils.closeAll(session); + } + + void executeQuery(Blackhole bh) { + int key = random.nextInt(TABLE_SIZE); + + for (int i = 0; i < statementsCount; i++) { + Iterator<?> res = queryRunner.execQuery(parameters.statements.get(i), key); + + while (res.hasNext()) { + bh.consume(res.next()); + } + } + } + + void executeScript(Blackhole bh) { + int key = random.nextInt(TABLE_SIZE); + + for (int i = 0; i < statementsCount; i++) { + parameters.scriptArgs[i] = key; + } + + Iterator<?> res = queryRunner.execScript(parameters.script, parameters.scriptArgs); + + while (res.hasNext()) { + bh.consume(res.next()); + } + } + } + + /** + * Benchmark state for {@link #selectMultipleKeys(SelectMultipleKeysSingleTableState, Blackhole)} and + * {@link #selectMultipleKeysScript(SelectMultipleKeysSingleTableState, Blackhole)} benchmarks. + */ + @State(Scope.Benchmark) + public static class SelectMultipleKeysSingleTableState { + private final Random random = new Random(); + private QueryRunner queryRunner; + private Parameters parameters; + private Session session; + + /** Generates required statements.*/ + @Setup + public void setUp() { + fillTables(statementsCount); + + session = clusterNode.sql().createSession(); + parameters = new Parameters(statementsCount, ignore -> "select * from T0 where ycsb_key=?"); + queryRunner = new QueryRunner( + clusterNode.queryEngine(), + clusterNode.transactions(), + session.defaultPageSize() + ); + } + + @TearDown + public void closeSession() throws Exception { + IgniteUtils.closeAll(session); + } + + void executeQuery(Blackhole bh) { + for (int i = 0; i < statementsCount; i++) { + Iterator<?> res = queryRunner.execQuery(parameters.statements.get(i), random.nextInt(TABLE_SIZE)); + + while (res.hasNext()) { + bh.consume(res.next()); + } + } + } + + void executeScript(Blackhole bh) { + for (int i = 0; i < statementsCount; i++) { + parameters.scriptArgs[i] = random.nextInt(TABLE_SIZE); + } + + Iterator<?> res = queryRunner.execScript(parameters.script, parameters.scriptArgs); + + while (res.hasNext()) { + bh.consume(res.next()); + } + } + } + + private static class Parameters { + private final List<String> statements = new ArrayList<>(); + private final String script; + private final Object[] scriptArgs; + + private Parameters(int count, Function<Integer, String> statementGenerator) { + IgniteStringBuilder buf = new IgniteStringBuilder(); + + for (int i = 0; i < count; i++) { + String statement = statementGenerator.apply(i); + statements.add(statement); + buf.app(statement).app(';'); + } + + script = buf.toString(); + scriptArgs = new Object[count]; + } + } + + /** Executes SQL query/script using internal API. */ + private static class QueryRunner { + private final SqlProperties props = SqlPropertiesHelper.emptyProperties(); + private final QueryProcessor queryProcessor; + private final IgniteTransactions transactions; + private final int pageSize; + + QueryRunner(QueryProcessor queryProcessor, IgniteTransactions transactions, int pageSize) { + this.queryProcessor = queryProcessor; + this.transactions = transactions; + this.pageSize = pageSize; + } + + Iterator<InternalSqlRow> execQuery(String sql, Object ... args) { + AsyncSqlCursor<InternalSqlRow> cursor = + queryProcessor.querySingleAsync(props, transactions, null, sql, args).join(); + + return new InternalResultsIterator(cursor, pageSize); + } + + Iterator<InternalSqlRow> execScript(String sql, Object ... args) { + AsyncSqlCursor<InternalSqlRow> cursor = + queryProcessor.queryScriptAsync(props, transactions, null, sql, args).join(); + + return new InternalResultsIterator(cursor, pageSize); + } + + private static class InternalResultsIterator implements Iterator<InternalSqlRow> { + private final int fetchSize; + private BatchedResult<InternalSqlRow> res; + private Iterator<InternalSqlRow> items; + private AsyncSqlCursor<InternalSqlRow> cursor; + + InternalResultsIterator(AsyncSqlCursor<InternalSqlRow> cursor, int fetchSize) { + this.cursor = cursor; + this.fetchSize = fetchSize; + this.items = fetchNext(); + } + + @Override + public boolean hasNext() { + if (items.hasNext()) { + return true; + } + + if (res.hasMore()) { + items = fetchNext(); + } else if (cursor.hasNextResult()) { + cursor.closeAsync(); + + cursor = cursor.nextResult().join(); + items = fetchNext(); + } + + boolean hasNext = items.hasNext(); + + if (!hasNext) { + cursor.closeAsync(); + } + + return hasNext; + } + + @Override + public InternalSqlRow next() { + return items.next(); + } + + private Iterator<InternalSqlRow> fetchNext() { + res = cursor.requestNextAsync(fetchSize).join(); + + return res.items().iterator(); + } + } + } + + private static void fillTables(int tablesCount) { + int id = 0; + + Map<Tuple, Tuple> data = new HashMap<>(); + + for (int i = 0; i < TABLE_SIZE; i++) { + Tuple t = Tuple.create(); + for (int j = 1; j <= 10; j++) { + t.set("field" + j, FIELD_VAL); + } + + data.put(Tuple.create().set("ycsb_key", id++), t); + } + + for (int i = 0; i < tablesCount; i++) { + clusterNode.tables().table("T" + i).keyValueView().putAll(null, data); + } + } + + /** + * Benchmark's entry point. + */ + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include(".*" + SqlMultiStatementBenchmark.class.getSimpleName() + ".*") + .build(); + + new Runner(opt).run(); + } + + @Override + protected int nodes() { + return clusterSize; + } +}