clintropolis commented on code in PR #19460: URL: https://github.com/apache/druid/pull/19460#discussion_r3352555804
########## processing/src/main/java/org/apache/druid/segment/projections/ClusteredValueGroupsBaseTableSchema.java: ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.projections; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.query.OrderBy; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.AggregateProjectionMetadata; +import org.apache.druid.segment.Metadata; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.utils.CollectionUtils; + +import 
javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Top-level summary for a clustered base table whose groups are identified by discrete clustering-value tuples. Each + * tuple group is internally stored as a separate table without storing the cluster columns, which are pulled into this + * metadata. This is optimizing for use cases which typically only need to read from a single group via filters present + * on a query. Cluster groups nest inside as {@link #getClusterGroups()}; their column data live in the V10 segment + * file under dictionary-id-tuple prefixes ({@code __base$<id0>_<id1>...<idK>/<col>}), where the ids index into + * {@link #getClusteringDictionaries()}. + */ +public class ClusteredValueGroupsBaseTableSchema implements BaseTableProjectionSchema +{ + public static final String TYPE_NAME = "clustered-value-groups-base-table"; + + private final VirtualColumns virtualColumns; + private final List<String> columnNames; + private final AggregatorFactory[] aggregators; + private final List<OrderBy> ordering; + private final RowSignature clusteringColumns; + private final List<String> sharedColumns; + private final ClusteringDictionaries clusteringDictionaries; + private final List<TableClusterGroupSpec> clusterGroups; + + // computed + private final int timeColumnPosition; + private final Granularity effectiveGranularity; + + @JsonCreator + public ClusteredValueGroupsBaseTableSchema( + @JsonProperty("virtualColumns") VirtualColumns virtualColumns, + @JsonProperty("columns") List<String> columns, + @JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators, + @JsonProperty("ordering") List<OrderBy> ordering, + @JsonProperty("clusteringColumns") RowSignature clusteringColumns, + @JsonProperty("sharedColumns") @Nullable List<String> sharedColumns, + @JsonProperty("clusteringDictionaries") @Nullable 
ClusteringDictionaries clusteringDictionaries, + @JsonProperty("clusterGroups") @Nullable List<TableClusterGroupSpec> clusterGroups + ) + { + if (CollectionUtils.isNullOrEmpty(columns)) { + throw DruidException.defensive("clustered base table schema columns must not be null or empty"); + } + if (ordering == null) { + throw DruidException.defensive("clustered base table schema ordering must not be null"); + } + if (clusteringColumns == null || clusteringColumns.size() == 0) { + throw DruidException.defensive( + "clustered base table schema clusteringColumns must not be null or empty" + ); + } + if (ordering.size() < clusteringColumns.size()) { + throw DruidException.defensive( + "ordering size [%s] must be at least clusteringColumns size [%s] (clustering columns must form a prefix" + + " of the segment ordering)", + ordering.size(), + clusteringColumns.size() + ); + } + for (int i = 0; i < clusteringColumns.size(); i++) { + final String clusteringColumn = clusteringColumns.getColumnName(i); + if (!columns.contains(clusteringColumn)) { + throw DruidException.defensive( + "clusteringColumn [%s] must appear in columns of the clustered base table summary", + clusteringColumn + ); + } + final ColumnType type = clusteringColumns.getColumnType(i).orElse(null); + if (!Projections.isAllowedClusteringType(type)) { + throw DruidException.defensive( + "clustering column [%s] has unsupported type [%s]; allowed types are STRING, LONG, DOUBLE, FLOAT", + clusteringColumn, + type + ); + } + // Per-group ordering is derived by dropping this prefix; pruning + cursor concatenation rely on it. 
+ final String orderingColumn = ordering.get(i).getColumnName(); + if (!clusteringColumn.equals(orderingColumn)) { + throw DruidException.defensive( + "clustering column at position [%s] is [%s] but the segment ordering at the same position is [%s];" + + " clustering columns must form a prefix of the segment ordering", + i, + clusteringColumn, + orderingColumn + ); + } + } + final List<String> resolvedSharedColumns = sharedColumns == null ? List.of() : sharedColumns; + for (String shared : resolvedSharedColumns) { + if (!columns.contains(shared)) { + throw DruidException.defensive( + "sharedColumn [%s] must appear in columns of the clustered base table summary", + shared + ); + } + } + this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns; + this.columnNames = columns; + this.aggregators = aggregators == null ? new AggregatorFactory[0] : aggregators; + this.ordering = ordering; + this.clusteringColumns = clusteringColumns; + this.sharedColumns = resolvedSharedColumns; + this.clusterGroups = clusterGroups == null ? List.of() : List.copyOf(clusterGroups); + this.clusteringDictionaries = clusteringDictionaries == null + ? 
ClusteringDictionaries.EMPTY + : clusteringDictionaries; + + int foundTimePosition = -1; + Granularity granularity = null; + for (int i = 0; i < ordering.size(); i++) { + OrderBy orderBy = ordering.get(i); + if (orderBy.getColumnName().equals(ColumnHolder.TIME_COLUMN_NAME)) { + foundTimePosition = i; + final VirtualColumn vc = this.virtualColumns.getVirtualColumn(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME); + if (vc != null) { + granularity = Granularities.fromVirtualColumn(vc); + } else { + granularity = Granularities.NONE; + } + } + } + if (granularity == null) { + throw DruidException.defensive( + "clustered base table doesn't have a [%s] column?", + ColumnHolder.TIME_COLUMN_NAME + ); + } + this.timeColumnPosition = foundTimePosition; + this.effectiveGranularity = granularity; + + // Specs always start unwired: there's a chicken-and-egg between the summary and its specs, resolved by + // deferring all summary-dependent state on the spec to setSummary, which we invoke here once the summary's + // own state is populated. + for (TableClusterGroupSpec spec : this.clusterGroups) { + spec.setSummary(this); + } + } + + @JsonIgnore + @Override + public List<String> getColumnNames() Review Comment: It's part of the `ProjectionSchema` interface, so it needs to be implemented for this to be a base table projection. `getColumns` is the JSON property used by the serde that writes this base table spec into the segment metadata (similar to the equivalents on other base table schemas). So they serve different purposes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
