This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b98ca21927 Ensure response context is flushed at end of results accumulation to fix partial results behavior (#18025)
5b98ca21927 is described below

commit 5b98ca219273ccb2342b7bf37e86be411f06526c
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Wed Jun 4 10:40:21 2025 -0700

    Ensure response context is flushed at end of results accumulation to fix partial results behavior (#18025)
    
    When there's an intermittent segment load issue on a historical, queries
    that touch the segment return partial results (despite the default behavior
    of returnPartialResults: false). This issue specifically occurs when there
    are multiple query runners and the missing segment is at the end of the
    timeline. I tested a few scenarios where the missing segment is at the
    beginning or in the middle of the timeline, and in those cases the query
    throws a missing segment exception as expected.
    
    The problem was that the response context header was written eagerly in
    initialize() and skipped in flush() once it's already initialized. However,
    query runners may still add entries to the response context after
    initialization. As a result, when ReportTimelineMissingSegmentQueryRunner
    sets the response context with the missing segments, it doesn't get
    propagated in the actual response sent back to the upstream server due to
    the short-circuit logic in flush() and other places.
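
The ordering problem described above can be illustrated with a minimal sketch. This is not Druid's QueryResultPusher: the class HeaderFlushSketch, its fields, and the run() helper are hypothetical stand-ins, and serialization, size limits, and real HTTP semantics are ignored. It only shows why a header written once in initialize() misses entries that are added to the context later, and how writing it again at the end picks them up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a result pusher; not Druid's actual class.
final class HeaderFlushSketch
{
  // Stand-in for response headers with overwrite semantics, like setHeader().
  final Map<String, String> headers = new LinkedHashMap<>();
  // Stand-in for the mutable response context that query runners write to.
  final Map<String, Object> responseContext = new LinkedHashMap<>();

  private final boolean rewriteAtEnd;
  private boolean initialized = false;

  HeaderFlushSketch(boolean rewriteAtEnd)
  {
    this.rewriteAtEnd = rewriteAtEnd;
  }

  // Called when the first result arrives: writes the header eagerly.
  void initialize()
  {
    writeContextHeader();
    initialized = true;
  }

  // Called once all results have been accumulated.
  void flush()
  {
    if (!initialized) {
      initialize();
    }
    if (rewriteAtEnd) {
      // Write the header again so late context entries are included.
      writeContextHeader();
    }
  }

  private void writeContextHeader()
  {
    headers.put("X-Druid-Response-Context", responseContext.toString());
  }

  // Simulates a runner that reports a missing segment after initialize() has run.
  static String run(boolean rewriteAtEnd)
  {
    HeaderFlushSketch pusher = new HeaderFlushSketch(rewriteAtEnd);
    pusher.initialize();
    pusher.responseContext.put("missingSegments", "[seg-3]");
    pusher.flush();
    return pusher.headers.get("X-Druid-Response-Context");
  }
}
```

With rewriteAtEnd set to false the header still holds the empty context captured at initialization; with it set to true the late missingSegments entry is present.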
    
    * Preserve the original comment re limiting the size
    
    * Comment clarity
    
    * Try adding an IT
    
    * Update javadocs for IT
    
    * Revert the fix temporarily to verify IT failure
    
    * Rename for clarity
    
    * Switch to twitterstream datasource for query retry test because it has
      multiple segments.
    
    - The twitterstream datasource has multiple segments (3 to be exact) as we can see in
      query-retry-sample-data.sql and the other similar sqls.
    - On the other hand, the wikipedia_editstream datasource only has 1 segment, which is less
      useful, especially to test scenarios where there are multiple segments missing.
    - Update the count for this in test expectation.
    
    * Revert "Revert the fix temporarily to verify IT failure"
    
    This reverts commit 36842bfc058fb19bae2672b8aff55ca5204bc95b.
    
    * Rename file to twitterstream
    
    * Revert "Revert "Revert the fix temporarily to verify IT failure""
    
    This reverts commit b873df084129ebb2b4bb951a6cda986cf35c7d89.
    
    * Revert "Revert "Revert "Revert the fix temporarily to verify IT failure"""
    
    This reverts commit 8cc2d3e73d2042b50273dc503a5b14fe15342220.
    
    * Address review comments.
    
    * Add only validateAndWriteHeader() & commentary for the test
---
 .../ServerManagerForQueryErrorTest.java            |  42 ++++++--
 .../query/ITQueryRetryTestOnMissingSegments.java   |  26 +++--
 ...=> twitterstream_queries_query_retry_test.json} |   4 +-
 .../org/apache/druid/server/QueryResultPusher.java |  83 +++++++++-------
 .../org/apache/druid/server/QueryResourceTest.java | 109 +++++++++++++++++++++
 .../org/apache/druid/sql/http/SqlResourceTest.java |  25 ++++-
 6 files changed, 233 insertions(+), 56 deletions(-)

diff --git a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java
index 4562e700c88..aa838f5a727 100644
--- a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java
+++ b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java
@@ -60,14 +60,21 @@ import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This server manager is designed to test various query failures.
- *
- * - Missing segments. A segment can be missing during a query if a historical 
drops the segment
+ * <ul>
+ *  <li> Missing segments. A segment can be missing during a query if a 
historical drops the segment
  *   after the broker issues the query to the historical. To mimic this 
situation, the historical
- *   with this server manager announces all segments assigned, but reports 
missing segment for the
- *   first segment of the datasource specified in the query. The missing 
report is only generated once for the first
- *   segment. Post that report, all segments are served for the datasource. 
See ITQueryRetryTestOnMissingSegments.
- * - Other query errors. This server manager returns a sequence that always 
throws an exception
- *   based on a given query context value. See ITQueryErrorTest.
+ *   with this server manager announces all segments assigned, and reports 
missing segments based on the following:
+ *   <ul>
+ *     <li> If {@link #QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY} and {@link 
#QUERY_RETRY_TEST_CONTEXT_KEY} are set,
+ *           the segment at that index is reported as missing exactly 
once.</li>
+ *     <li> If {@link #QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY} is not set or 
is -1, it simulates missing segments
+ *     starting from the beginning, up to {@link 
#MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS}.</li>
+ *   </ul>
+ *   The missing report is only generated once for the first time. Post that 
report, upon retry, all segments are served
+ *   for the datasource. See ITQueryRetryTestOnMissingSegments. </li>
+ * <li> Other query errors. This server manager returns a sequence that always 
throws an exception
+ *   based on a given query context value. See ITQueryErrorTest. </li>
+ * </ul>
  *
  * @see org.apache.druid.query.RetryQueryRunner for query retrying.
  * @see org.apache.druid.client.JsonParserIterator for handling query errors 
from historicals.
@@ -81,9 +88,19 @@ public class ServerManagerForQueryErrorTest extends 
ServerManager
   public static final String QUERY_UNSUPPORTED_TEST_CONTEXT_KEY = 
"query-unsupported-test";
   public static final String RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY = 
"resource-limit-exceeded-test";
   public static final String QUERY_FAILURE_TEST_CONTEXT_KEY = 
"query-failure-test";
+  /**
+   * Query context that indicates which segment should be marked as 
unavilable/missing.
+   * This should be used in conjunction with {@link 
#QUERY_RETRY_TEST_CONTEXT_KEY}.
+   * <p>
+   * A value of {@code 0} means the first segment will be reported as missing, 
{@code 1} for the second, and so on.
+   * If this key is not set (default = -1), the test will instead simulate 
missing up to
+   * {@link #MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS} segments from the 
beginning.
+   * </p>
+   */
+  public static final String QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY = 
"unavailable-segment-idx";
+  private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 1;
 
   private static final Logger LOG = new 
Logger(ServerManagerForQueryErrorTest.class);
-  private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 1;
 
   private final ConcurrentHashMap<String, Integer> queryToIgnoredSegments = 
new ConcurrentHashMap<>();
 
@@ -135,6 +152,7 @@ public class ServerManagerForQueryErrorTest extends 
ServerManager
                    .map(segment -> {
                           final QueryContext queryContext = query.context();
                           if 
(queryContext.getBoolean(QUERY_RETRY_TEST_CONTEXT_KEY, false)) {
+                            final int unavailableSegmentIdx = 
queryContext.getInt(QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY, -1);
                             final MutableBoolean isIgnoreSegment = new 
MutableBoolean(false);
                             queryToIgnoredSegments.compute(
                                 query.getMostSpecificId(),
@@ -142,7 +160,13 @@ public class ServerManagerForQueryErrorTest extends 
ServerManager
                                   if (ignoreCounter == null) {
                                     ignoreCounter = 0;
                                   }
-                                  if (ignoreCounter < 
MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) {
+
+                                  if (unavailableSegmentIdx >= 0 && 
unavailableSegmentIdx == ignoreCounter) {
+                                    // Fail exactly once when counter matches 
the configured retry index
+                                    ignoreCounter++;
+                                    isIgnoreSegment.setTrue();
+                                  } else if (ignoreCounter < 
MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) {
+                                    // Fail up to N times for this query
                                     ignoreCounter++;
                                     isIgnoreSegment.setTrue();
                                   }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
index f05e5c42e6e..1ba059a64a5 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
@@ -61,8 +61,8 @@ import java.util.Map;
 @Guice(moduleFactory = DruidTestModuleFactory.class)
 public class ITQueryRetryTestOnMissingSegments
 {
-  private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
-  private static final String QUERIES_RESOURCE = 
"/queries/wikipedia_editstream_queries_query_retry_test.json";
+  private static final String TWITTERSTREAM_DATA_SOURCE = "twitterstream";
+  private static final String QUERIES_RESOURCE = 
"/queries/twitterstream_queries_query_retry_test.json";
 
   /**
    * This enumeration represents an expectation after finishing running the 
test query.
@@ -97,9 +97,9 @@ public class ITQueryRetryTestOnMissingSegments
   @BeforeMethod
   public void before()
   {
-    // ensure that wikipedia segment is loaded completely
+    // ensure that twitterstream segments are loaded completely
     ITRetryUtil.retryUntilTrue(
-        () -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), 
"wikipedia segment load"
+        () -> coordinatorClient.areSegmentsLoaded(TWITTERSTREAM_DATA_SOURCE), 
"twitterstream segments load"
     );
   }
 
@@ -128,6 +128,14 @@ public class ITQueryRetryTestOnMissingSegments
     testQueries(buildQuery(1, false), Expectation.ALL_SUCCESS);
   }
 
+  @Test
+  public void 
testFailureWhenLastSegmentIsMissingWithPartialResultsDisallowed() throws 
Exception
+  {
+    // Since retry is disabled and partial result is not allowed, the query 
must fail since the last segment
+    // is missing/unavailable.
+    testQueries(buildQuery(0, false, 2), Expectation.QUERY_FAILURE);
+  }
+
   private void testQueries(String queryWithResultsStr, Expectation 
expectation) throws Exception
   {
     final List<QueryWithResults> queries = jsonMapper.readValue(
@@ -215,15 +223,20 @@ public class ITQueryRetryTestOnMissingSegments
   }
 
   private String buildQuery(int numRetriesOnMissingSegments, boolean 
allowPartialResults) throws IOException
+  {
+    return buildQuery(numRetriesOnMissingSegments, allowPartialResults, -1);
+  }
+
+  private String buildQuery(int numRetriesOnMissingSegments, boolean 
allowPartialResults, int unavailableSegmentIdx) throws IOException
   {
     return StringUtils.replace(
         AbstractIndexerTest.getResourceAsString(QUERIES_RESOURCE),
         "%%CONTEXT%%",
-        
jsonMapper.writeValueAsString(buildContext(numRetriesOnMissingSegments, 
allowPartialResults))
+        
jsonMapper.writeValueAsString(buildContext(numRetriesOnMissingSegments, 
allowPartialResults, unavailableSegmentIdx))
     );
   }
 
-  private static Map<String, Object> buildContext(int 
numRetriesOnMissingSegments, boolean allowPartialResults)
+  private static Map<String, Object> buildContext(int 
numRetriesOnMissingSegments, boolean allowPartialResults, int 
unavailableSegmentIdx)
   {
     final Map<String, Object> context = new HashMap<>();
     // Disable cache so that each run hits historical.
@@ -232,6 +245,7 @@ public class ITQueryRetryTestOnMissingSegments
     context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 
numRetriesOnMissingSegments);
     context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults);
     context.put(ServerManagerForQueryErrorTest.QUERY_RETRY_TEST_CONTEXT_KEY, 
true);
+    
context.put(ServerManagerForQueryErrorTest.QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY,
 unavailableSegmentIdx);
     return context;
   }
 }
diff --git a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries_query_retry_test.json b/integration-tests/src/test/resources/queries/twitterstream_queries_query_retry_test.json
similarity index 87%
rename from integration-tests/src/test/resources/queries/wikipedia_editstream_queries_query_retry_test.json
rename to integration-tests/src/test/resources/queries/twitterstream_queries_query_retry_test.json
index 0886cf45def..ffad3ca5817 100644
--- a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries_query_retry_test.json
+++ b/integration-tests/src/test/resources/queries/twitterstream_queries_query_retry_test.json
@@ -3,7 +3,7 @@
         "description": "timeseries, 1 agg, all",
         "query": {
             "queryType": "timeseries",
-            "dataSource": "wikipedia_editstream",
+            "dataSource": "twitterstream",
             "intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
             "granularity": "all",
             "aggregations": [
@@ -18,7 +18,7 @@
             {
                 "timestamp": "2013-01-01T00:00:00.000Z",
                 "result": {
-                    "rows": 2390950
+                    "rows": 10948544
                 }
             }
         ]
diff --git a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
index 1b6ed98122c..b4a57bea8e1 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
@@ -404,42 +404,8 @@ public abstract class QueryResultPusher
 
         DirectDruidClient.removeMagicResponseContextFields(responseContext);
 
-        // Limit the response-context header, see 
https://github.com/apache/druid/issues/2331
-        // Note that Response.ResponseBuilder.header(String key,Object 
value).build() calls value.toString()
-        // and encodes the string using ASCII, so 1 char is = 1 byte
-        ResponseContext.SerializationResult serializationResult;
-        try {
-          serializationResult = responseContext.serializeWith(
-              jsonMapper,
-              responseContextConfig.getMaxResponseContextHeaderSize()
-          );
-        }
-        catch (JsonProcessingException e) {
-          log.info(e, "Problem serializing to JSON!?");
-          serializationResult = new ResponseContext.SerializationResult("Could 
not serialize", "Could not serialize");
-        }
-
-        if (serializationResult.isTruncated()) {
-          final String logToPrint = StringUtils.format(
-              "Response Context truncated for id [%s]. Full context is [%s].",
-              queryId,
-              serializationResult.getFullResult()
-          );
-          if (responseContextConfig.shouldFailOnTruncatedResponseContext()) {
-            log.error(logToPrint);
-            throw new QueryInterruptedException(
-                new TruncatedResponseContextException(
-                    "Serialized response context exceeds the max size[%s]",
-                    responseContextConfig.getMaxResponseContextHeaderSize()
-                ),
-                selfNode.getHostAndPortToUse()
-            );
-          } else {
-            log.warn(logToPrint);
-          }
-        }
+        validateAndWriteResponseContextHeader();
 
-        response.setHeader(QueryResource.HEADER_RESPONSE_CONTEXT, 
serializationResult.getResult());
         response.setContentType(contentType.toString());
 
         if (response instanceof org.eclipse.jetty.server.Response) {
@@ -466,6 +432,50 @@ public abstract class QueryResultPusher
       }
     }
 
+    /**
+     * Serializes the response context header and sets it in the final {@link 
#response} header. It enforces the max
+     * header size limit and throws {@link QueryInterruptedException} if 
truncation is disallowed and the context is too large.
+     */
+    private void validateAndWriteResponseContextHeader()
+    {
+      // Limit the response-context header, see 
https://github.com/apache/druid/issues/2331
+      // Note that Response.ResponseBuilder.header(String key,Object 
value).build() calls value.toString()
+      // and encodes the string using ASCII, so 1 char is = 1 byte
+      ResponseContext.SerializationResult serializationResult;
+      try {
+        serializationResult = responseContext.serializeWith(
+            jsonMapper,
+            responseContextConfig.getMaxResponseContextHeaderSize()
+        );
+      }
+      catch (JsonProcessingException e) {
+        log.info(e, "Problem serializing to JSON!?");
+        serializationResult = new ResponseContext.SerializationResult("Could 
not serialize", "Could not serialize");
+      }
+
+      if (serializationResult.isTruncated()) {
+        final String logToPrint = StringUtils.format(
+            "Response Context truncated for id [%s]. Full context is [%s].",
+            queryId,
+            serializationResult.getFullResult()
+        );
+
+        if (responseContextConfig.shouldFailOnTruncatedResponseContext()) {
+          log.error(logToPrint);
+          throw new QueryInterruptedException(
+              new TruncatedResponseContextException(
+                  "Serialized response context exceeds the max size[%s]",
+                  responseContextConfig.getMaxResponseContextHeaderSize()
+              ),
+              selfNode.getHostAndPortToUse()
+          );
+        } else {
+          log.warn(logToPrint);
+        }
+      }
+      response.setHeader(QueryResource.HEADER_RESPONSE_CONTEXT, 
serializationResult.getResult());
+    }
+
     @Override
     @Nullable
     public Response accumulate(Response retVal, Object in)
@@ -489,6 +499,9 @@ public abstract class QueryResultPusher
       if (!initialized) {
         initialize();
       }
+
+      // call this again here since we're at the end and everything should 
have accumulated.
+      validateAndWriteResponseContextHeader();
       writer.writeResponseEnd();
     }
 
diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index 68409290484..b6f4e6899b8 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -38,8 +38,10 @@ import org.apache.druid.error.ErrorResponse;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Accumulator;
+import org.apache.druid.java.util.common.guava.BaseSequence;
 import org.apache.druid.java.util.common.guava.LazySequence;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -117,6 +119,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -528,6 +531,112 @@ public class QueryResourceTest
     
Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER), 
"true");
   }
 
+  @Test
+  public void 
testResponseContextContainsMissingSegments_whenLastSegmentIsMissing() throws 
IOException
+  {
+    final SegmentDescriptor missingSegDesc = new SegmentDescriptor(
+        Intervals.of("2025-01-01/P1D"), "0", 1
+    );
+
+    queryResource = new QueryResource(
+        new QueryLifecycleFactory(
+            CONGLOMERATE,
+            new QuerySegmentWalker()
+            {
+              @Override
+              public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> 
query, Iterable<Interval> intervals)
+              {
+                return (queryPlus, responseContext) -> new BaseSequence<>(
+                    new BaseSequence.IteratorMaker<T, Iterator<T>>() {
+                      @Override
+                      public Iterator<T> make()
+                      {
+                        List<T> data = Collections.singletonList((T) 
ImmutableMap.of("dummy", 1));
+                        Iterator<T> realIterator = data.iterator();
+
+                        return new Iterator<T>() {
+                          private boolean done = false;
+
+                          @Override
+                          public boolean hasNext()
+                          {
+                            if (realIterator.hasNext()) {
+                              return true;
+                            } else if (!done) {
+                              // Simulate a segment failure in the end after 
initialize() has run
+                              
responseContext.addMissingSegments(ImmutableList.of(missingSegDesc));
+                              done = true;
+                            }
+                            return false;
+                          }
+
+                          @Override
+                          public T next()
+                          {
+                            return realIterator.next();
+                          }
+                        };
+                      }
+
+                      @Override
+                      public void cleanup(Iterator<T> iterFromMake)
+                      {
+                      }
+                    }
+                );
+              }
+
+              @Override
+              public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> 
query, Iterable<SegmentDescriptor> specs)
+              {
+                throw new UnsupportedOperationException();
+              }
+            },
+            new DefaultGenericQueryMetricsFactory(),
+            new NoopServiceEmitter(),
+            testRequestLogger,
+            new AuthConfig(),
+            NoopPolicyEnforcer.instance(),
+            AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+            Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
+        ),
+        jsonMapper,
+        smileMapper,
+        queryScheduler,
+        new AuthConfig(),
+        null,
+        ResponseContextConfig.newConfig(true),
+        DRUID_NODE
+    );
+
+    expectPermissiveHappyPathAuth();
+
+    org.eclipse.jetty.server.Response response = 
this.jettyResponseforRequest(testServletRequest);
+
+    // Execute the query
+    Assert.assertNull(queryResource.doPost(
+        new 
ByteArrayInputStream(SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)),
+        null,
+        testServletRequest
+    ));
+
+
+    Assert.assertTrue(response.containsHeader(HttpHeader.TRAILER.toString()));
+    Assert.assertEquals(QueryResultPusher.RESULT_TRAILER_HEADERS, 
response.getHeader(HttpHeader.TRAILER.toString()));
+
+    final HttpFields observedFields = response.getTrailers().get();
+
+    
Assert.assertTrue(response.containsHeader(QueryResource.HEADER_RESPONSE_CONTEXT));
+    Assert.assertEquals(
+        jsonMapper.writeValueAsString(ImmutableMap.of("missingSegments", 
ImmutableList.of(missingSegDesc))),
+        response.getHeader(QueryResource.HEADER_RESPONSE_CONTEXT)
+    );
+
+    
Assert.assertTrue(observedFields.containsKey(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER));
+    Assert.assertEquals("true", 
observedFields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER));
+  }
+
+
   @Test
   public void testQueryThrowsRuntimeExceptionFromLifecycleExecute() throws 
IOException
   {
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index dce1953a9d8..b1aa1975308 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.sql.http;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Splitter;
@@ -433,16 +434,32 @@ public class SqlResourceTest extends CalciteTestBase
 
     final MockHttpServletResponse response = postForAsyncResponse(sqlQuery, 
makeRegularUserReq());
 
-    Assert.assertEquals(
+    // In tests, MockHttpServletResponse stores headers as a MultiMap.
+    // This allows the same header key to be set multiple times (e.g., once at 
the start and once at the end of query processing).
+    // As a result, we observe duplicate context entries for this test in the 
expected set.
+    // This differs from typical behavior for other headers, where a new value 
would overwrite any previously set value.
+    final Object expectedMissingHeaders = ImmutableList.of(
         ImmutableMap.of(
             "uncoveredIntervals", "2030-01-01/78149827981274-01-01",
             "uncoveredIntervalsOverflowed", "true"
         ),
-        JSON_MAPPER.readValue(
-            
Iterables.getOnlyElement(response.headers.get("X-Druid-Response-Context")),
-            Map.class
+        ImmutableMap.of(
+            "uncoveredIntervals", "2030-01-01/78149827981274-01-01",
+            "uncoveredIntervalsOverflowed", "true"
         )
     );
+    final Object observedMissingHeaders = 
response.headers.get("X-Druid-Response-Context").stream()
+                                                           .map(s -> {
+                                                             try {
+                                                               return 
JSON_MAPPER.readValue(s, new TypeReference<Map<String, String>>() {});
+                                                             }
+                                                             catch 
(JsonProcessingException e) {
+                                                               throw new 
RuntimeException(e);
+                                                             }
+                                                           })
+                                                           
.collect(Collectors.toList());
+
+    Assert.assertEquals(expectedMissingHeaders, observedMissingHeaders);
 
     Object results = JSON_MAPPER.readValue(response.baos.toByteArray(), 
Object.class);
 
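
The SqlResourceTest comment in the diff above says the mock response keeps headers in a multimap, so writing the same header twice yields two entries instead of one overwritten value. The sketch below contrasts the two semantics. HeaderStoreSketch and its methods are hypothetical and do not mirror MockHttpServletResponse or any servlet API; the sketch only assumes that "set" replaces the stored values and "add" appends to them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical header store; illustrates overwrite vs. append semantics only.
final class HeaderStoreSketch
{
  private final Map<String, List<String>> headers = new HashMap<>();

  // Overwrite semantics: any previous values for the name are discarded.
  void set(String name, String value)
  {
    List<String> values = new ArrayList<>();
    values.add(value);
    headers.put(name, values);
  }

  // Append semantics: every write for the name is retained (multimap-like).
  void add(String name, String value)
  {
    headers.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
  }

  List<String> get(String name)
  {
    return headers.getOrDefault(name, new ArrayList<>());
  }

  // Writes the same header twice, once at the start and once at the end,
  // and returns how many values are observable afterwards.
  static int valuesAfterTwoWrites(boolean append)
  {
    HeaderStoreSketch store = new HeaderStoreSketch();
    String name = "X-Druid-Response-Context";
    String value = "{\"uncoveredIntervalsOverflowed\":\"true\"}";
    if (append) {
      store.add(name, value);
      store.add(name, value);
    } else {
      store.set(name, value);
      store.set(name, value);
    }
    return store.get(name).size();
  }
}
```

Under append semantics the two writes leave two identical values, which matches the duplicated expectation in the test; under overwrite semantics only one value remains.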

