This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a895aaec506 DataSourceMetadataQuery: Use TimeBoundaryInspector as a
fallback. (#17686)
a895aaec506 is described below
commit a895aaec506dbf6f3cf9f8cadaae3d18c0d6e110
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Jan 30 11:57:32 2025 -0800
DataSourceMetadataQuery: Use TimeBoundaryInspector as a fallback. (#17686)
PR #16849 changed the behavior such that maxIngestedEventTime is not
updated for non-real-time data. This patch restores the old behavior for
non-real-time data by using a TimeBoundaryInspector when
MaxIngestedEventTimeInspector is not present.
---
docs/querying/datasourcemetadataquery.md | 2 +-
.../DataSourceMetadataQueryRunnerFactory.java | 28 ++++++++-
.../DataSourceMetadataQueryTest.java | 73 +++++++++++++++++-----
3 files changed, 84 insertions(+), 19 deletions(-)
diff --git a/docs/querying/datasourcemetadataquery.md
b/docs/querying/datasourcemetadataquery.md
index 1f1cc0d49d5..0a77426e765 100644
--- a/docs/querying/datasourcemetadataquery.md
+++ b/docs/querying/datasourcemetadataquery.md
@@ -31,7 +31,7 @@ sidebar_label: "DatasourceMetadata"
Data Source Metadata queries return metadata information for a dataSource.
These queries return information about:
-* The timestamp of the latest ingested event for the dataSource. This is the
ingested event without any consideration of rollup.
+* `maxIngestedEventTime`: The timestamp of the latest ingested event for the
dataSource. For realtime datasources, this may be later than `MAX(__time)` if
`queryGranularity` is being used. For non-realtime datasources, this is
equivalent to `MAX(__time)`.
The grammar for these queries is:
diff --git
a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java
b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java
index 0a5ef2a7680..645f65bd8ad 100644
---
a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java
@@ -35,11 +35,15 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.MaxIngestedEventTimeInspector;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TimeBoundaryInspector;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Iterator;
+import java.util.function.Supplier;
/**
+ *
*/
public class DataSourceMetadataQueryRunnerFactory
implements QueryRunnerFactory<Result<DataSourceMetadataResultValue>,
DataSourceMetadataQuery>
@@ -81,12 +85,12 @@ public class DataSourceMetadataQueryRunnerFactory
private static class DataSourceMetadataQueryRunner implements
QueryRunner<Result<DataSourceMetadataResultValue>>
{
private final Interval segmentInterval;
- private final MaxIngestedEventTimeInspector inspector;
+ private final Supplier<DateTime> inspector;
public DataSourceMetadataQueryRunner(Segment segment)
{
this.segmentInterval = segment.getDataInterval();
- this.inspector = segment.as(MaxIngestedEventTimeInspector.class);
+ this.inspector = createInspector(segment);
}
@Override
@@ -110,7 +114,7 @@ public class DataSourceMetadataQueryRunnerFactory
{
return legacyQuery.buildResult(
segmentInterval.getStart(),
- (inspector != null ? inspector.getMaxIngestedEventTime() : null)
+ inspector.get()
).iterator();
}
@@ -122,5 +126,23 @@ public class DataSourceMetadataQueryRunnerFactory
}
);
}
+
+ /**
+ * Create a maxIngestedEventTime supplier for a given segment.
+ */
+ private static Supplier<DateTime> createInspector(final Segment segment)
+ {
+ final MaxIngestedEventTimeInspector ingestedEventTimeInspector = segment.as(MaxIngestedEventTimeInspector.class);
+ if (ingestedEventTimeInspector != null) {
+ return ingestedEventTimeInspector::getMaxIngestedEventTime;
+ }
+
+ final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class);
+ if (timeBoundaryInspector != null && timeBoundaryInspector.isMinMaxExact()) {
+ return timeBoundaryInspector::getMaxTime;
+ }
+
+ return () -> null;
+ }
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
index 3f0c0401e9a..e5b17e164a7 100644
---
a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
+++
b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
@@ -41,6 +41,9 @@ import
org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.timeline.LogicalSegment;
@@ -63,8 +66,8 @@ public class DataSourceMetadataQueryTest
public void testQuerySerialization() throws IOException
{
Query<?> query = Druids.newDataSourceMetadataQueryBuilder()
- .dataSource("testing")
- .build();
+ .dataSource("testing")
+ .build();
String json = JSON_MAPPER.writeValueAsString(query);
Query<?> serdeQuery = JSON_MAPPER.readValue(json, Query.class);
@@ -113,13 +116,36 @@ public class DataSourceMetadataQueryTest
Assert.assertEquals(true,
queryContext.getBoolean(QueryContexts.FINALIZE_KEY, false));
}
- @Test
- public void testMaxIngestedEventTime() throws Exception
+ /**
+ * Build an index using a row with the provided event timestamp.
+ */
+ private IncrementalIndex buildIndex(final DateTime eventTimestamp) throws Exception
{
final IncrementalIndex rtIndex = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
.build();
+ rtIndex.add(
+ new MapBasedInputRow(
+ eventTimestamp.getMillis(),
+ ImmutableList.of("dim1"),
+ ImmutableMap.of("dim1", "x")
+ )
+ );
+ return rtIndex;
+ }
+
+ @Test
+ public void testMaxIngestedEventTimeIncrementalIndex() throws Exception
+ {
+ final DateTime timestamp = DateTimes.of("2020-01-02T03:04:05.678Z");
+ final IncrementalIndex rtIndex = buildIndex(timestamp);
+ DataSourceMetadataQuery dataSourceMetadataQuery =
+ Druids.newDataSourceMetadataQueryBuilder()
+ .dataSource("testing")
+ .build();
+ ResponseContext context = ConcurrentResponseContext.createEmpty();
+ context.initializeMissingSegments();
final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
new DataSourceMetadataQueryRunnerFactory(
@@ -129,19 +155,36 @@ public class DataSourceMetadataQueryTest
new IncrementalIndexSegment(rtIndex, SegmentId.dummy("test")),
null
);
- DateTime timestamp = DateTimes.nowUtc();
- rtIndex.add(
- new MapBasedInputRow(
- timestamp.getMillis(),
- ImmutableList.of("dim1"),
- ImmutableMap.of("dim1", "x")
- )
- );
- DataSourceMetadataQuery dataSourceMetadataQuery = Druids.newDataSourceMetadataQueryBuilder()
- .dataSource("testing")
- .build();
+
+ Iterable<Result<DataSourceMetadataResultValue>> results =
+ runner.run(QueryPlus.wrap(dataSourceMetadataQuery), context).toList();
+ DataSourceMetadataResultValue val = results.iterator().next().getValue();
+ DateTime maxIngestedEventTime = val.getMaxIngestedEventTime();
+
+ Assert.assertEquals(timestamp, maxIngestedEventTime);
+ }
+
+ @Test
+ public void testMaxIngestedEventTimeQueryableIndex() throws Exception
+ {
+ final DateTime timestamp = DateTimes.of("2020-01-02T03:04:05.678Z");
+ final QueryableIndex queryableIndex = TestIndex.persistAndMemoryMap(buildIndex(timestamp));
+ DataSourceMetadataQuery dataSourceMetadataQuery =
+ Druids.newDataSourceMetadataQueryBuilder()
+ .dataSource("testing")
+ .build();
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.initializeMissingSegments();
+
+ final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
+ new DataSourceMetadataQueryRunnerFactory(
+ new DataSourceQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ ),
+ new QueryableIndexSegment(queryableIndex, SegmentId.dummy("test")),
+ null
+ );
+
Iterable<Result<DataSourceMetadataResultValue>> results =
runner.run(QueryPlus.wrap(dataSourceMetadataQuery), context).toList();
DataSourceMetadataResultValue val = results.iterator().next().getValue();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]