xiangfu0 commented on code in PR #18446:
URL: https://github.com/apache/pinot/pull/18446#discussion_r3211631941


##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionIntNormalizer.java:
##########
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.partition;
+
+import com.google.common.base.Preconditions;
+import java.util.Locale;
+
+
+/**
+ * Maps a raw signed integer hash output to a non-negative partition id in {@code [0, numPartitions)}.
+ *
+ * <p>{@link PartitionFunction} implementations report which normalizer matches their internal modulo
+ * semantics via {@link PartitionFunction#getPartitionIdNormalizer()}. The framework uses this label
+ * for identity / staleness matching between config-side and segment-side partition metadata; legacy
+ * functions perform their own modulo internally and the value is purely descriptive for them.
+ */
+public enum PartitionIntNormalizer {
+  /** Compute the remainder, then shift negative remainders into the valid range with {@code + numPartitions}. */
+  POSITIVE_MODULO {
+    @Override
+    int toPartitionId(int value, int numPartitions) {
+      int partition = value % numPartitions;
+      return partition < 0 ? partition + numPartitions : partition;
+    }
+
+    @Override
+    int toPartitionId(long value, int numPartitions) {
+      long partition = value % numPartitions;
+      return (int) (partition < 0 ? partition + numPartitions : partition);
+    }
+  },
+  /** Compute the remainder, then take its absolute value. */
+  ABS {
+    @Override
+    int toPartitionId(int value, int numPartitions) {
+      int partition = value % numPartitions;
+      return partition < 0 ? -partition : partition;
+    }
+
+    @Override
+    int toPartitionId(long value, int numPartitions) {
+      long partition = value % numPartitions;
+      return (int) (partition < 0 ? -partition : partition);
+    }
+  },
+  /** Mask the sign bit before applying modulo. */
+  MASK {
+    @Override
+    int toPartitionId(int value, int numPartitions) {
+      return (value & Integer.MAX_VALUE) % numPartitions;
+    }
+
+    @Override
+    int toPartitionId(long value, int numPartitions) {
+      return (int) ((value & Long.MAX_VALUE) % numPartitions);
+    }
+  },
+  /**
+   * Kafka-style {@code abs(value) % numPartitions} that handles {@code Integer.MIN_VALUE -> 0}
+   * (and {@code Long.MIN_VALUE -> 0}) to avoid the {@code Math.abs} overflow corner. Matches the
+   * legacy semantics of {@code HashCodePartitionFunction} and {@code ByteArrayPartitionFunction}.
+   */
+  KAFKA_ABS {

Review Comment:
   Done — renamed `KAFKA_ABS` to `PRE_MODULO_ABS` to make the 
pre-vs-post-modulo distinction with `ABS` explicit.
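For reference, here is a minimal standalone sketch of the four int-to-id semantics described in the javadoc above. It is written as static methods on a hypothetical `NormalizerSketch` class rather than as the PR's enum constants. Java's `%` keeps the sign of the dividend, so `|v % n|` equals `|v| % n` for every `v` except `Integer.MIN_VALUE`. As a result, post-modulo `ABS` and `PRE_MODULO_ABS` agree on ordinary negative inputs and diverge only at `Integer.MIN_VALUE`, where `Math.abs` overflows:

```java
// Standalone sketch of the normalizer semantics discussed in this review.
// Class and method names are illustrative; the PR models these as enum constants.
final class NormalizerSketch {

  // POSITIVE_MODULO: take the remainder, then shift a negative remainder up by numPartitions.
  static int positiveModulo(int value, int numPartitions) {
    int partition = value % numPartitions;
    return partition < 0 ? partition + numPartitions : partition;
  }

  // ABS (post-modulo): take the remainder, then drop its sign.
  static int postModuloAbs(int value, int numPartitions) {
    int partition = value % numPartitions;
    return partition < 0 ? -partition : partition;
  }

  // MASK: clear the sign bit, then take the remainder.
  static int mask(int value, int numPartitions) {
    return (value & Integer.MAX_VALUE) % numPartitions;
  }

  // PRE_MODULO_ABS (formerly KAFKA_ABS): take abs first, mapping Integer.MIN_VALUE to 0
  // because Math.abs(Integer.MIN_VALUE) overflows back to Integer.MIN_VALUE.
  static int preModuloAbs(int value, int numPartitions) {
    int abs = value == Integer.MIN_VALUE ? 0 : Math.abs(value);
    return abs % numPartitions;
  }

  public static void main(String[] args) {
    int numPartitions = 3;
    for (int value : new int[] {-1, Integer.MIN_VALUE}) {
      System.out.printf("%d: positiveModulo=%d abs=%d mask=%d preModuloAbs=%d%n", value,
          positiveModulo(value, numPartitions), postModuloAbs(value, numPartitions),
          mask(value, numPartitions), preModuloAbs(value, numPartitions));
    }
    // -1: positiveModulo=2 abs=1 mask=1 preModuloAbs=1
    // -2147483648: positiveModulo=1 abs=2 mask=0 preModuloAbs=0
  }
}
```

With `numPartitions = 3`, the input `-1` separates `POSITIVE_MODULO` from the other three. `Integer.MIN_VALUE` is the one input where the pre- and post-modulo variants of abs produce different ids.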



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionIntNormalizer.java:
##########
@@ -0,0 +1,116 @@
+public enum PartitionIntNormalizer {

Review Comment:
   Done — renamed to `PartitionIdNormalizer`.



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java:
##########
@@ -18,86 +18,118 @@
  */
 package org.apache.pinot.segment.spi.partition;
 
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.annotations.PartitionFunctionType;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * Factory to build instances of {@link PartitionFunction}.
+ * Dynamic registry for {@link PartitionFunction} implementations.
+ *
+ * <p>Discovery is driven by classpath scanning for classes annotated with
+ * {@link PartitionFunctionType}. Annotated classes must:
+ * <ul>
+ *   <li>be public and implement {@link PartitionFunction}</li>
+ *   <li>live under a package matching the regex {@code .*\.partition\.function\..*}
+ *       (e.g. {@code org.apache.pinot.common.partition.function} or any plugin package
+ *       that follows the same convention)</li>
+ *   <li>expose a public constructor with signature
+ *       {@code (int numPartitions, Map<String, String> functionConfig)}</li>
+ * </ul>
+ *
+ * <p>The static block scans the classpath once and builds an immutable
+ * (canonicalized name → constructor) map. Instances are created on demand by
+ * {@link #getPartitionFunction(String, int, Map)}.
+ *
+ * <p>To force eager initialization (e.g. so the scan happens before the first segment
+ * is read), call {@link #init()} from broker/server/controller startup.
  */
 public class PartitionFunctionFactory {
-  // Enum for various partition functions to be added.
-  public enum PartitionFunctionType {
-    Modulo, Murmur, Murmur2, Murmur3, Fnv, ByteArray, HashCode, BoundedColumnValue;
-    // Add more functions here.
+  private PartitionFunctionFactory() {
+  }
 
-    private static final Map<String, PartitionFunctionType> VALUE_MAP = new HashMap<>();
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionFunctionFactory.class);
+  private static final String SCAN_REGEX = ".*\\.partition\\.function\\..*";
 
-    static {
-      for (PartitionFunctionType functionType : PartitionFunctionType.values()) {
-        VALUE_MAP.put(functionType.name().toLowerCase(), functionType);
-      }
-    }
-
-    public static PartitionFunctionType fromString(String name) {
-      PartitionFunctionType functionType = VALUE_MAP.get(name.toLowerCase());
+  private static final Map<String, Constructor<? extends PartitionFunction>> REGISTRY;
 
-      if (functionType == null) {
-        throw new IllegalArgumentException("No enum constant for: " + name);
+  static {
+    long startTimeMs = System.currentTimeMillis();
+    Map<String, Constructor<? extends PartitionFunction>> registry = new HashMap<>();
+    for (Class<?> clazz : PinotReflectionUtils.getClassesThroughReflection(SCAN_REGEX, PartitionFunctionType.class)) {
+      if (!Modifier.isPublic(clazz.getModifiers()) || !PartitionFunction.class.isAssignableFrom(clazz)) {
+        continue;
+      }
+      PartitionFunctionType annotation = clazz.getAnnotation(PartitionFunctionType.class);
+      if (!annotation.enabled()) {
+        continue;
+      }
+      String[] names = annotation.names();
+      Preconditions.checkState(names.length > 0,

Review Comment:
   Done — annotation is now optional. Factory scans every public, concrete 
`PartitionFunction` subtype under `org.apache.pinot.*`. When the annotation is 
absent (or its `names()` is empty), the registry instantiates the class with 
`(1, null)` and registers under `getName()`. Added 
`UnannotatedTestPartitionFunction` fixture + test for the fallback path.
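Below is a rough sketch of the annotation-optional registration described in this reply. It assumes simplified stand-ins for `PartitionFunction` and `@PartitionFunctionType`, and it replaces classpath scanning with an explicit candidate list. `RegistrySketch`, `buildRegistry`, `create`, the lower-case name canonicalization, and the duplicate-name check are illustrative assumptions, not the PR's code:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Sketch of annotation-optional registration; every type below is a simplified, hypothetical stand-in.
final class RegistrySketch {
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface PartitionFunctionType {
    String[] names() default {};
    boolean enabled() default true;
  }

  public interface PartitionFunction {
    String getName();
    int getPartition(String value);
  }

  @PartitionFunctionType(names = {"HashCode"})
  public static class HashCodeFunction implements PartitionFunction {
    private final int _numPartitions;
    public HashCodeFunction(int numPartitions, Map<String, String> functionConfig) { _numPartitions = numPartitions; }
    @Override public String getName() { return "HashCode"; }
    @Override public int getPartition(String value) {
      int hash = value.hashCode();
      return (hash == Integer.MIN_VALUE ? 0 : Math.abs(hash)) % _numPartitions;
    }
  }

  // No annotation: the registry falls back to getName() on a probe instance.
  public static class UnannotatedFunction implements PartitionFunction {
    public UnannotatedFunction(int numPartitions, Map<String, String> functionConfig) { }
    @Override public String getName() { return "Unannotated"; }
    @Override public int getPartition(String value) { return 0; }
  }

  static Map<String, Constructor<? extends PartitionFunction>> buildRegistry(List<Class<?>> candidates) {
    Map<String, Constructor<? extends PartitionFunction>> registry = new HashMap<>();
    for (Class<?> clazz : candidates) {
      int modifiers = clazz.getModifiers();
      // Keep only public, concrete PartitionFunction implementations (interfaces count as abstract).
      if (!Modifier.isPublic(modifiers) || Modifier.isAbstract(modifiers)
          || !PartitionFunction.class.isAssignableFrom(clazz)) {
        continue;
      }
      PartitionFunctionType annotation = clazz.getAnnotation(PartitionFunctionType.class);
      if (annotation != null && !annotation.enabled()) {
        continue;
      }
      try {
        Constructor<? extends PartitionFunction> ctor =
            clazz.asSubclass(PartitionFunction.class).getConstructor(int.class, Map.class);
        String[] names = annotation == null ? new String[0] : annotation.names();
        if (names.length == 0) {
          // Fallback: instantiate with (1, null) and register under getName().
          names = new String[] {ctor.newInstance(1, null).getName()};
        }
        for (String name : names) {
          // Canonicalize names to lower case so lookups are case-insensitive.
          if (registry.putIfAbsent(name.toLowerCase(Locale.ROOT), ctor) != null) {
            throw new IllegalStateException("Duplicate partition function name: " + name);
          }
        }
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException("Cannot register " + clazz.getName(), e);
      }
    }
    return Collections.unmodifiableMap(registry);
  }

  static PartitionFunction create(Map<String, Constructor<? extends PartitionFunction>> registry,
      String name, int numPartitions, Map<String, String> functionConfig) {
    Constructor<? extends PartitionFunction> ctor = registry.get(name.toLowerCase(Locale.ROOT));
    if (ctor == null) {
      throw new IllegalArgumentException("No partition function registered under: " + name);
    }
    try {
      return ctor.newInstance(numPartitions, functionConfig);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + name, e);
    }
  }

  public static void main(String[] args) {
    Map<String, Constructor<? extends PartitionFunction>> registry = buildRegistry(
        List.<Class<?>>of(HashCodeFunction.class, UnannotatedFunction.class, PartitionFunction.class));
    // The interface is skipped; both concrete classes are registered.
    System.out.println(registry.containsKey("hashcode") + " " + registry.containsKey("unannotated")); // true true
    System.out.println(create(registry, "HASHCODE", 4, null).getName()); // HashCode
  }
}
```

Because of the `(1, null)` probe, an unannotated implementation's constructor must tolerate a null config. The fallback described above implies this requirement; the thread does not state it explicitly.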



##########
pinot-common/src/main/java/org/apache/pinot/common/partition/function/BoundedColumnValuePartitionFunction.java:
##########
@@ -38,15 +41,23 @@
  * }
  * With this partition config on column "subject", partitionId would be 1 for Maths, 2 for English and 3 for Chemistry.
  * partitionId would be "0" for all other values which may occur, therefore 'numPartitions' is set to 4.
+ *
+ * <p>The mapping output is already in {@code [0, numPartitions)}, so the configured
+ * {@link PartitionIntNormalizer} (default {@link PartitionIntNormalizer#POSITIVE_MODULO}) is a
+ * no-op for in-range values; it is still applied so the field is uniform across all partition
+ * functions.
  */
+@PartitionFunctionType(names = "BoundedColumnValue")

Review Comment:
   Done — dropped the `_normalizer` field from 
`BoundedColumnValuePartitionFunction`. The class returns 
`PartitionIdNormalizer.POSITIVE_MODULO` directly (no-op label since its output 
is already a fixed mapping in `[0, numPartitions)`).
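To illustrate why `POSITIVE_MODULO` is only a label here, below is a hypothetical sketch of the mapping from the class javadoc. Maths, English and Chemistry map to 1, 2 and 3, everything else maps to 0, and `numPartitions` is 4. The constructor that takes a plain list is an assumption; the real class is configured through its function config. Every id is already in `[0, 4)`, so applying `POSITIVE_MODULO` returns it unchanged:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a bounded column-value mapping; not the real BoundedColumnValuePartitionFunction.
final class BoundedColumnValueSketch {
  private final Map<String, Integer> _valueToPartitionId = new HashMap<>();
  private final int _numPartitions;

  BoundedColumnValueSketch(List<String> columnValues) {
    // Listed values get ids 1..k; id 0 is reserved for every other value.
    for (int i = 0; i < columnValues.size(); i++) {
      _valueToPartitionId.put(columnValues.get(i), i + 1);
    }
    _numPartitions = columnValues.size() + 1;
  }

  int getNumPartitions() {
    return _numPartitions;
  }

  int getPartition(String value) {
    return _valueToPartitionId.getOrDefault(value, 0);
  }

  // POSITIVE_MODULO, as described for the normalizer enum.
  static int positiveModulo(int value, int numPartitions) {
    int partition = value % numPartitions;
    return partition < 0 ? partition + numPartitions : partition;
  }

  public static void main(String[] args) {
    BoundedColumnValueSketch fn = new BoundedColumnValueSketch(List.of("Maths", "English", "Chemistry"));
    for (String subject : List.of("Maths", "English", "Chemistry", "History")) {
      int id = fn.getPartition(subject);
      // The normalizer leaves an id that is already in [0, numPartitions) unchanged.
      System.out.println(subject + " -> " + id + " (normalized: " + positiveModulo(id, fn.getNumPartitions()) + ")");
    }
  }
}
```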



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java:
##########
@@ -57,4 +59,19 @@ public interface PartitionFunction extends Serializable {
   default Map<String, String> getFunctionConfig() {
     return null;
   }
+
+  /**
+   * Reports the {@link PartitionIntNormalizer} that drives this partition function's int-to-id
+   * mapping. The built-in implementations now apply the named normalizer directly, so the value is
+   * authoritative — recomputing a partition via
+   * {@code PartitionIntNormalizer.valueOf(getPartitionIdNormalizer()).getPartitionId(rawHash, numPartitions)}
+   * yields the same result as {@link #getPartition(String)} for the same raw hash input.
+   *
+   * <p>Used by the framework for identity / staleness matching between config-side and segment-side
+   * partition metadata. Each implementation must declare its own value — there is intentionally no
+   * default. Plug-ins whose output is already in {@code [0, numPartitions)} should return
+   * {@link PartitionIntNormalizer#POSITIVE_MODULO} (a no-op label).
+   */
+  @JsonIgnore
+  String getPartitionIdNormalizer();

Review Comment:
   Per offline discussion: keeping the method abstract so every 
`PartitionFunction` implementation explicitly declares its int-to-id semantics. 
This makes config-side / segment-side staleness matching authoritative. Plug-in 
impls that don't map onto a standard normalizer can return `POSITIVE_MODULO` 
(no-op when output is already in `[0, N)`).
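Below is a hypothetical sketch of the identity check this abstract method enables. Suppose both the config-side and the segment-side metadata carry the function name, partition count and normalizer label. A segment written with different int-to-id semantics is then detected as stale even when the name and count match. `PartitionSpec`, `matches` and the case-insensitive name comparison are assumptions, and the normalizer labels are illustrative. The thread does not show Pinot's actual comparison code:

```java
import java.util.Objects;

// Hypothetical sketch of config-side vs segment-side matching; not Pinot's actual comparison code.
final class StalenessSketch {
  enum PartitionIdNormalizer { POSITIVE_MODULO, ABS, MASK, PRE_MODULO_ABS }

  // Minimal view of the partition metadata recorded on either side.
  static final class PartitionSpec {
    final String functionName;
    final int numPartitions;
    final String normalizer;

    PartitionSpec(String functionName, int numPartitions, String normalizer) {
      this.functionName = functionName;
      this.numPartitions = numPartitions;
      this.normalizer = normalizer;
    }
  }

  // A segment matches the table config only if all three identity fields agree.
  static boolean matches(PartitionSpec config, PartitionSpec segment) {
    return config.functionName.equalsIgnoreCase(segment.functionName)
        && config.numPartitions == segment.numPartitions
        && Objects.equals(config.normalizer, segment.normalizer);
  }

  public static void main(String[] args) {
    PartitionSpec config = new PartitionSpec("Murmur", 8, PartitionIdNormalizer.MASK.name());
    PartitionSpec current = new PartitionSpec("murmur", 8, "MASK");
    PartitionSpec stale = new PartitionSpec("Murmur", 8, "POSITIVE_MODULO");
    System.out.println(matches(config, current)); // true
    System.out.println(matches(config, stale));   // false: same name and count, different int-to-id semantics
  }
}
```

Because the method has no default, a plug-in cannot silently inherit a label that misdescribes its modulo behavior. Such a mismatch would otherwise make a check like this one report a false match.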



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
