xiangfu0 commented on code in PR #18446: URL: https://github.com/apache/pinot/pull/18446#discussion_r3211631941
########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionIntNormalizer.java: ########## @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.partition; + +import com.google.common.base.Preconditions; +import java.util.Locale; + + +/** + * Maps a raw signed integer hash output to a non-negative partition id in {@code [0, numPartitions)}. + * + * <p>{@link PartitionFunction} implementations report which normalizer matches their internal modulo + * semantics via {@link PartitionFunction#getPartitionIdNormalizer()}. The framework uses this label + * for identity / staleness matching between config-side and segment-side partition metadata; legacy + * functions perform their own modulo internally and the value is purely descriptive for them. + */ +public enum PartitionIntNormalizer { + /** Compute the remainder, then shift negative remainders into the valid range with {@code + numPartitions}. */ + POSITIVE_MODULO { + @Override + int toPartitionId(int value, int numPartitions) { + int partition = value % numPartitions; + return partition < 0 ? partition + numPartitions : partition; + } + + @Override + int toPartitionId(long value, int numPartitions) { + long partition = value % numPartitions; + return (int) (partition < 0 ? partition + numPartitions : partition); + } + }, + /** Compute the remainder, then take its absolute value. */ + ABS { + @Override + int toPartitionId(int value, int numPartitions) { + int partition = value % numPartitions; + return partition < 0 ? -partition : partition; + } + + @Override + int toPartitionId(long value, int numPartitions) { + long partition = value % numPartitions; + return (int) (partition < 0 ? -partition : partition); + } + }, + /** Mask the sign bit before applying modulo. */ + MASK { + @Override + int toPartitionId(int value, int numPartitions) { + return (value & Integer.MAX_VALUE) % numPartitions; + } + + @Override + int toPartitionId(long value, int numPartitions) { + return (int) ((value & Long.MAX_VALUE) % numPartitions); + } + }, + /** + * Kafka-style {@code abs(value) % numPartitions} that handles {@code Integer.MIN_VALUE -> 0} + * (and {@code Long.MIN_VALUE -> 0}) to avoid the {@code Math.abs} overflow corner. Matches the + * legacy semantics of {@code HashCodePartitionFunction} and {@code ByteArrayPartitionFunction}. + */ + KAFKA_ABS { Review Comment: Done — renamed `KAFKA_ABS` to `PRE_MODULO_ABS` to make the pre-vs-post-modulo distinction with `ABS` explicit. ########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionIntNormalizer.java: ########## @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.partition; + +import com.google.common.base.Preconditions; +import java.util.Locale; + + +/** + * Maps a raw signed integer hash output to a non-negative partition id in {@code [0, numPartitions)}. + * + * <p>{@link PartitionFunction} implementations report which normalizer matches their internal modulo + * semantics via {@link PartitionFunction#getPartitionIdNormalizer()}. The framework uses this label + * for identity / staleness matching between config-side and segment-side partition metadata; legacy + * functions perform their own modulo internally and the value is purely descriptive for them. + */ +public enum PartitionIntNormalizer { Review Comment: Done — renamed to `PartitionIdNormalizer`. ########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java: ########## @@ -18,86 +18,118 @@ */ package org.apache.pinot.segment.spi.partition; +import com.google.common.base.Preconditions; +import java.lang.reflect.Constructor; +import java.lang.reflect.Modifier; +import java.util.Collections; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata; +import org.apache.pinot.spi.annotations.PartitionFunctionType; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.utils.PinotReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Factory to build instances of {@link PartitionFunction}. + * Dynamic registry for {@link PartitionFunction} implementations. + * + * <p>Discovery is driven by classpath scanning for classes annotated with + * {@link PartitionFunctionType}. Annotated classes must: + * <ul> + * <li>be public and implement {@link PartitionFunction}</li> + * <li>live under a package matching the regex {@code .*\.partition\.function\..*} + * (e.g. {@code org.apache.pinot.common.partition.function} or any plugin package + * that follows the same convention)</li> + * <li>expose a public constructor with signature + * {@code (int numPartitions, Map<String, String> functionConfig)}</li> + * </ul> + * + * <p>The static block scans the classpath once and builds an immutable + * (canonicalized name → constructor) map. Instances are created on demand by + * {@link #getPartitionFunction(String, int, Map)}. + * + * <p>To force eager initialization (e.g. so the scan happens before the first segment + * is read), call {@link #init()} from broker/server/controller startup. */ public class PartitionFunctionFactory { - // Enum for various partition functions to be added. - public enum PartitionFunctionType { - Modulo, Murmur, Murmur2, Murmur3, Fnv, ByteArray, HashCode, BoundedColumnValue; - // Add more functions here. + private PartitionFunctionFactory() { + } - private static final Map<String, PartitionFunctionType> VALUE_MAP = new HashMap<>(); + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionFunctionFactory.class); + private static final String SCAN_REGEX = ".*\\.partition\\.function\\..*"; - static { - for (PartitionFunctionType functionType : PartitionFunctionType.values()) { - VALUE_MAP.put(functionType.name().toLowerCase(), functionType); - } - } - - public static PartitionFunctionType fromString(String name) { - PartitionFunctionType functionType = VALUE_MAP.get(name.toLowerCase()); + private static final Map<String, Constructor<? extends PartitionFunction>> REGISTRY; - if (functionType == null) { - throw new IllegalArgumentException("No enum constant for: " + name); + static { + long startTimeMs = System.currentTimeMillis(); + Map<String, Constructor<? extends PartitionFunction>> registry = new HashMap<>(); + for (Class<?> clazz : PinotReflectionUtils.getClassesThroughReflection(SCAN_REGEX, PartitionFunctionType.class)) { + if (!Modifier.isPublic(clazz.getModifiers()) || !PartitionFunction.class.isAssignableFrom(clazz)) { + continue; + } + PartitionFunctionType annotation = clazz.getAnnotation(PartitionFunctionType.class); + if (!annotation.enabled()) { + continue; + } + String[] names = annotation.names(); + Preconditions.checkState(names.length > 0, Review Comment: Done — annotation is now optional. Factory scans every public, concrete `PartitionFunction` subtype under `org.apache.pinot.*`. When the annotation is absent (or its `names()` is empty), the registry instantiates the class with `(1, null)` and registers under `getName()`. Added `UnannotatedTestPartitionFunction` fixture + test for the fallback path. ########## pinot-common/src/main/java/org/apache/pinot/common/partition/function/BoundedColumnValuePartitionFunction.java: ########## @@ -38,15 +41,23 @@ * } * With this partition config on column "subject", partitionId would be 1 for Maths, 2 for English and 3 for Chemistry. * partitionId would be "0" for all other values which may occur, therefore 'numPartitions' is set to 4. + * + * <p>The mapping output is already in {@code [0, numPartitions)}, so the configured + * {@link PartitionIntNormalizer} (default {@link PartitionIntNormalizer#POSITIVE_MODULO}) is a + * no-op for in-range values; it is still applied so the field is uniform across all partition + * functions. */ +@PartitionFunctionType(names = "BoundedColumnValue") Review Comment: Done — dropped the `_normalizer` field from `BoundedColumnValuePartitionFunction`. The class returns `PartitionIdNormalizer.POSITIVE_MODULO` directly (no-op label since its output is already a fixed mapping in `[0, numPartitions)`). ########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java: ########## @@ -57,4 +59,19 @@ public interface PartitionFunction extends Serializable { default Map<String, String> getFunctionConfig() { return null; } + + /** + * Reports the {@link PartitionIntNormalizer} that drives this partition function's int-to-id + * mapping. The built-in implementations now apply the named normalizer directly, so the value is + * authoritative — recomputing a partition via + * {@code PartitionIntNormalizer.valueOf(getPartitionIdNormalizer()).getPartitionId(rawHash, numPartitions)} + * yields the same result as {@link #getPartition(String)} for the same raw hash input. + * + * <p>Used by the framework for identity / staleness matching between config-side and segment-side + * partition metadata. Each implementation must declare its own value — there is intentionally no + * default. Plug-ins whose output is already in {@code [0, numPartitions)} should return + * {@link PartitionIntNormalizer#POSITIVE_MODULO} (a no-op label). + */ + @JsonIgnore + String getPartitionIdNormalizer(); Review Comment: Per offline discussion: keeping the method abstract so every `PartitionFunction` implementation explicitly declares its int-to-id semantics. This makes config-side / segment-side staleness matching authoritative. Plug-in impls that don't map onto a standard normalizer can return `POSITIVE_MODULO` (no-op when output is already in `[0, N)`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
