FrankChen021 commented on code in PR #19379: URL: https://github.com/apache/druid/pull/19379#discussion_r3161337178
########## processing/src/main/java/org/apache/druid/segment/transform/ScanTransformer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.transform; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.UnnestDataSource; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.CursorBuildSpec; +import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.CursorHolder; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentMapFunction; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import 
org.apache.druid.segment.filter.Filters; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.Interval; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * A {@link BaseTransformer} that processes input rows through a reusable scan query cursor pipeline. + * + * <p>The pipeline is built once at construction: a {@link SettableRowCursorFactory} is wrapped by the + * scan query's {@link SegmentMapFunction} (e.g., unnest, filter). For each input row, the row is set + * on the factory and the cursor is {@link Cursor#reset reset} — no per-row segment or cursor allocation. + * + * <p>When the scan query produces zero output rows (e.g., null/missing arrays, or filter rejection), + * the input row is dropped. This matches native Druid UNNEST / CROSS JOIN semantics where + * null or empty arrays produce zero rows. + * + * <p>This class is not thread-safe. Each reader thread should have its own instance. 
+ */ +public class ScanTransformer implements BaseTransformer +{ + private final ScanQuery query; + private final SettableRowCursorFactory baseCursorFactory; + private final CursorHolder cursorHolder; + private Cursor cursor; + + ScanTransformer(final ScanQuery scanQuery) + { + this.query = scanQuery.withOverriddenContext( + Map.of(QueryContexts.TIMEOUT_KEY, 0) + ); + + final RowSignature broadSignature = RowSignature.builder() + .add(ColumnHolder.TIME_COLUMN_NAME, ColumnType.LONG) + .build(); + + final CursorBuildSpec cursorBuildSpec = CursorBuildSpec.builder() + .setInterval(query.getSingleInterval()) + .setFilter(Filters.toFilter(query.getFilter())) + .setVirtualColumns(query.getVirtualColumns()) + .build(); + + this.baseCursorFactory = new SettableRowCursorFactory(broadSignature); + final SegmentMapFunction segmentMapFunction = query.getDataSource().createSegmentMapFunction(query); + final Segment mappedSegment = segmentMapFunction.apply(Optional.of(new CursorFactorySegment(baseCursorFactory))) + .orElseThrow(() -> new ISE("SegmentMapFunction returned empty")); + final CursorFactory mappedCursorFactory = mappedSegment.as(CursorFactory.class); + this.cursorHolder = mappedCursorFactory.makeCursorHolder(cursorBuildSpec); + } + + @Override + public boolean hasMultiRowTransform() + { + return true; + } + + @Override + @Nullable + public InputRow transform(@Nullable final InputRow row) + { + throw new UnsupportedOperationException( + "ScanTransformer does not support single-row transform; use transformToList()" + ); + } + + @Override + public List<InputRow> transformToList(@Nullable final InputRow row) + { + if (row == null) { + return List.of(); + } + + return process(row); + } + + @Override + @Nullable + public InputRowListPlusRawValues transform(@Nullable final InputRowListPlusRawValues row) + { + if (row == null || row.getInputRows() == null) { + return row; + } + + final List<InputRow> inputRows = row.getInputRows(); + final List<Map<String, Object>> 
inputRawValues = row.getRawValuesList(); + final List<InputRow> outputRows = new ArrayList<>(); + final List<Map<String, Object>> outputRawValues = inputRawValues == null ? null : new ArrayList<>(); + + for (int i = 0; i < inputRows.size(); i++) { + final List<InputRow> expandedRows = transformToList(inputRows.get(i)); + outputRows.addAll(expandedRows); + if (outputRawValues != null) { + for (int j = 0; j < expandedRows.size(); j++) { + outputRawValues.add(inputRawValues.get(i)); + } + } + } + + return InputRowListPlusRawValues.ofList(outputRawValues, outputRows, row.getParseException()); + } + + @Override + public void close() throws IOException + { + cursorHolder.close(); + } + + private List<InputRow> process(final InputRow inputRow) + { + baseCursorFactory.set(inputRow); + + if (cursor == null) { + cursor = cursorHolder.asCursor(); + } else { + cursor.reset(); + } + + if (cursor == null || cursor.isDone()) { + return List.of(); + } + + final List<String> columns = resolveColumnsForRow(inputRow); + final List<String> dimensionColumns = resolveDimensionColumns(inputRow, columns); + final ColumnSelectorFactory selectorFactory = cursor.getColumnSelectorFactory(); + + final List<InputRow> result = new ArrayList<>(); + while (!cursor.isDone()) { + final Map<String, Object> event = new LinkedHashMap<>(); + for (final String col : columns) { + event.put(col, selectorFactory.makeColumnValueSelector(col).getObject()); + } + result.add(new MapBasedInputRow(inputRow.getTimestampFromEpoch(), dimensionColumns, event)); + cursor.advance(); + } + + return result; + } + + private List<String> resolveColumnsForRow(final InputRow inputRow) + { + final Set<String> columns = new LinkedHashSet<>(); + columns.add(ColumnHolder.TIME_COLUMN_NAME); + columns.addAll(inputRow.getDimensions()); + // Include raw event fields that aren't in getDimensions() — e.g. metric inputs that DataSchema added + // to dimensionExclusions. 
Without this, fixed-dimension ingestions with metrics would read null for + // their metric source fields in expanded rows. + if (inputRow instanceof MapBasedInputRow) { + columns.addAll(((MapBasedInputRow) inputRow).getEvent().keySet()); Review Comment: [P2] Keep metric source fields out of dimensions. Adding every MapBasedInputRow event key to the resolved scan columns also feeds those keys into resolveDimensionColumns, which promotes metric input fields such as bytes_sent into row.getDimensions() after expansion. IncrementalIndex treats row.getDimensions() as authoritative and will discover/store those metric source fields as dimensions even though DataSchema excluded them for metricsSpec. Preserve the raw field in the event map for aggregators, but do not add non-dimension event fields to the output dimension list; add a test that verifies the metric source is not ingested as a dimension. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
