This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a895aaec506 DataSourceMetadataQuery: Use TimeBoundaryInspector as a fallback. (#17686)
a895aaec506 is described below

commit a895aaec506dbf6f3cf9f8cadaae3d18c0d6e110
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Jan 30 11:57:32 2025 -0800

    DataSourceMetadataQuery: Use TimeBoundaryInspector as a fallback. (#17686)
    
    PR #16849 changed the behavior such that maxIngestedEventTime is not
    updated for non-real-time data. This patch restores the old behavior for
    non-real-time data by using a TimeBoundaryInspector when
    MaxIngestedEventTimeInspector is not present.
---
 docs/querying/datasourcemetadataquery.md           |  2 +-
 .../DataSourceMetadataQueryRunnerFactory.java      | 28 ++++++++-
 .../DataSourceMetadataQueryTest.java               | 73 +++++++++++++++++-----
 3 files changed, 84 insertions(+), 19 deletions(-)

diff --git a/docs/querying/datasourcemetadataquery.md b/docs/querying/datasourcemetadataquery.md
index 1f1cc0d49d5..0a77426e765 100644
--- a/docs/querying/datasourcemetadataquery.md
+++ b/docs/querying/datasourcemetadataquery.md
@@ -31,7 +31,7 @@ sidebar_label: "DatasourceMetadata"
 
 Data Source Metadata queries return metadata information for a dataSource.  These queries return information about:
 
-* The timestamp of the latest ingested event for the dataSource. This is the ingested event without any consideration of rollup.
+* `maxIngestedEventTime`: The timestamp of the latest ingested event for the dataSource. For realtime datasources, this may be later than `MAX(__time)` if `queryGranularity` is being used. For non-realtime datasources, this is equivalent to `MAX(__time)`.
 
 The grammar for these queries is:
 
diff --git a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java
index 0a5ef2a7680..645f65bd8ad 100644
--- a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java
@@ -35,11 +35,15 @@ import org.apache.druid.query.Result;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.segment.MaxIngestedEventTimeInspector;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TimeBoundaryInspector;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.util.Iterator;
+import java.util.function.Supplier;
 
 /**
+ *
  */
 public class DataSourceMetadataQueryRunnerFactory
     implements QueryRunnerFactory<Result<DataSourceMetadataResultValue>, DataSourceMetadataQuery>
@@ -81,12 +85,12 @@ public class DataSourceMetadataQueryRunnerFactory
   private static class DataSourceMetadataQueryRunner implements QueryRunner<Result<DataSourceMetadataResultValue>>
   {
     private final Interval segmentInterval;
-    private final MaxIngestedEventTimeInspector inspector;
+    private final Supplier<DateTime> inspector;
 
     public DataSourceMetadataQueryRunner(Segment segment)
     {
       this.segmentInterval = segment.getDataInterval();
-      this.inspector = segment.as(MaxIngestedEventTimeInspector.class);
+      this.inspector = createInspector(segment);
     }
 
     @Override
@@ -110,7 +114,7 @@ public class DataSourceMetadataQueryRunnerFactory
             {
               return legacyQuery.buildResult(
                   segmentInterval.getStart(),
-                  (inspector != null ? inspector.getMaxIngestedEventTime() : null)
+                  inspector.get()
               ).iterator();
             }
 
@@ -122,5 +126,23 @@ public class DataSourceMetadataQueryRunnerFactory
           }
       );
     }
+
+    /**
+     * Create a maxIngestedEventTime supplier for a given segment.
+     */
+    private static Supplier<DateTime> createInspector(final Segment segment)
+    {
+      final MaxIngestedEventTimeInspector ingestedEventTimeInspector = segment.as(MaxIngestedEventTimeInspector.class);
+      if (ingestedEventTimeInspector != null) {
+        return ingestedEventTimeInspector::getMaxIngestedEventTime;
+      }
+
+      final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class);
+      if (timeBoundaryInspector != null && timeBoundaryInspector.isMinMaxExact()) {
+        return timeBoundaryInspector::getMaxTime;
+      }
+
+      return () -> null;
+    }
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
index 3f0c0401e9a..e5b17e164a7 100644
--- a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
@@ -41,6 +41,9 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.context.ConcurrentResponseContext;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.timeline.LogicalSegment;
@@ -63,8 +66,8 @@ public class DataSourceMetadataQueryTest
   public void testQuerySerialization() throws IOException
   {
     Query<?> query = Druids.newDataSourceMetadataQueryBuilder()
-                        .dataSource("testing")
-                        .build();
+                           .dataSource("testing")
+                           .build();
 
     String json = JSON_MAPPER.writeValueAsString(query);
     Query<?> serdeQuery = JSON_MAPPER.readValue(json, Query.class);
@@ -113,13 +116,36 @@ public class DataSourceMetadataQueryTest
     Assert.assertEquals(true, queryContext.getBoolean(QueryContexts.FINALIZE_KEY, false));
   }
 
-  @Test
-  public void testMaxIngestedEventTime() throws Exception
+  /**
+   * Build an index using a row with the provided event timestamp.
+   */
+  private IncrementalIndex buildIndex(final DateTime eventTimestamp) throws Exception
   {
     final IncrementalIndex rtIndex = new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
         .setMaxRowCount(1000)
         .build();
+    rtIndex.add(
+        new MapBasedInputRow(
+            eventTimestamp.getMillis(),
+            ImmutableList.of("dim1"),
+            ImmutableMap.of("dim1", "x")
+        )
+    );
+    return rtIndex;
+  }
+
+  @Test
+  public void testMaxIngestedEventTimeIncrementalIndex() throws Exception
+  {
+    final DateTime timestamp = DateTimes.of("2020-01-02T03:04:05.678Z");
+    final IncrementalIndex rtIndex = buildIndex(timestamp);
+    DataSourceMetadataQuery dataSourceMetadataQuery =
+        Druids.newDataSourceMetadataQueryBuilder()
+              .dataSource("testing")
+              .build();
+    ResponseContext context = ConcurrentResponseContext.createEmpty();
+    context.initializeMissingSegments();
 
     final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
         new DataSourceMetadataQueryRunnerFactory(
@@ -129,19 +155,36 @@ public class DataSourceMetadataQueryTest
         new IncrementalIndexSegment(rtIndex, SegmentId.dummy("test")),
         null
     );
-    DateTime timestamp = DateTimes.nowUtc();
-    rtIndex.add(
-        new MapBasedInputRow(
-            timestamp.getMillis(),
-            ImmutableList.of("dim1"),
-            ImmutableMap.of("dim1", "x")
-        )
-    );
-    DataSourceMetadataQuery dataSourceMetadataQuery = Druids.newDataSourceMetadataQueryBuilder()
-                                                            .dataSource("testing")
-                                                            .build();
+
+    Iterable<Result<DataSourceMetadataResultValue>> results =
+        runner.run(QueryPlus.wrap(dataSourceMetadataQuery), context).toList();
+    DataSourceMetadataResultValue val = results.iterator().next().getValue();
+    DateTime maxIngestedEventTime = val.getMaxIngestedEventTime();
+
+    Assert.assertEquals(timestamp, maxIngestedEventTime);
+  }
+
+  @Test
+  public void testMaxIngestedEventTimeQueryableIndex() throws Exception
+  {
+    final DateTime timestamp = DateTimes.of("2020-01-02T03:04:05.678Z");
+    final QueryableIndex queryableIndex = TestIndex.persistAndMemoryMap(buildIndex(timestamp));
+    DataSourceMetadataQuery dataSourceMetadataQuery =
+        Druids.newDataSourceMetadataQueryBuilder()
+              .dataSource("testing")
+              .build();
     ResponseContext context = ConcurrentResponseContext.createEmpty();
     context.initializeMissingSegments();
+
+    final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
+        new DataSourceMetadataQueryRunnerFactory(
+            new DataSourceQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()),
+            QueryRunnerTestHelper.NOOP_QUERYWATCHER
+        ),
+        new QueryableIndexSegment(queryableIndex, SegmentId.dummy("test")),
+        null
+    );
+
     Iterable<Result<DataSourceMetadataResultValue>> results =
         runner.run(QueryPlus.wrap(dataSourceMetadataQuery), context).toList();
     DataSourceMetadataResultValue val = results.iterator().next().getValue();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to