LakshSingla commented on code in PR #13952:
URL: https://github.com/apache/druid/pull/13952#discussion_r1241591469


##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -613,96 +638,149 @@ private static <T, QueryType extends Query<T>> 
DataSource toInlineDataSource(
       final QueryToolChest<T, QueryType> toolChest,
       final AtomicInteger limitAccumulator,
       final AtomicLong memoryLimitAccumulator,
+      final AtomicBoolean cannotMaterializeToFrames,
       final int limit,
-      long memoryLimit
+      long memoryLimit,
+      boolean useNestedForUnknownTypeInSubquery
   )
   {
-    final int limitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
-    boolean memoryLimitSet = memoryLimit >= 0;
-
-    if (limitAccumulator.get() >= limitToUse) {
-      throw ResourceLimitExceededException.withMessage(
-          "Cannot issue subquery, maximum[%d] reached",
-          limitToUse
-      );
-    }
-
-    if (memoryLimitSet && memoryLimitAccumulator.get() >= memoryLimit) {
-      throw ResourceLimitExceededException.withMessage(
-          "Cannot issue subquery, maximum subquery result bytes[%d] reached",
-          memoryLimit
-      );
-    }
+    final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
 
     DataSource dataSource;
-    // Try to serialize the results into a frame only if the memory limit is 
set on the server or the query
-    if (memoryLimitSet) {
-      try {
-        Optional<Sequence<FrameSignaturePair>> framesOptional = 
toolChest.resultsAsFrames(
+
+    switch (ClientQuerySegmentWalkerUtils.getLimitType(memoryLimit, 
cannotMaterializeToFrames.get())) {
+      case ROW_LIMIT:
+        if (limitAccumulator.get() >= rowLimitToUse) {
+          throw ResourceLimitExceededException.withMessage(
+              "Cannot issue the query, subqueries generated results beyond 
maximum[%d] rows",
+              rowLimitToUse
+          );
+        }
+        dataSource = materializeResultsAsArray(
             query,
             results,
-            memoryLimit - memoryLimitAccumulator.get()
+            toolChest,
+            limitAccumulator,
+            limit
         );
-
-        if (!framesOptional.isPresent()) {
-          throw new ISE("The memory of the subqueries cannot be estimated 
correctly.");
+        break;
+      case MEMORY_LIMIT:
+        if (memoryLimitAccumulator.get() >= memoryLimit) {
+          throw ResourceLimitExceededException.withMessage(
+              "Cannot issue the query, subqueries generated results beyond 
maximum[%d] bytes",
+              memoryLimit
+          );
         }
-
-        Sequence<FrameSignaturePair> frames = framesOptional.get();
-        List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
-        frames.forEach(
-            frame -> {
-              if 
(memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) {
-                throw ResourceLimitExceededException.withMessage(
-                    "Subquery generated results beyond maximum[%d] bytes",
-                    memoryLimit
-                );
-
-              }
-              if (limitAccumulator.addAndGet(frame.getFrame().numRows()) >= 
limitToUse) {
-                throw ResourceLimitExceededException.withMessage(
-                    "Subquery generated results beyond maximum[%d] rows",
-                    limitToUse
-                );
-              }
-              frameSignaturePairs.add(frame);
-            }
-        );
-        dataSource = new FramesBackedInlineDataSource(frameSignaturePairs, 
toolChest.resultArraySignature(query));
-      }
-      catch (ResourceLimitExceededException rlee) {
-        throw rlee;
-      }
-      catch (Exception e) {
-        log.info(
-            "Unable to write the subquery results to a frame. Results won't be 
accounted for in the memory "
-            + "calculation"
+        Optional<DataSource> maybeDataSource = materializeResultsAsFrames(
+            query,
+            results,
+            toolChest,
+            limitAccumulator,
+            memoryLimitAccumulator,
+            memoryLimit,
+            useNestedForUnknownTypeInSubquery
         );
-        throw e;
-      }
-    } else {
-      final RowSignature signature = toolChest.resultArraySignature(query);
-
-      final ArrayList<Object[]> resultList = new ArrayList<>();
-
-      toolChest.resultsAsArrays(query, results).accumulate(
-          resultList,
-          (acc, in) -> {
-            if (limitAccumulator.getAndIncrement() >= limitToUse) {
-              throw ResourceLimitExceededException.withMessage(
-                  "Subquery generated results beyond maximum[%d] rows",
-                  limitToUse
-              );
-            }
-            acc.add(in);
-            return acc;
+        if (!maybeDataSource.isPresent()) {
+          cannotMaterializeToFrames.set(true);
+          // Check if the previous row limit accumulator has exceeded the 
memory results
+          if (memoryLimitAccumulator.get() >= memoryLimit) {
+            throw ResourceLimitExceededException.withMessage(
+                "Cannot issue the query, subqueries generated results beyond 
maximum[%d] bytes",
+                memoryLimit
+            );
           }
-      );
-      dataSource = InlineDataSource.fromIterable(resultList, signature);
+          dataSource = materializeResultsAsArray(
+              query,
+              results,
+              toolChest,
+              limitAccumulator,
+              limit
+          );
+        } else {
+          dataSource = maybeDataSource.get();
+        }
+        break;
+      default:
+        throw new IAE("Only row based and memory based limiting is supported");
     }
     return dataSource;
   }
 
+  private static <T, QueryType extends Query<T>> Optional<DataSource> 
materializeResultsAsFrames(
+      final QueryType query,
+      final Sequence<T> results,
+      final QueryToolChest<T, QueryType> toolChest,
+      final AtomicInteger limitAccumulator,
+      final AtomicLong memoryLimitAccumulator,
+      long memoryLimit,
+      boolean useNestedForUnknownTypeInSubquery
+  )
+  {
+    Optional<Sequence<FrameSignaturePair>> framesOptional;
+
+    try {
+      framesOptional = toolChest.resultsAsFrames(
+          query,
+          results,
+          new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+          useNestedForUnknownTypeInSubquery
+      );
+    }
+    catch (Exception e) {
+      return Optional.empty();

Review Comment:
   This runs for every query, so I think DEBUG is the more appropriate log
level; otherwise the logs will be cluttered with the exception message. We
should either:
   1. Log it at DEBUG level so that the logs stay uncluttered. The disadvantage
is that we won't be able to readily observe when we fall back to the default
method/code.
   2. Not catch the exception, and let it propagate. The user will then report
the issue and we can fix it.
   
   The 2nd option means there would be no fallback when we can't convert the
results to frames. Since this is a newer feature, I think we should keep a
fallback until we are confident that we can convert every query. Once the
feature is more mature and frames can handle array types (currently they handle
only string arrays), we can remove the fallback altogether and let the
exception propagate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to