alex-plekhanov commented on code in PR #11405:
URL: https://github.com/apache/ignite/pull/11405#discussion_r1689952157


##########
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java:
##########
@@ -65,6 +74,9 @@ public class H2QueryInfo implements TrackableQuery {
     /** Query id. */
     private final long queryId;
 
+    /** Lock object. */
+    private final Object lock = new Object();

Review Comment:
   I think the explicit lock is redundant; a synchronized method can be used instead.



##########
modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java:
##########
@@ -215,28 +315,117 @@ private void checkBigResultSet() throws Exception {
      * @param args Query parameters.
      */
     private void sqlCheckLongRunning(String sql, Object... args) {
-        GridTestUtils.assertThrowsAnyCause(log, () -> sql(sql, args).getAll(), 
QueryCancelledException.class, "");
+        GridTestUtils.assertThrowsAnyCause(log, () -> sql("test", sql, 
args).getAll(), QueryCancelledException.class, "");
+    }
+
+    /**
+     * @param sql SQL query.
+     * @param args Query parameters.
+     */
+    private void sqlCheckLongRunningLazy(String sql, Object... args) {
+        pageSize = 1;
+
+        try {
+            assertFalse(sql("test", sql, args).iterator().next().isEmpty());
+        }
+        finally {
+            pageSize = DEFAULT_PAGE_SIZE;
+        }
+    }
+
+    /**
+     * @param sql SQL query.
+     * @param args Query parameters.
+     */
+    private void sqlCheckLongRunningLazyWithMergeTable(String sql, Object... 
args) {
+        distributedJoins = true;
+
+        try {
+            CacheConfiguration ccfg1 = cacheConfig("pers", Integer.class, 
Person.class);
+            CacheConfiguration ccfg2 = cacheConfig("org", Integer.class, 
Organization.class);
+
+            IgniteCache<Integer, Person> cache1 = 
ignite.getOrCreateCache(ccfg1);
+            IgniteCache<Integer, Organization> cache2 = 
ignite.getOrCreateCache(ccfg2);
+
+            cache2.put(1, new Organization("o1"));
+            cache2.put(2, new Organization("o2"));
+            cache1.put(3, new Person(1, "p1"));
+            cache1.put(4, new Person(2, "p2"));
+            cache1.put(5, new Person(3, "p3"));
+
+            assertFalse(sql("pers", sql, args).getAll().isEmpty());
+        }
+        finally {
+            distributedJoins = false;
+        }
     }
 
     /**
      * Execute long-running sql with a check for errors.
      */
     private void sqlCheckLongRunning() {
-        sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS 
T2 where T0.id > ?", 0);
+        if (lazy && withMergeTable) {
+            String select = "select o.name n1, p.name n2 from Person p, 
\"org\".Organization o" +
+                " where p.orgId = o._key and o._key=1 and o._key < 
sleep_func(?)" +
+                " union select o.name n1, p.name n2 from Person p, 
\"org\".Organization o" +
+                " where p.orgId = o._key and o._key=2";
+
+            sqlCheckLongRunningLazyWithMergeTable(select, 2000);
+        }
+        else if (lazy && !withMergeTable)
+            sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < 
sleep_func(?)", 2000);
+        else
+            sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, 
test AS T2 where T0.id > ?", 0);
     }
 
     /**
+     * @param cacheName Cache name.
      * @param sql SQL query.
      * @param args Query parameters.
      * @return Results cursor.
      */
-    private FieldsQueryCursor<List<?>> sql(String sql, Object... args) {
-        return grid().context().query().querySqlFields(new SqlFieldsQuery(sql)
+    private FieldsQueryCursor<List<?>> sql(String cacheName, String sql, 
Object... args) {
+        return ignite.cache(cacheName).query(new SqlFieldsQuery(sql)
             .setTimeout(10, TimeUnit.SECONDS)
             .setLocal(local)
             .setLazy(lazy)
-            .setSchema("TEST")
-            .setArgs(args), false);
+            .setPageSize(pageSize)
+            .setDistributedJoins(distributedJoins)
+            .setArgs(args));
+    }
+
+    /** */
+    public void checkLazyWithExternalWait() throws InterruptedException {
+        pageSize = 1;
+
+        LogListener lsnr = LogListener
+            .matches(LONG_QUERY_EXEC_MSG)
+            .build();
+
+        testLog().registerListener(lsnr);
+
+        try {
+            Iterator<List<?>> it = sql("test", "select * from 
test").iterator();
+
+            it.next();
+
+            Thread.sleep(EXT_WAIT_TIME);
+
+            it.next();
+
+            ConcurrentHashMap qrys = 
GridTestUtils.getFieldValue(heavyQueriesTracker(), "qrys");
+
+            H2QueryInfo qry = (H2QueryInfo)qrys.keySet().iterator().next();
+
+            long extWait = GridTestUtils.getFieldValue(qry, "extWait");
+
+            assertTrue(extWait >= EXT_WAIT_TIME - EXT_WAIT_REL_ERR);

Review Comment:
   The test can still be flaky in some cases, even with EXT_WAIT_REL_ERR. The
   root cause of this error is the asynchronous updating of
   `U.currentTimeMillis()`. Instead, I propose modifying the sleep slightly:
   we can replace `Thread.sleep(EXT_WAIT_TIME);` with something like:
   ```
               long sleepStartTs = U.currentTimeMillis();
               
               while (U.currentTimeMillis() - sleepStartTs <= EXT_WAIT_TIME)
                   doSleep(100L);
   ```
   In this case, we can check `extWait` without `EXT_WAIT_REL_ERR`.



##########
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java:
##########
@@ -862,14 +885,18 @@ else if (qryResults.cancelled())
 
                         Boolean dataPageScanEnabled = 
isDataPageScanEnabled(req.getFlags());
 
-                        GridQueryNextPageResponse msg = prepareNextPage(
-                            nodeRess,
-                            node,
-                            qryResults,
-                            req.query(),
-                            req.segmentId(),
-                            req.pageSize(),
-                            dataPageScanEnabled);
+                        GridQueryNextPageResponse msg = 
h2.executeWithResumableTimeTracking(

Review Comment:
   Tracking is not stopped here if an exception is thrown.



##########
modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java:
##########
@@ -215,28 +315,117 @@ private void checkBigResultSet() throws Exception {
      * @param args Query parameters.
      */
     private void sqlCheckLongRunning(String sql, Object... args) {
-        GridTestUtils.assertThrowsAnyCause(log, () -> sql(sql, args).getAll(), 
QueryCancelledException.class, "");
+        GridTestUtils.assertThrowsAnyCause(log, () -> sql("test", sql, 
args).getAll(), QueryCancelledException.class, "");
+    }
+
+    /**
+     * @param sql SQL query.
+     * @param args Query parameters.
+     */
+    private void sqlCheckLongRunningLazy(String sql, Object... args) {
+        pageSize = 1;
+
+        try {
+            assertFalse(sql("test", sql, args).iterator().next().isEmpty());
+        }
+        finally {
+            pageSize = DEFAULT_PAGE_SIZE;
+        }
+    }
+
+    /**
+     * @param sql SQL query.
+     * @param args Query parameters.
+     */
+    private void sqlCheckLongRunningLazyWithMergeTable(String sql, Object... 
args) {
+        distributedJoins = true;
+
+        try {
+            CacheConfiguration ccfg1 = cacheConfig("pers", Integer.class, 
Person.class);
+            CacheConfiguration ccfg2 = cacheConfig("org", Integer.class, 
Organization.class);
+
+            IgniteCache<Integer, Person> cache1 = 
ignite.getOrCreateCache(ccfg1);
+            IgniteCache<Integer, Organization> cache2 = 
ignite.getOrCreateCache(ccfg2);
+
+            cache2.put(1, new Organization("o1"));
+            cache2.put(2, new Organization("o2"));
+            cache1.put(3, new Person(1, "p1"));
+            cache1.put(4, new Person(2, "p2"));
+            cache1.put(5, new Person(3, "p3"));
+
+            assertFalse(sql("pers", sql, args).getAll().isEmpty());
+        }
+        finally {
+            distributedJoins = false;
+        }
     }
 
     /**
      * Execute long-running sql with a check for errors.
      */
     private void sqlCheckLongRunning() {
-        sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS 
T2 where T0.id > ?", 0);
+        if (lazy && withMergeTable) {
+            String select = "select o.name n1, p.name n2 from Person p, 
\"org\".Organization o" +
+                " where p.orgId = o._key and o._key=1 and o._key < 
sleep_func(?)" +
+                " union select o.name n1, p.name n2 from Person p, 
\"org\".Organization o" +
+                " where p.orgId = o._key and o._key=2";
+
+            sqlCheckLongRunningLazyWithMergeTable(select, 2000);
+        }
+        else if (lazy && !withMergeTable)
+            sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < 
sleep_func(?)", 2000);
+        else
+            sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, 
test AS T2 where T0.id > ?", 0);
     }
 
     /**
+     * @param cacheName Cache name.
      * @param sql SQL query.
      * @param args Query parameters.
      * @return Results cursor.
      */
-    private FieldsQueryCursor<List<?>> sql(String sql, Object... args) {
-        return grid().context().query().querySqlFields(new SqlFieldsQuery(sql)
+    private FieldsQueryCursor<List<?>> sql(String cacheName, String sql, 
Object... args) {
+        return ignite.cache(cacheName).query(new SqlFieldsQuery(sql)
             .setTimeout(10, TimeUnit.SECONDS)
             .setLocal(local)
             .setLazy(lazy)
-            .setSchema("TEST")
-            .setArgs(args), false);
+            .setPageSize(pageSize)
+            .setDistributedJoins(distributedJoins)
+            .setArgs(args));
+    }
+
+    /** */
+    public void checkLazyWithExternalWait() throws InterruptedException {
+        pageSize = 1;
+
+        LogListener lsnr = LogListener
+            .matches(LONG_QUERY_EXEC_MSG)
+            .build();
+
+        testLog().registerListener(lsnr);
+
+        try {
+            Iterator<List<?>> it = sql("test", "select * from 
test").iterator();
+
+            it.next();
+
+            Thread.sleep(EXT_WAIT_TIME);
+
+            it.next();
+
+            ConcurrentHashMap qrys = 
GridTestUtils.getFieldValue(heavyQueriesTracker(), "qrys");
+
+            H2QueryInfo qry = (H2QueryInfo)qrys.keySet().iterator().next();
+
+            long extWait = GridTestUtils.getFieldValue(qry, "extWait");

Review Comment:
   Let's introduce a getter for `extWait` (and perhaps one for
   `HeavyQueriesTracker.qrys.keySet()` too).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to