This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 01ba73c10b3 IGNITE-22925 Add long queries tracking for DML operations - Fixes #11535.
01ba73c10b3 is described below

commit 01ba73c10b303292d3f98e61545579e276389286
Author: 21518201 <[email protected]>
AuthorDate: Mon Oct 14 17:59:55 2024 +0300

    IGNITE-22925 Add long queries tracking for DML operations - Fixes #11535.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../internal/processors/query/h2/H2DmlInfo.java    |  83 ++++++++++
 .../processors/query/h2/IgniteH2Indexing.java      |  15 ++
 .../processors/query/LongRunningQueryTest.java     | 173 +++++++++++++++++++++
 3 files changed, 271 insertions(+)

diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
new file mode 100644
index 00000000000..8f95b57e7dd
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.util.UUID;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
+import org.apache.ignite.internal.processors.query.running.TrackableQuery;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/** {@link TrackableQuery} describing one DML command: begin timestamp, query id, initiator node, schema and SQL text. */
+public class H2DmlInfo implements TrackableQuery {
+    /** Begin timestamp; {@link #time()} subtracts it from {@code U.currentTimeMillis()}. */
+    private final long beginTs;
+
+    /** Query id; compared with {@link RunningQueryManager#UNDEFINED_QUERY_ID} when the info string is built. */
+    private final long qryId;
+
+    /** Initiator node id. */
+    private final UUID initNodeId;
+
+    /** Schema name. */
+    private final String schema;
+
+    /** Dml command. */
+    private final String sql;
+
+    /**
+     * @param beginTs Begin timestamp.
+     * @param qryId Query id.
+     * @param initNodeId Initiator node id.
+     * @param schema Schema name.
+     * @param sql Dml command.
+     */
+    public H2DmlInfo(long beginTs, long qryId, UUID initNodeId, String schema, 
String sql) {
+        this.beginTs = beginTs;
+        this.qryId = qryId;
+        this.initNodeId = initNodeId;
+        this.schema = schema;
+        this.sql = sql;
+    }
+
+    /** {@inheritDoc} Computed as {@code U.currentTimeMillis() - beginTs}. */
+    @Override public long time() {
+        return U.currentTimeMillis() - beginTs;
+    }
+
+    /** {@inheritDoc} Message holds the global query id (or initiator node if undefined), duration, type=DML, schema, SQL. */
+    @Override public String queryInfo(@Nullable String additionalInfo) {
+        StringBuilder msgSb = new StringBuilder();
+
+        if (qryId == RunningQueryManager.UNDEFINED_QUERY_ID)
+            msgSb.append(" [globalQueryId=(undefined), 
node=").append(initNodeId);
+        else
+            msgSb.append(" 
[globalQueryId=").append(QueryUtils.globalQueryId(initNodeId, qryId));
+
+        if (additionalInfo != null)
+            msgSb.append(", ").append(additionalInfo);
+
+        msgSb.append(", duration=").append(time()).append("ms")
+            .append(", type=DML")
+            .append(", schema=").append(schema)
+            .append(", sql='").append(sql).append("']");
+
+        return msgSb.toString();
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 65600be66a7..a759fe3f09f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1044,6 +1044,8 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
         Exception failReason = null;
 
+        H2DmlInfo dmlInfo = null;
+
         try (TraceSurroundings ignored = 
MTC.support(ctx.tracing().create(SQL_DML_QRY_EXECUTE, MTC.span()))) {
             if (!updateInTxAllowed && ctx.cache().context().tm().inUserTx()) {
                 throw new IgniteSQLException("DML statements are not allowed 
inside a transaction over " +
@@ -1051,6 +1053,16 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                     "\"-DIGNITE_ALLOW_DML_INSIDE_TRANSACTION=true\")");
             }
 
+            dmlInfo = new H2DmlInfo(
+                U.currentTimeMillis(),
+                qryId,
+                ctx.localNodeId(),
+                qryDesc.schemaName(),
+                qryDesc.sql()
+            );
+
+            heavyQueriesTracker().startTracking(dmlInfo);
+
             if (!qryDesc.local()) {
                 return executeUpdateDistributed(
                     qryId,
@@ -1101,6 +1113,9 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                     ", params=" + S.toString(QueryParameters.class, qryParams) 
+ "]", e);
         }
         finally {
+            if (dmlInfo != null)
+                heavyQueriesTracker().stopTracking(dmlInfo, failReason);
+
             runningQueryManager().unregister(qryId, failReason);
         }
     }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
index a7faf240c2d..d9db970ebdd 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
@@ -32,6 +32,9 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.SqlConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import 
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
 import org.apache.ignite.internal.processors.query.h2.H2QueryInfo;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
@@ -54,9 +57,36 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
     /** Keys count. */
     private static final int KEY_CNT = 1000;
 
+    /** Long query warning timeout set via {@code SqlConfiguration}; {@code runDml} compares it with elapsed milliseconds. */
+    private static final int LONG_QUERY_WARNING_TIMEOUT = 1000;
+
     /** External wait time. */
     private static final int EXT_WAIT_TIME = 2000;
 
+    /** Insert whose value is produced by {@code wait_func()}. */
+    private static final String INSERT_SQL = "insert into test (_key, _val) 
values (1001, wait_func())";
+
+    /** Insert with a subquery over {@code "pers".Person} whose predicate calls {@code wait_func()}. */
+    private static final String INSERT_WITH_SUBQUERY_SQL = "insert into test 
(_key, _val) select p._key, p.orgId from " +
+        "\"pers\".Person p where p._key < wait_func()";
+
+    /** Update that assigns the result of {@code wait_func()} to the row with key 1. */
+    private static final String UPDATE_SQL = "update test set _val = 
wait_func() where _key = 1";
+
+    /** Update with a subquery over {@code "pers".Person} whose predicate calls {@code wait_func()}. */
+    private static final String UPDATE_WITH_SUBQUERY_SQL = "update test set 
_val = 111 where _key in " +
+        "(select p._key from \"pers\".Person p where p._key < wait_func())";
+
+    /** Delete whose key predicate calls {@code wait_func()}. */
+    private static final String DELETE_SQL = "delete from test where _key = 
wait_func()";
+
+    /** Delete with a subquery over {@code "pers".Person} whose predicate calls {@code wait_func()}. */
+    private static final String DELETE_WITH_SUBQUERY_SQL = "delete from test 
where _key in " +
+        "(select p._key from \"pers\".Person p where p._key < wait_func())";
+
+    /** Log listener for long DMLs; polled by {@code wait_func()} and asserted in {@code runDml}. */
+    private static LogListener lsnrDml;
+
     /** Page size. */
     private int pageSize = DEFAULT_PAGE_SIZE;
 
@@ -75,6 +105,13 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
     /** Ignite instance. */
     private Ignite ignite;
 
+    /** {@inheritDoc} Also sets a {@link SqlConfiguration} whose long query warning timeout is {@link #LONG_QUERY_WARNING_TIMEOUT}. */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        return cfg.setSqlConfiguration(new 
SqlConfiguration().setLongQueryWarningTimeout(LONG_QUERY_WARNING_TIMEOUT));
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
@@ -95,6 +132,17 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
 
         for (long i = 0; i < KEY_CNT; ++i)
             c.put(i, i);
+
+        IgniteCache c2 = grid().createCache(cacheConfig("pers", Integer.class, 
Person.class));
+
+        c2.put(1001, new Person(1, "p1"));
+
+        lsnrDml = LogListener
+            .matches(LONG_QUERY_EXEC_MSG)
+            .andMatches(s -> s.contains("type=DML"))
+            .build();
+
+        testLog().registerListener(lsnrDml);
     }
 
     /** {@inheritDoc} */
@@ -182,6 +230,102 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
         checkFastQueries();
     }
 
+    /** Runs {@link #INSERT_SQL} through {@link #runDml(String)} with {@code local = false}. */
+    @Test
+    public void testLongRunningInsert() {
+        local = false;
+
+        runDml(INSERT_SQL);
+    }
+
+    /** Runs {@link #INSERT_WITH_SUBQUERY_SQL} through {@link #runDml(String)} with {@code local = false}. */
+    @Test
+    public void testLongRunningInsertWithSubquery() {
+        local = false;
+
+        runDml(INSERT_WITH_SUBQUERY_SQL);
+    }
+
+    /** Runs {@link #INSERT_SQL} through {@link #runDml(String)} with {@code local = true}. */
+    @Test
+    public void testLongRunningInsertLocal() {
+        local = true;
+
+        runDml(INSERT_SQL);
+    }
+
+    /** Runs {@link #INSERT_WITH_SUBQUERY_SQL} through {@link #runDml(String)} with {@code local = true}. */
+    @Test
+    public void testLongRunningInsertWithSubqueryLocal() {
+        local = true;
+
+        runDml(INSERT_WITH_SUBQUERY_SQL);
+    }
+
+    /** Runs {@link #UPDATE_SQL} through {@link #runDml(String)} with {@code local = false}. */
+    @Test
+    public void testLongRunningUpdate() {
+        local = false;
+
+        runDml(UPDATE_SQL);
+    }
+
+    /** Runs {@link #UPDATE_WITH_SUBQUERY_SQL} through {@link #runDml(String)} with {@code local = false}. */
+    @Test
+    public void testLongRunningUpdateWithSubquery() {
+        local = false;
+
+        runDml(UPDATE_WITH_SUBQUERY_SQL);
+    }
+
+    /** Runs {@link #UPDATE_SQL} through {@link #runDml(String)} with {@code local = true}. */
+    @Test
+    public void testLongRunningUpdateLocal() {
+        local = true;
+
+        runDml(UPDATE_SQL);
+    }
+
+    /** Runs {@link #UPDATE_WITH_SUBQUERY_SQL} through {@link #runDml(String)} with {@code local = true}. */
+    @Test
+    public void testLongRunningUpdateWithSubqueryLocal() {
+        local = true;
+
+        runDml(UPDATE_WITH_SUBQUERY_SQL);
+    }
+
+    /** Runs {@link #DELETE_SQL} through {@link #runDml(String)} with {@code local = false}. */
+    @Test
+    public void testLongRunningDelete() {
+        local = false;
+
+        runDml(DELETE_SQL);
+    }
+
+    /** Runs {@link #DELETE_WITH_SUBQUERY_SQL} through {@link #runDml(String)} with {@code local = false}. */
+    @Test
+    public void testLongRunningDeleteWithSubquery() {
+        local = false;
+
+        runDml(DELETE_WITH_SUBQUERY_SQL);
+    }
+
+    /** Runs {@link #DELETE_SQL} through {@link #runDml(String)} with {@code local = true}. */
+    @Test
+    public void testLongRunningDeleteLocal() {
+        local = true;
+
+        runDml(DELETE_SQL);
+    }
+
+    /** Runs {@link #DELETE_WITH_SUBQUERY_SQL} through {@link #runDml(String)} with {@code local = true}. */
+    @Test
+    public void testLongRunningDeleteWithSubqueryLocal() {
+        local = true;
+
+        runDml(DELETE_WITH_SUBQUERY_SQL);
+    }
+
     /**
      * Test checks that no long-running queries warnings are printed in case 
of external waits during
      * the execution of distributed queries.
@@ -424,6 +568,21 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
         }
     }
 
+    /**
+     * @param dml Dml command; must run longer than {@link #LONG_QUERY_WARNING_TIMEOUT} and be matched by {@link #lsnrDml}.
+     */
+    public void runDml(String dml) {
+        lazy = false;
+
+        long start = U.currentTimeMillis();
+
+        sql("test", dml);
+
+        assertTrue((U.currentTimeMillis() - start) > 
LONG_QUERY_WARNING_TIMEOUT);
+
+        assertTrue(lsnrDml.check());
+    }
+
     /**
      * Utility class with custom SQL functions.
      */
@@ -443,6 +602,20 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
             }
             return v;
         }
+
+        /** SQL function: waits until {@code lsnrDml.check()} holds or the 10_000 timeout (presumably ms) expires; returns 1. */
+        @SuppressWarnings("unused")
+        @QuerySqlFunction
+        public static int wait_func() {
+            try {
+                GridTestUtils.waitForCondition(() -> lsnrDml.check(), 10_000);
+            }
+            catch (IgniteInterruptedCheckedException ignored) {
+                // No-op: an interrupted wait is ignored and the function still returns 1.
+            }
+
+            return 1;
+        }
     }
 
     /**

Reply via email to