alex-plekhanov commented on code in PR #12936:
URL: https://github.com/apache/ignite/pull/12936#discussion_r3021435846
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java:
##########
@@ -205,6 +208,13 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
/** */
private final Map<String, FragmentPlan> fragmentPlanCache = new
GridBoundedConcurrentLinkedHashMap<>(1024);
+ /**
+ * Transaction modified entries holder.
+ *
+ * @see TransactionConfiguration#isTxAwareQueriesEnabled()
+ */
+ private TxAwareModifiedEntriesHolder modifiedEntriesHolder;
Review Comment:
Why not use just static `ThreadLocal` in `ExecutionContext`? Looks like we
don't need this variable outside of execution context.
When execution context created we can check if qryTxEntries == null and get
qryTxEntries from thread local.
It's much simplier (just about 5-10 lines of code)
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java:
##########
@@ -577,7 +591,8 @@ private FieldsQueryCursor<List<?>>
executeDdl(RootQuery<Row> qry, DdlPlan plan)
/** */
private ListFieldsQueryCursor<?> mapAndExecutePlan(
RootQuery<Row> qry,
- MultiStepPlan plan
+ MultiStepPlan plan,
+ TxAwareModifiedEntriesHolder mofiedEntriesHolder
Review Comment:
No need to pass this parameter, it's a non static method and always passed
as `this.modifiedEntriesHolder`
Also typo in `mofiedEntriesHolder`
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends
AbstractBasicIntegrationTransactionalTest {
+ /** */
+ @Parameterized.Parameter()
+ public SqlTransactionMode sqlTxMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "sqlTxMode={0}")
+ public static Collection<?> parameters() {
+ return List.of(SqlTransactionMode.ALL);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setQueryThreadPoolSize(16);
+ }
+
+ /** Check tx aware udf execution results. */
+ @Test
+ public void testTxAwareUserDefinedFunc() {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ List<List<Object>> refResults = new ArrayList<>();
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ refResults.add(List.of(0, Integer.toString(0)));
+ // Insert outside tx.
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id, name)
VALUES (?, ?)").setArgs(0, 0)).getAll();
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC,
READ_COMMITTED)) {
+ for (int i = 1; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ // Simple select without udf.
+ List<List<?>> selectResult = cache
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+
+ assertThat(selectResult, equalTo(refResults));
+
+ // Select with udf.
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Select with nested udf.
+ res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Udf participate in dml.
+ res = cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (100, nameAsStr(1))")).getAll();
+
+ res = cache.query(new SqlFieldsQuery("SELECT name FROM PUBLIC.CITY
WHERE id = 100")).getAll();
+
+ assertEquals("1", res.get(0).get(0));
+
+ for (int i = 0; i < 2 * nodeCnt; ++i) {
+ // A bit different case of udf.
+ List<List<?>> res1 = cache.query(new SqlFieldsQuery("SELECT
name(?) AS result").setArgs(i)).getAll();
+
+ List<List<?>> res2 = (List<List<?>>)res1.get(0).get(0);
+
+ assertThat(res2.get(0).get(0), equalTo(Integer.toString(i)));
+ }
+
+ tx.commit();
+ }
+ }
+
+ /** */
+ @Test
+ public void testIsolationCorrectnessWithUdf() throws
IgniteCheckedException {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ /*The pool size should be greater than the maximum number of
concurrent queries initiated by UDFs*/
+ IgniteInternalFuture<Long> fut =
GridTestUtils.runMultiThreadedAsync(() -> {
+ for (int iter = 0; iter < 10; ++iter) {
+ try (Transaction tx =
client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ List<List<Object>> refResults = new ArrayList<>();
+
+ for (int i = 0; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO
PUBLIC.CITY(id, name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ tx.rollback();
+ }
+ }
+ }, 10, "calcite-tx-with-udf");
+
+ fut.get(30_000);
+ }
+
+ /** */
+ private CacheConfiguration<Integer, Object> cacheConfig() {
+ var entity = new QueryEntity()
+ .setTableName("CITY")
+ .setKeyType(Integer.class.getName())
+ .setValueType(City.class.getName())
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("name", String.class.getName(), null)
+ .setKeyFieldName("id");
+
+ return new CacheConfiguration<Integer, Object>(DEFAULT_CACHE_NAME)
+ .setCacheMode(PARTITIONED)
+ .setAtomicityMode(TRANSACTIONAL)
+ .setSqlSchema("PUBLIC")
+ .setSqlFunctionClasses(InnerSqlFunctionsLibrary.class)
+ .setQueryEntities(List.of(entity));
+ }
+
+ /** */
+ public static class InnerSqlFunctionsLibrary {
+ /** */
+ @QuerySqlFunction
+ public List<List<?>> customTableFunc() {
+ Ignite ignite = Ignition.localIgnite();
+
+ return ignite.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+ }
+
+ /** */
+ @QuerySqlFunction
+ public List<List<?>> customNestedTableFunc() {
+ Ignite ignite = Ignition.localIgnite();
+
+ Object res = ignite.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("SELECT customTableFuncInner() AS
result"))
+ .getAll().get(0).get(0);
+
+ return (List<List<?>>)res;
+ }
+
+ /** */
+ @QuerySqlFunction
+ public List<List<?>> customTableFuncInner() {
+ Ignite ignite = Ignition.localIgnite();
+
+ return ignite.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+ }
+
+ /** */
+ @QuerySqlFunction
+ public static List<List<?>> name(int id) {
+ Ignite ignite = Ignition.localIgnite();
+
+ return ignite.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("SELECT name FROM PUBLIC.CITY WHERE
id = ?").setArgs(id))
+ .getAll();
+ }
+
+ /** */
+ @QuerySqlFunction
+ public static String nameAsStr(int id) {
+ Ignite ignite = Ignition.localIgnite();
+
+ List<List<?>> res = ignite.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("SELECT name FROM PUBLIC.CITY WHERE
id = ?").setArgs(id))
+ .getAll();
+
+ return (String)res.get(0).get(0);
+ }
+ }
+
+ /** */
+ private static class City {
Review Comment:
Let's reuse `Employer` entity from superclass.
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends
AbstractBasicIntegrationTransactionalTest {
+ /** */
+ @Parameterized.Parameter()
+ public SqlTransactionMode sqlTxMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "sqlTxMode={0}")
+ public static Collection<?> parameters() {
+ return List.of(SqlTransactionMode.ALL);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setQueryThreadPoolSize(16);
+ }
+
+ /** Check tx aware udf execution results. */
+ @Test
+ public void testTxAwareUserDefinedFunc() {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ List<List<Object>> refResults = new ArrayList<>();
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ refResults.add(List.of(0, Integer.toString(0)));
+ // Insert outside tx.
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id, name)
VALUES (?, ?)").setArgs(0, 0)).getAll();
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC,
READ_COMMITTED)) {
+ for (int i = 1; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ // Simple select without udf.
+ List<List<?>> selectResult = cache
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+
+ assertThat(selectResult, equalTo(refResults));
+
+ // Select with udf.
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Select with nested udf.
+ res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Udf participate in dml.
+ res = cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (100, nameAsStr(1))")).getAll();
Review Comment:
`res = ` is redundant
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends
AbstractBasicIntegrationTransactionalTest {
+ /** */
+ @Parameterized.Parameter()
+ public SqlTransactionMode sqlTxMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "sqlTxMode={0}")
+ public static Collection<?> parameters() {
+ return List.of(SqlTransactionMode.ALL);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setQueryThreadPoolSize(16);
+ }
+
+ /** Check tx aware udf execution results. */
+ @Test
+ public void testTxAwareUserDefinedFunc() {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ List<List<Object>> refResults = new ArrayList<>();
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ refResults.add(List.of(0, Integer.toString(0)));
+ // Insert outside tx.
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id, name)
VALUES (?, ?)").setArgs(0, 0)).getAll();
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC,
READ_COMMITTED)) {
+ for (int i = 1; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ // Simple select without udf.
+ List<List<?>> selectResult = cache
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+
+ assertThat(selectResult, equalTo(refResults));
+
+ // Select with udf.
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Select with nested udf.
+ res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Udf participate in dml.
+ res = cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (100, nameAsStr(1))")).getAll();
+
+ res = cache.query(new SqlFieldsQuery("SELECT name FROM PUBLIC.CITY
WHERE id = 100")).getAll();
+
+ assertEquals("1", res.get(0).get(0));
+
+ for (int i = 0; i < 2 * nodeCnt; ++i) {
+ // A bit different case of udf.
+ List<List<?>> res1 = cache.query(new SqlFieldsQuery("SELECT
name(?) AS result").setArgs(i)).getAll();
+
+ List<List<?>> res2 = (List<List<?>>)res1.get(0).get(0);
+
+ assertThat(res2.get(0).get(0), equalTo(Integer.toString(i)));
+ }
+
+ tx.commit();
+ }
+ }
+
+ /** */
+ @Test
+ public void testIsolationCorrectnessWithUdf() throws
IgniteCheckedException {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ /*The pool size should be greater than the maximum number of
concurrent queries initiated by UDFs*/
+ IgniteInternalFuture<Long> fut =
GridTestUtils.runMultiThreadedAsync(() -> {
+ for (int iter = 0; iter < 10; ++iter) {
+ try (Transaction tx =
client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ List<List<Object>> refResults = new ArrayList<>();
+
+ for (int i = 0; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO
PUBLIC.CITY(id, name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ tx.rollback();
+ }
+ }
+ }, 10, "calcite-tx-with-udf");
+
+ fut.get(30_000);
+ }
+
+ /** */
+ private CacheConfiguration<Integer, Object> cacheConfig() {
+ var entity = new QueryEntity()
+ .setTableName("CITY")
+ .setKeyType(Integer.class.getName())
+ .setValueType(City.class.getName())
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("name", String.class.getName(), null)
+ .setKeyFieldName("id");
Review Comment:
Let's simplify to something like `new QueryEntity(Integer.class,
Employer.class)`
##########
modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java:
##########
@@ -174,7 +175,8 @@
QueryEntityValueColumnAliasTest.class,
CacheStoreTest.class,
MultiDcQueryMappingTest.class,
- TxWithExceptionalInterceptorTest.class
+ TxWithExceptionalInterceptorTest.class,
+ UserDefinedTxAwareFunctionsIntegrationTest.class
Review Comment:
Comma at the end of line please.
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
Review Comment:
Maybe it's better to test with both executors?
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends
AbstractBasicIntegrationTransactionalTest {
+ /** */
+ @Parameterized.Parameter()
+ public SqlTransactionMode sqlTxMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "sqlTxMode={0}")
+ public static Collection<?> parameters() {
+ return List.of(SqlTransactionMode.ALL);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setQueryThreadPoolSize(16);
+ }
+
+ /** Check tx aware udf execution results. */
+ @Test
+ public void testTxAwareUserDefinedFunc() {
+ assertTrue(nodeCount() > 1);
Review Comment:
Override nodeCount() instead?
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends
AbstractBasicIntegrationTransactionalTest {
+ /** */
+ @Parameterized.Parameter()
+ public SqlTransactionMode sqlTxMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "sqlTxMode={0}")
+ public static Collection<?> parameters() {
+ return List.of(SqlTransactionMode.ALL);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setQueryThreadPoolSize(16);
+ }
+
+ /** Check tx aware udf execution results. */
+ @Test
+ public void testTxAwareUserDefinedFunc() {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ List<List<Object>> refResults = new ArrayList<>();
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ refResults.add(List.of(0, Integer.toString(0)));
+ // Insert outside tx.
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id, name)
VALUES (?, ?)").setArgs(0, 0)).getAll();
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC,
READ_COMMITTED)) {
+ for (int i = 1; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ // Simple select without udf.
+ List<List<?>> selectResult = cache
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+
+ assertThat(selectResult, equalTo(refResults));
+
+ // Select with udf.
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Select with nested udf.
+ res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Udf participate in dml.
+ res = cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (100, nameAsStr(1))")).getAll();
+
+ res = cache.query(new SqlFieldsQuery("SELECT name FROM PUBLIC.CITY
WHERE id = 100")).getAll();
+
+ assertEquals("1", res.get(0).get(0));
+
+ for (int i = 0; i < 2 * nodeCnt; ++i) {
+ // A bit different case of udf.
+ List<List<?>> res1 = cache.query(new SqlFieldsQuery("SELECT
name(?) AS result").setArgs(i)).getAll();
+
+ List<List<?>> res2 = (List<List<?>>)res1.get(0).get(0);
+
+ assertThat(res2.get(0).get(0), equalTo(Integer.toString(i)));
+ }
+
+ tx.commit();
+ }
+ }
+
+ /** */
+ @Test
+ public void testIsolationCorrectnessWithUdf() throws
IgniteCheckedException {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ /*The pool size should be greater than the maximum number of
concurrent queries initiated by UDFs*/
+ IgniteInternalFuture<Long> fut =
GridTestUtils.runMultiThreadedAsync(() -> {
+ for (int iter = 0; iter < 10; ++iter) {
+ try (Transaction tx =
client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ List<List<Object>> refResults = new ArrayList<>();
+
+ for (int i = 0; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO
PUBLIC.CITY(id, name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ tx.rollback();
+ }
+ }
+ }, 10, "calcite-tx-with-udf");
+
+ fut.get(30_000);
+ }
+
+ /** */
+ private CacheConfiguration<Integer, Object> cacheConfig() {
+ var entity = new QueryEntity()
+ .setTableName("CITY")
+ .setKeyType(Integer.class.getName())
+ .setValueType(City.class.getName())
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("name", String.class.getName(), null)
+ .setKeyFieldName("id");
+
+ return new CacheConfiguration<Integer, Object>(DEFAULT_CACHE_NAME)
+ .setCacheMode(PARTITIONED)
+ .setAtomicityMode(TRANSACTIONAL)
+ .setSqlSchema("PUBLIC")
+ .setSqlFunctionClasses(InnerSqlFunctionsLibrary.class)
+ .setQueryEntities(List.of(entity));
+ }
+
+ /** */
+ public static class InnerSqlFunctionsLibrary {
+ /** */
+ @QuerySqlFunction
+ public List<List<?>> customTableFunc() {
+ Ignite ignite = Ignition.localIgnite();
+
+ return ignite.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+ }
+
+ /** */
+ @QuerySqlFunction
+ public List<List<?>> customNestedTableFunc() {
+ Ignite ignite = Ignition.localIgnite();
+
+ Object res = ignite.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("SELECT customTableFuncInner() AS
result"))
+ .getAll().get(0).get(0);
+
+ return (List<List<?>>)res;
+ }
+
+ /** */
+ @QuerySqlFunction
+ public List<List<?>> customTableFuncInner() {
Review Comment:
Redundant, can be used customTableFunc
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends
AbstractBasicIntegrationTransactionalTest {
+ /** */
+ @Parameterized.Parameter()
+ public SqlTransactionMode sqlTxMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "sqlTxMode={0}")
+ public static Collection<?> parameters() {
+ return List.of(SqlTransactionMode.ALL);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setQueryThreadPoolSize(16);
+ }
+
+ /** Check tx aware udf execution results. */
Review Comment:
Please use UDF and DML in comments with upper case letters
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends
AbstractBasicIntegrationTransactionalTest {
+ /** */
+ @Parameterized.Parameter()
+ public SqlTransactionMode sqlTxMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "sqlTxMode={0}")
+ public static Collection<?> parameters() {
+ return List.of(SqlTransactionMode.ALL);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setQueryThreadPoolSize(16);
+ }
+
+ /** Check tx aware udf execution results. */
+ @Test
+ public void testTxAwareUserDefinedFunc() {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ List<List<Object>> refResults = new ArrayList<>();
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ refResults.add(List.of(0, Integer.toString(0)));
+ // Insert outside tx.
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id, name)
VALUES (?, ?)").setArgs(0, 0)).getAll();
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC,
READ_COMMITTED)) {
+ for (int i = 1; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ // Simple select without udf.
+ List<List<?>> selectResult = cache
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+
+ assertThat(selectResult, equalTo(refResults));
+
+ // Select with udf.
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Select with nested udf.
+ res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Udf participate in dml.
+ res = cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (100, nameAsStr(1))")).getAll();
+
+ res = cache.query(new SqlFieldsQuery("SELECT name FROM PUBLIC.CITY
WHERE id = 100")).getAll();
+
+ assertEquals("1", res.get(0).get(0));
+
+ for (int i = 0; i < 2 * nodeCnt; ++i) {
+ // A bit different case of udf.
+ List<List<?>> res1 = cache.query(new SqlFieldsQuery("SELECT
name(?) AS result").setArgs(i)).getAll();
+
+ List<List<?>> res2 = (List<List<?>>)res1.get(0).get(0);
+
+ assertThat(res2.get(0).get(0), equalTo(Integer.toString(i)));
+ }
+
+ tx.commit();
+ }
+ }
+
+ /** */
+ @Test
+ public void testIsolationCorrectnessWithUdf() throws
IgniteCheckedException {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ /*The pool size should be greater than the maximum number of
concurrent queries initiated by UDFs*/
Review Comment:
Spaces before/after comment and point at the end.
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends
AbstractBasicIntegrationTransactionalTest {
+ /** */
+ @Parameterized.Parameter()
+ public SqlTransactionMode sqlTxMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "sqlTxMode={0}")
+ public static Collection<?> parameters() {
+ return List.of(SqlTransactionMode.ALL);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setQueryThreadPoolSize(16);
+ }
+
+ /** Check tx aware udf execution results. */
+ @Test
+ public void testTxAwareUserDefinedFunc() {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ List<List<Object>> refResults = new ArrayList<>();
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ refResults.add(List.of(0, Integer.toString(0)));
+ // Insert outside tx.
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id, name)
VALUES (?, ?)").setArgs(0, 0)).getAll();
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC,
READ_COMMITTED)) {
+ for (int i = 1; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ // Simple select without udf.
+ List<List<?>> selectResult = cache
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+
+ assertThat(selectResult, equalTo(refResults));
+
+ // Select with udf.
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Select with nested udf.
+ res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Udf participate in dml.
+ res = cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (100, nameAsStr(1))")).getAll();
+
+ res = cache.query(new SqlFieldsQuery("SELECT name FROM PUBLIC.CITY
WHERE id = 100")).getAll();
+
+ assertEquals("1", res.get(0).get(0));
+
+ for (int i = 0; i < 2 * nodeCnt; ++i) {
+ // A bit different case of udf.
+ List<List<?>> res1 = cache.query(new SqlFieldsQuery("SELECT
name(?) AS result").setArgs(i)).getAll();
+
+ List<List<?>> res2 = (List<List<?>>)res1.get(0).get(0);
+
+ assertThat(res2.get(0).get(0), equalTo(Integer.toString(i)));
+ }
+
+ tx.commit();
+ }
+ }
+
+ /** */
+ @Test
+ public void testIsolationCorrectnessWithUdf() throws
IgniteCheckedException {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ /*The pool size should be greater than the maximum number of
concurrent queries initiated by UDFs*/
+ IgniteInternalFuture<Long> fut =
GridTestUtils.runMultiThreadedAsync(() -> {
+ for (int iter = 0; iter < 10; ++iter) {
+ try (Transaction tx =
client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ List<List<Object>> refResults = new ArrayList<>();
+
+ for (int i = 0; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO
PUBLIC.CITY(id, name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ tx.rollback();
+ }
+ }
+ }, 10, "calcite-tx-with-udf");
+
+ fut.get(30_000);
+ }
+
+ /** */
+ private CacheConfiguration<Integer, Object> cacheConfig() {
+ var entity = new QueryEntity()
+ .setTableName("CITY")
+ .setKeyType(Integer.class.getName())
+ .setValueType(City.class.getName())
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("name", String.class.getName(), null)
+ .setKeyFieldName("id");
+
+ return new CacheConfiguration<Integer, Object>(DEFAULT_CACHE_NAME)
+ .setCacheMode(PARTITIONED)
+ .setAtomicityMode(TRANSACTIONAL)
+ .setSqlSchema("PUBLIC")
+ .setSqlFunctionClasses(InnerSqlFunctionsLibrary.class)
+ .setQueryEntities(List.of(entity));
+ }
+
+ /** */
+ public static class InnerSqlFunctionsLibrary {
+ /** */
+ @QuerySqlFunction
+ public List<List<?>> customTableFunc() {
+ Ignite ignite = Ignition.localIgnite();
+
+ return ignite.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+ }
+
+ /** */
+ @QuerySqlFunction
+ public List<List<?>> customNestedTableFunc() {
+ Ignite ignite = Ignition.localIgnite();
+
+ Object res = ignite.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("SELECT customTableFuncInner() AS
result"))
+ .getAll().get(0).get(0);
+
+ return (List<List<?>>)res;
+ }
+
+ /** */
+ @QuerySqlFunction
+ public List<List<?>> customTableFuncInner() {
+ Ignite ignite = Ignition.localIgnite();
+
+ return ignite.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+ }
+
+ /** */
+ @QuerySqlFunction
+ public static List<List<?>> name(int id) {
Review Comment:
nameTableFunc?
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends
AbstractBasicIntegrationTransactionalTest {
+ /** */
+ @Parameterized.Parameter()
+ public SqlTransactionMode sqlTxMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "sqlTxMode={0}")
+ public static Collection<?> parameters() {
+ return List.of(SqlTransactionMode.ALL);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setQueryThreadPoolSize(16);
+ }
+
+ /** Check tx aware udf execution results. */
+ @Test
+ public void testTxAwareUserDefinedFunc() {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ List<List<Object>> refResults = new ArrayList<>();
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ refResults.add(List.of(0, Integer.toString(0)));
+ // Insert outside tx.
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id, name)
VALUES (?, ?)").setArgs(0, 0)).getAll();
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC,
READ_COMMITTED)) {
+ for (int i = 1; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ // Simple select without udf.
+ List<List<?>> selectResult = cache
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+
+ assertThat(selectResult, equalTo(refResults));
+
+ // Select with udf.
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Select with nested udf.
+ res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Udf participate in dml.
+ res = cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (100, nameAsStr(1))")).getAll();
+
+ res = cache.query(new SqlFieldsQuery("SELECT name FROM PUBLIC.CITY
WHERE id = 100")).getAll();
+
+ assertEquals("1", res.get(0).get(0));
+
+ for (int i = 0; i < 2 * nodeCnt; ++i) {
+ // A bit different case of udf.
+ List<List<?>> res1 = cache.query(new SqlFieldsQuery("SELECT
name(?) AS result").setArgs(i)).getAll();
+
+ List<List<?>> res2 = (List<List<?>>)res1.get(0).get(0);
+
+ assertThat(res2.get(0).get(0), equalTo(Integer.toString(i)));
+ }
+
+ tx.commit();
+ }
+ }
+
+ /** */
+ @Test
+ public void testIsolationCorrectnessWithUdf() throws
IgniteCheckedException {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ /*The pool size should be greater than the maximum number of
concurrent queries initiated by UDFs*/
+ IgniteInternalFuture<Long> fut =
GridTestUtils.runMultiThreadedAsync(() -> {
+ for (int iter = 0; iter < 10; ++iter) {
+ try (Transaction tx =
client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ List<List<Object>> refResults = new ArrayList<>();
+
+ for (int i = 0; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO
PUBLIC.CITY(id, name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ tx.rollback();
+ }
+ }
+ }, 10, "calcite-tx-with-udf");
+
+ fut.get(30_000);
+ }
+
+ /** */
+ private CacheConfiguration<Integer, Object> cacheConfig() {
+ var entity = new QueryEntity()
+ .setTableName("CITY")
+ .setKeyType(Integer.class.getName())
+ .setValueType(City.class.getName())
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("name", String.class.getName(), null)
+ .setKeyFieldName("id");
+
+ return new CacheConfiguration<Integer, Object>(DEFAULT_CACHE_NAME)
+ .setCacheMode(PARTITIONED)
+ .setAtomicityMode(TRANSACTIONAL)
+ .setSqlSchema("PUBLIC")
Review Comment:
No need to set schema (and use it in qyery) if you execute the query on the
same cache.
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends
AbstractBasicIntegrationTransactionalTest {
+ /** */
+ @Parameterized.Parameter()
+ public SqlTransactionMode sqlTxMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "sqlTxMode={0}")
+ public static Collection<?> parameters() {
+ return List.of(SqlTransactionMode.ALL);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setQueryThreadPoolSize(16);
+ }
+
+ /** Check tx aware udf execution results. */
+ @Test
+ public void testTxAwareUserDefinedFunc() {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ List<List<Object>> refResults = new ArrayList<>();
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
Review Comment:
IgniteCache<Integer, Object> cache = client.getOrCreateCache(cacheConfig());
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends
AbstractBasicIntegrationTransactionalTest {
Review Comment:
Why do we extend `AbstractBasicIntegrationTransactionalTest` if we don't use
test infrastructure and only use `SqlTransactionMode.ALL`?
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends
AbstractBasicIntegrationTransactionalTest {
+ /** */
+ @Parameterized.Parameter()
+ public SqlTransactionMode sqlTxMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "sqlTxMode={0}")
+ public static Collection<?> parameters() {
+ return List.of(SqlTransactionMode.ALL);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setQueryThreadPoolSize(16);
+ }
+
+ /** Check tx aware udf execution results. */
+ @Test
+ public void testTxAwareUserDefinedFunc() {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ List<List<Object>> refResults = new ArrayList<>();
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+ refResults.add(List.of(0, Integer.toString(0)));
+ // Insert outside tx.
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id, name)
VALUES (?, ?)").setArgs(0, 0)).getAll();
+
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC,
READ_COMMITTED)) {
+ for (int i = 1; i < 2 * nodeCnt; ++i) {
+ refResults.add(List.of(i, Integer.toString(i)));
+ cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (?, ?)").setArgs(i, i)).getAll();
+ }
+
+ // Simple select without udf.
+ List<List<?>> selectResult = cache
+ .query(new SqlFieldsQuery("SELECT id, name FROM PUBLIC.CITY
ORDER BY id"))
+ .getAll();
+
+ assertThat(selectResult, equalTo(refResults));
+
+ // Select with udf.
+ List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT
customTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Select with nested udf.
+ res = cache.query(new SqlFieldsQuery("SELECT
customNestedTableFunc() AS result")).getAll();
+
+ assertThat(res.get(0).get(0), equalTo(refResults));
+
+ // Udf participate in dml.
+ res = cache.query(new SqlFieldsQuery("INSERT INTO PUBLIC.CITY(id,
name) VALUES (100, nameAsStr(1))")).getAll();
+
+ res = cache.query(new SqlFieldsQuery("SELECT name FROM PUBLIC.CITY
WHERE id = 100")).getAll();
+
+ assertEquals("1", res.get(0).get(0));
+
+ for (int i = 0; i < 2 * nodeCnt; ++i) {
+ // A bit different case of udf.
+ List<List<?>> res1 = cache.query(new SqlFieldsQuery("SELECT
name(?) AS result").setArgs(i)).getAll();
+
+ List<List<?>> res2 = (List<List<?>>)res1.get(0).get(0);
+
+ assertThat(res2.get(0).get(0), equalTo(Integer.toString(i)));
+ }
+
+ tx.commit();
+ }
+ }
+
+ /** */
+ @Test
+ public void testIsolationCorrectnessWithUdf() throws
IgniteCheckedException {
+ assertTrue(nodeCount() > 1);
+ int nodeCnt = nodeCount();
+
+ client.getOrCreateCache(cacheConfig());
+
+ IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);
Review Comment:
IgniteCache<Integer, Object> cache = client.getOrCreateCache(cacheConfig());
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]